[tracker/tracker-0.10] libtracker-miner: Add bulk operations to the MinerFS processing pool
- From: Martyn James Russell <mr src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/tracker-0.10] libtracker-miner: Add bulk operations to the MinerFS processing pool
- Date: Thu, 10 Mar 2011 11:25:06 +0000 (UTC)
commit d5771d725ecd48bee40f5f14e32f2439277cec07
Author: Carlos Garnacho <carlosg gnome org>
Date: Mon Mar 7 18:47:57 2011 +0100
libtracker-miner: Add bulk operations to the MinerFS processing pool
These bulk operations are grouped when flushing the processing queue, so
several operations are sent as a single SPARQL expression.
.../tracker-miner-fs-processing-pool.c | 299 +++++++++++++++++---
.../tracker-miner-fs-processing-pool.h | 10 +
2 files changed, 263 insertions(+), 46 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index d71b2eb..e2628eb 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -129,12 +129,33 @@ typedef enum {
TRACKER_PROCESSING_TASK_STATUS_LAST
} TrackerProcessingTaskStatus;
+/* Groups the buffered bulk tasks that share one bulk operation string,
+ * so that they can be sent to the store as a single SPARQL update. */
+typedef struct {
+ /* Not owned; groups are matched by pointer identity of this string */
+ const gchar *bulk_operation;
+ /* List of TrackerProcessingTask*; the list is owned, the tasks are not */
+ GList *tasks;
+ /* Owned; generated by bulk_operation_merge_finish(), NULL until then */
+ gchar *sparql;
+} BulkOperationMerge;
+
+/* Selects which member of the TrackerProcessingTask data union is valid */
+typedef enum {
+ /* No payload set */
+ CONTENT_NONE,
+ /* data.string is valid and owned by the task */
+ CONTENT_SPARQL_STRING,
+ /* data.builder is valid and the task holds a reference on it */
+ CONTENT_SPARQL_BUILDER,
+ /* data.bulk is valid; its operation string is not owned */
+ CONTENT_BULK_OPERATION
+} TaskContentType;
+
struct _TrackerProcessingTask {
/* The file being processed */
GFile *file;
- /* The FULL sparql to be updated in the store */
- TrackerSparqlBuilder *sparql;
- gchar *sparql_string;
+
+ TaskContentType content;
+
+ union {
+ TrackerSparqlBuilder *builder;
+ gchar *string;
+ struct {
+ const gchar *bulk_operation;
+ TrackerBulkMatchType match;
+ } bulk;
+ } data;
/* The context of the task */
gpointer context;
@@ -195,6 +216,20 @@ tracker_processing_task_new (GFile *file)
return task;
}
+/* Releases the payload currently stored in the task, if the task owns
+ * one, and marks the task as having no content.
+ * SPARQL strings are owned by the task and builders are referenced by it;
+ * bulk operation strings are borrowed (expected to stay constant), so
+ * nothing is released for them. */
+static void
+tracker_processing_task_data_unset (TrackerProcessingTask *task)
+{
+	switch (task->content) {
+	case CONTENT_SPARQL_STRING:
+		g_free (task->data.string);
+		break;
+	case CONTENT_SPARQL_BUILDER:
+		if (task->data.builder != NULL) {
+			g_object_unref (task->data.builder);
+		}
+		break;
+	default:
+		/* CONTENT_NONE and CONTENT_BULK_OPERATION own no memory */
+		break;
+	}
+
+	task->content = CONTENT_NONE;
+}
+
void
tracker_processing_task_free (TrackerProcessingTask *task)
{
@@ -210,10 +245,9 @@ tracker_processing_task_free (TrackerProcessingTask *task)
task->context_free_func) {
task->context_free_func (task->context);
}
- if (task->sparql) {
- g_object_unref (task->sparql);
- }
- g_free (task->sparql_string);
+
+ tracker_processing_task_data_unset (task);
+
g_object_unref (task->file);
g_slice_free (TrackerProcessingTask, task);
}
@@ -249,31 +283,41 @@ void
tracker_processing_task_set_sparql (TrackerProcessingTask *task,
TrackerSparqlBuilder *sparql)
{
- if (task->sparql) {
- g_object_unref (task->sparql);
- }
- if (task->sparql_string) {
- g_free (task->sparql_string);
- task->sparql_string = NULL;
+ tracker_processing_task_data_unset (task);
+
+ if (sparql) {
+ task->data.builder = g_object_ref (sparql);
+ task->content = CONTENT_SPARQL_BUILDER;
}
- task->sparql = g_object_ref (sparql);
}
void
tracker_processing_task_set_sparql_string (TrackerProcessingTask *task,
gchar *sparql_string)
{
- if (task->sparql) {
- g_object_unref (task->sparql);
- task->sparql = NULL;
- }
- if (task->sparql_string) {
- g_free (task->sparql_string);
+ tracker_processing_task_data_unset (task);
+
+ if (sparql_string) {
+ /* We take ownership of the input string! */
+ task->data.string = sparql_string;
+ task->content = CONTENT_SPARQL_STRING;
}
- /* We take ownership of the input string! */
- task->sparql_string = sparql_string;
}
+/* Makes the task carry a bulk operation instead of a full SPARQL update,
+ * releasing any previous content first.
+ * @sparql is stored by pointer, not copied, so it must outlive the task;
+ * when flushing, tasks whose @sparql pointers are identical are merged
+ * into one update. @match selects which resources the operation applies
+ * to. Passing a NULL @sparql leaves the task without content. */
+void
+tracker_processing_task_set_bulk_operation (TrackerProcessingTask *task,
+                                            const gchar           *sparql,
+                                            TrackerBulkMatchType   match)
+{
+	tracker_processing_task_data_unset (task);
+
+	if (sparql == NULL) {
+		return;
+	}
+
+	task->content = CONTENT_BULK_OPERATION;
+	task->data.bulk.match = match;
+	/* This string is expected to remain constant */
+	task->data.bulk.bulk_operation = sparql;
+}
/*------------------- PROCESSING POOL ----------------------*/
@@ -569,10 +613,13 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
tracker_processing_pool_remove_task (task->pool, task);
/* Call finished handler with the error, if any */
- task->finished_handler (task, task->finished_user_data,
- (global_error ?
- global_error :
- g_ptr_array_index (sparql_array_errors, i)));
+ task->finished_handler (task, task->finished_user_data, global_error);
+
+ /* FIXME: sparql_array_errors don't match the task list anymore.
+ * (global_error ?
+ * global_error :
+ * g_ptr_array_index (sparql_array_errors, i)));
+ */
/* No need to deallocate the task here, it will be done when
* unref-ing the GPtrArray below */
@@ -586,18 +633,121 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
g_clear_error (&global_error);
}
+/* (Re)builds merge->sparql from the grouped tasks.
+ * Up to two updates are emitted, each being merge->bulk_operation followed
+ * by a WHERE clause that binds ?f:
+ *  - one for tasks flagged TRACKER_BULK_MATCH_EQUALS, where ?f's nie:url
+ *    is one of the task URIs;
+ *  - one for tasks flagged TRACKER_BULK_MATCH_CHILDREN, where ?f's nie:url
+ *    descends from one of them (tracker:uri-is-descendant).
+ * merge->sparql is left NULL when there is no operation or no task. */
+static void
+bulk_operation_merge_finish (BulkOperationMerge *merge)
+{
+ /* Discard any previously generated query */
+ if (merge->sparql) {
+ g_free (merge->sparql);
+ merge->sparql = NULL;
+ }
+
+ if (merge->bulk_operation && merge->tasks) {
+ GString *equals_string = NULL, *children_string = NULL, *sparql;
+ guint n_equals = 0;
+ GList *l;
+
+ /* Collect the task URIs into two comma-separated lists of quoted
+ * strings; a task flagged with both bits ends up in both lists.
+ * NOTE(review): URIs are embedded without SPARQL escaping; this relies
+ * on g_file_get_uri() percent-escaping characters such as '"'. Confirm. */
+ for (l = merge->tasks; l; l = l->next) {
+ TrackerProcessingTask *task = l->data;
+ gchar *uri;
+
+ uri = g_file_get_uri (task->file);
+
+ if (task->data.bulk.match & TRACKER_BULK_MATCH_EQUALS) {
+ if (!equals_string) {
+ equals_string = g_string_new ("");
+ } else {
+ g_string_append_c (equals_string, ',');
+ }
+
+ g_string_append_printf (equals_string, "\"%s\"", uri);
+ n_equals++;
+ }
+
+ if (task->data.bulk.match & TRACKER_BULK_MATCH_CHILDREN) {
+ if (!children_string) {
+ children_string = g_string_new ("");
+ } else {
+ g_string_append_c (children_string, ',');
+ }
+
+ g_string_append_printf (children_string, "\"%s\"", uri);
+ }
+
+ g_free (uri);
+ }
+
+ sparql = g_string_new ("");
+
+ if (equals_string) {
+ g_string_append (sparql, merge->bulk_operation);
+
+ /* A single URI is matched directly; several go through FILTER IN */
+ if (n_equals == 1) {
+ g_string_append_printf (sparql,
+ " WHERE { "
+ " ?f nie:url %s"
+ "} ",
+ equals_string->str);
+ } else {
+ g_string_append_printf (sparql,
+ " WHERE { "
+ " ?f nie:url ?u ."
+ " FILTER (?u IN (%s))"
+ "} ",
+ equals_string->str);
+ }
+
+ g_string_free (equals_string, TRUE);
+ }
+
+ /* Second statement, appended after the first one (if any) */
+ if (children_string) {
+ g_string_append (sparql, merge->bulk_operation);
+ g_string_append_printf (sparql,
+ " WHERE { "
+ " ?f nie:url ?u ."
+ " FILTER (tracker:uri-is-descendant (%s, ?u))"
+ "} ",
+ children_string->str);
+ g_string_free (children_string, TRUE);
+ }
+
+ /* Keep the character data, drop the GString wrapper */
+ merge->sparql = g_string_free (sparql, FALSE);
+ }
+}
+
+/* Allocates an empty merge for @bulk_operation. The string is borrowed
+ * (not copied); tasks are attached later by the caller and the SPARQL is
+ * produced by bulk_operation_merge_finish(). */
+static BulkOperationMerge *
+bulk_operation_merge_new (const gchar *bulk_operation)
+{
+	BulkOperationMerge *merge = g_slice_new0 (BulkOperationMerge);
+
+	merge->bulk_operation = bulk_operation;
+	return merge;
+}
+
+/* Destroys a merge: frees its generated SPARQL and its task list.
+ * The tasks themselves and the bulk operation string are not owned by
+ * the merge and are left untouched. */
+static void
+bulk_operation_merge_free (BulkOperationMerge *operation)
+{
+	g_free (operation->sparql);
+	g_list_free (operation->tasks);
+	g_slice_free (BulkOperationMerge, operation);
+}
+
void
tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
const gchar *reason)
{
- guint i;
- gchar **sparql_array;
+ GPtrArray *bulk_ops;
+ GArray *sparql_array;
+ guint i, j;
if (!pool->sparql_buffer)
return;
/* Loop buffer and construct array of strings */
- sparql_array = g_new (gchar *, pool->sparql_buffer->len);
+ sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
+ bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
+
for (i = 0; i < pool->sparql_buffer->len; i++) {
TrackerProcessingTask *task;
@@ -613,10 +763,44 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
task->pool = pool;
g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING], task);
- /* Add original string, not a duplicate */
- sparql_array[i] = (task->sparql ?
- (gchar *) tracker_sparql_builder_get_result (task->sparql) :
- task->sparql_string);
+		if (task->content == CONTENT_SPARQL_STRING) {
+			/* Add original string, not a duplicate */
+			g_array_append_val (sparql_array, task->data.string);
+		} else if (task->content == CONTENT_SPARQL_BUILDER) {
+			const gchar *str = tracker_sparql_builder_get_result (task->data.builder);
+			g_array_append_val (sparql_array, str);
+		} else if (task->content == CONTENT_BULK_OPERATION) {
+			/* Bulk tasks are not added one by one: they are grouped by
+			 * their bulk operation string (compared by pointer, since the
+			 * string is expected to be constant), and each group becomes
+			 * a single update once every buffered task has been seen. */
+			BulkOperationMerge *bulk = NULL;
+
+			/* Reuses the function-scope guint 'j', so there is no shadowing
+			 * and the comparison against bulk_ops->len stays unsigned. */
+			for (j = 0; j < bulk_ops->len; j++) {
+				BulkOperationMerge *cur;
+
+				cur = g_ptr_array_index (bulk_ops, j);
+
+				if (cur->bulk_operation == task->data.bulk.bulk_operation) {
+					bulk = cur;
+					break;
+				}
+			}
+
+			if (!bulk) {
+				bulk = bulk_operation_merge_new (task->data.bulk.bulk_operation);
+				g_ptr_array_add (bulk_ops, bulk);
+			}
+
+			bulk->tasks = g_list_prepend (bulk->tasks, task);
+		}
+ }
+
+ for (j = 0; j < bulk_ops->len; j++) {
+ BulkOperationMerge *bulk;
+
+ bulk = g_ptr_array_index (bulk_ops, j);
+ bulk_operation_merge_finish (bulk);
+
+ if (bulk->sparql) {
+ g_array_prepend_val (sparql_array, bulk->sparql);
+ }
}
trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
@@ -626,8 +810,8 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
reason ? reason : "Unknown reason");
tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
- sparql_array,
- pool->sparql_buffer->len,
+ (gchar **) sparql_array->data,
+ sparql_array->len,
G_PRIORITY_DEFAULT,
NULL,
tracker_processing_pool_sparql_update_array_cb,
@@ -640,7 +824,10 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
}
/* Clear temp buffer */
- g_free (sparql_array);
+ g_array_free (sparql_array, TRUE);
+
+ g_ptr_array_free (bulk_ops, TRUE);
+
pool->sparql_buffer_start_time = 0;
/* Note the whole buffer is passed to the update_array callback,
* so no need to free it. */
@@ -656,8 +843,8 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
{
GList *previous;
- /* The task MUST have a proper SPARQL here */
- g_assert (task->sparql != NULL || task->sparql_string != NULL);
+ /* The task MUST have a proper content here */
+ g_assert (task->content != CONTENT_NONE);
/* First, check if the task was already added as being WAITING */
previous = g_queue_find (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_WAIT], task);
@@ -677,6 +864,8 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
/* If buffering not requested, OR the limit of READY tasks is actually 1,
* flush previous buffer (if any) and then the new update */
if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
+ const gchar *sparql = NULL;
+
trace ("(Processing Pool %s) Pushed READY/PROCESSING task %p for file '%s'",
G_OBJECT_TYPE_NAME (pool->miner),
task,
@@ -695,14 +884,32 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
task);
/* And update the new one */
- tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
- (task->sparql ?
- tracker_sparql_builder_get_result (task->sparql) :
- task->sparql_string),
- G_PRIORITY_DEFAULT,
- NULL,
- tracker_processing_pool_sparql_update_cb,
- task);
+		{
+			/* Only set for bulk tasks. It owns the query that 'sparql'
+			 * then points to, so it must stay alive until the update
+			 * call below has been made. */
+			BulkOperationMerge *operation = NULL;
+
+			if (task->content == CONTENT_SPARQL_STRING) {
+				sparql = task->data.string;
+			} else if (task->content == CONTENT_SPARQL_BUILDER) {
+				sparql = tracker_sparql_builder_get_result (task->data.builder);
+			} else if (task->content == CONTENT_BULK_OPERATION) {
+				/* Build the query as a one-task merge, i.e. the same
+				 * SPARQL a flush would generate for this task alone */
+				operation = bulk_operation_merge_new (task->data.bulk.bulk_operation);
+				operation->tasks = g_list_prepend (NULL, task);
+				bulk_operation_merge_finish (operation);
+				sparql = operation->sparql;
+			}
+
+			if (sparql) {
+				tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
+				                                        sparql,
+				                                        G_PRIORITY_DEFAULT,
+				                                        NULL,
+				                                        tracker_processing_pool_sparql_update_cb,
+				                                        task);
+			}
+
+			/* Free the merge only after the call: freeing it earlier
+			 * releases operation->sparql and leaves 'sparql' dangling.
+			 * NOTE(review): like tracker_processing_pool_buffer_flush(),
+			 * this assumes update_async does not use the query string
+			 * after returning. Confirm. */
+			if (operation) {
+				bulk_operation_merge_free (operation);
+			}
+		}
return TRUE;
} else {
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 9d0e09a..f5322e0 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -28,6 +28,11 @@
G_BEGIN_DECLS
+/* Selects which resources a bulk operation applies to, relative to the
+ * task's file. The values are bit flags and may be OR-ed together. */
+typedef enum {
+ /* Resources whose nie:url equals the file URI */
+ TRACKER_BULK_MATCH_EQUALS = 1 << 0,
+ /* Resources whose nie:url lies below the file URI */
+ TRACKER_BULK_MATCH_CHILDREN = 1 << 1
+} TrackerBulkMatchType;
+
typedef struct _TrackerProcessingTask TrackerProcessingTask;
typedef struct _TrackerProcessingPool TrackerProcessingPool;
@@ -48,6 +53,11 @@ void tracker_processing_task_set_sparql (TrackerProcess
void tracker_processing_task_set_sparql_string (TrackerProcessingTask *task,
gchar *sparql_string);
+/* API for bulk operations
+ *
+ * tracker_processing_task_set_bulk_operation:
+ * Makes @task carry a bulk operation instead of a full SPARQL update;
+ * any previous content is released. @sparql is stored by pointer, not
+ * copied, so it must remain valid and constant while the task is alive.
+ * Tasks sharing the same @sparql pointer are merged when the pool flushes.
+ * The pool appends a WHERE clause binding ?f to @sparql, so presumably
+ * @sparql is an update template referring to ?f (verify against callers).
+ * @match selects whether ?f is the file itself, its descendants, or both.
+ * A NULL @sparql leaves the task without content.
+ */
+void tracker_processing_task_set_bulk_operation (TrackerProcessingTask *task,
+ const gchar *sparql,
+ TrackerBulkMatchType match);
+
TrackerProcessingPool *tracker_processing_pool_new (TrackerMinerFS *miner,
guint limit_wait,
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]