[tracker/tracker-0.10] libtracker-miner: Add bulk operations to the MinerFS processing pool



commit d5771d725ecd48bee40f5f14e32f2439277cec07
Author: Carlos Garnacho <carlosg gnome org>
Date:   Mon Mar 7 18:47:57 2011 +0100

    libtracker-miner: Add bulk operations to the MinerFS processing pool
    
    These bulk operations are grouped when flushing the processing queue, so
    several operations are sent as a single SPARQL expression.

 .../tracker-miner-fs-processing-pool.c             |  299 +++++++++++++++++---
 .../tracker-miner-fs-processing-pool.h             |   10 +
 2 files changed, 263 insertions(+), 46 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index d71b2eb..e2628eb 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -129,12 +129,33 @@ typedef enum {
 	TRACKER_PROCESSING_TASK_STATUS_LAST
 } TrackerProcessingTaskStatus;
 
+typedef struct {
+	const gchar *bulk_operation; /* Not owned; tasks are grouped by this pointer */
+	GList *tasks;                /* Merged tasks; only the list itself is owned */
+	gchar *sparql;               /* Owned; built by bulk_operation_merge_finish() */
+} BulkOperationMerge;
+
+typedef enum {
+	CONTENT_NONE,           /* No SPARQL content set */
+	CONTENT_SPARQL_STRING,  /* data.string, owned by the task */
+	CONTENT_SPARQL_BUILDER, /* data.builder, the task holds a reference */
+	CONTENT_BULK_OPERATION  /* data.bulk, the operation string is not owned */
+} TaskContentType;
+
 struct _TrackerProcessingTask {
 	/* The file being processed */
 	GFile *file;
-	/* The FULL sparql to be updated in the store */
-	TrackerSparqlBuilder *sparql;
-	gchar *sparql_string;
+	/* The SPARQL content; @content tells which member of @data is valid */
+	TaskContentType content;
+
+	union {
+		TrackerSparqlBuilder *builder; /* CONTENT_SPARQL_BUILDER */
+		gchar *string;                 /* CONTENT_SPARQL_STRING */
+		struct {                       /* CONTENT_BULK_OPERATION */
+			const gchar *bulk_operation;
+			TrackerBulkMatchType match;
+		} bulk;
+	} data;
 
 	/* The context of the task */
 	gpointer context;
@@ -195,6 +216,20 @@ tracker_processing_task_new (GFile *file)
 	return task;
 }
 
+static void
+tracker_processing_task_data_unset (TrackerProcessingTask *task)
+{
+	if (task->content == CONTENT_SPARQL_STRING) {
+		g_free (task->data.string);
+	} else if (task->content == CONTENT_SPARQL_BUILDER) {
+		if (task->data.builder) {
+			g_object_unref (task->data.builder);
+		}
+	}
+	/* Bulk operation strings are not owned by the task: nothing to free */
+	task->content = CONTENT_NONE;
+}
+
 void
 tracker_processing_task_free (TrackerProcessingTask *task)
 {
@@ -210,10 +245,9 @@ tracker_processing_task_free (TrackerProcessingTask *task)
 	    task->context_free_func) {
 		task->context_free_func (task->context);
 	}
-	if (task->sparql) {
-		g_object_unref (task->sparql);
-	}
-	g_free (task->sparql_string);
+	/* Release the owned string or builder reference, if any */
+	tracker_processing_task_data_unset (task);
+
 	g_object_unref (task->file);
 	g_slice_free (TrackerProcessingTask, task);
 }
@@ -249,31 +283,41 @@ void
 tracker_processing_task_set_sparql (TrackerProcessingTask *task,
                                     TrackerSparqlBuilder  *sparql)
 {
-	if (task->sparql) {
-		g_object_unref (task->sparql);
-	}
-	if (task->sparql_string) {
-		g_free (task->sparql_string);
-		task->sparql_string = NULL;
+	tracker_processing_task_data_unset (task);
+	/* A NULL builder leaves the task with no content (CONTENT_NONE) */
+	if (sparql) {
+		task->data.builder = g_object_ref (sparql);
+		task->content = CONTENT_SPARQL_BUILDER;
 	}
-	task->sparql = g_object_ref (sparql);
 }
 
 void
 tracker_processing_task_set_sparql_string (TrackerProcessingTask *task,
                                            gchar                 *sparql_string)
 {
-	if (task->sparql) {
-		g_object_unref (task->sparql);
-		task->sparql = NULL;
-	}
-	if (task->sparql_string) {
-		g_free (task->sparql_string);
+	tracker_processing_task_data_unset (task);
+	/* A NULL string leaves the task with no content (CONTENT_NONE) */
+	if (sparql_string) {
+		/* We take ownership of the input string! */
+		task->data.string = sparql_string;
+		task->content = CONTENT_SPARQL_STRING;
 	}
-	/* We take ownership of the input string! */
-	task->sparql_string = sparql_string;
 }
 
+void
+tracker_processing_task_set_bulk_operation (TrackerProcessingTask *task,
+                                            const gchar           *sparql,
+                                            TrackerBulkMatchType   match)
+{
+	tracker_processing_task_data_unset (task);
+	/* @match selects EQUALS and/or CHILDREN matching of the task's file URI */
+	if (sparql) {
+		/* Not copied: must outlive the task; merging compares this pointer */
+		task->data.bulk.bulk_operation = sparql;
+		task->data.bulk.match = match;
+		task->content = CONTENT_BULK_OPERATION;
+	}
+}
 
 /*------------------- PROCESSING POOL ----------------------*/
 
@@ -569,10 +613,13 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 		tracker_processing_pool_remove_task (task->pool, task);
 
 		/* Call finished handler with the error, if any */
-		task->finished_handler (task, task->finished_user_data,
-		                        (global_error ?
-		                         global_error :
-		                         g_ptr_array_index (sparql_array_errors, i)));
+		task->finished_handler (task, task->finished_user_data, global_error);
+
+		/* FIXME: only the global error is reported. Bulk operations are
+		 * merged into single queries when flushing, so the entries of
+		 * sparql_array_errors no longer line up with the task list and
+		 * cannot be looked up by the task index anymore.
+		 */
 
 		/* No need to deallocate the task here, it will be done when
 		 * unref-ing the GPtrArray below */
@@ -586,18 +633,121 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 	g_clear_error (&global_error);
 }
 
+static void
+bulk_operation_merge_finish (BulkOperationMerge *merge)
+{
+	if (merge->sparql) {
+		g_free (merge->sparql);
+		merge->sparql = NULL;
+	}
+	/* Rebuild the query from the file URIs of every merged task */
+	if (merge->bulk_operation && merge->tasks) {
+		GString *equals_string = NULL, *children_string = NULL, *sparql;
+		guint n_equals = 0;
+		GList *l;
+
+		for (l = merge->tasks; l; l = l->next) {
+			TrackerProcessingTask *task = l->data;
+			gchar *uri;
+
+			uri = g_file_get_uri (task->file);
+
+			if (task->data.bulk.match & TRACKER_BULK_MATCH_EQUALS) {
+				if (!equals_string) {
+					equals_string = g_string_new ("");
+				} else {
+					g_string_append_c (equals_string, ',');
+				}
+
+				g_string_append_printf (equals_string, "\"%s\"", uri);
+				n_equals++;
+			}
+
+			if (task->data.bulk.match & TRACKER_BULK_MATCH_CHILDREN) {
+				if (!children_string) {
+					children_string = g_string_new ("");
+				} else {
+					g_string_append_c (children_string, ',');
+				}
+
+				g_string_append_printf (children_string, "\"%s\"", uri);
+			}
+
+			g_free (uri);
+		}
+		/* One statement for the EQUALS URIs, another for the CHILDREN URIs */
+		sparql = g_string_new ("");
+
+		if (equals_string) {
+			g_string_append (sparql, merge->bulk_operation);
+
+			if (n_equals == 1) {
+				g_string_append_printf (sparql,
+				                        " WHERE { "
+				                        "  ?f nie:url %s"
+				                        "} ",
+				                        equals_string->str);
+			} else {
+				g_string_append_printf (sparql,
+				                        " WHERE { "
+				                        "  ?f nie:url ?u ."
+				                        "  FILTER (?u IN (%s))"
+				                        "} ",
+				                        equals_string->str);
+			}
+
+			g_string_free (equals_string, TRUE);
+		}
+
+		if (children_string) {
+			g_string_append (sparql, merge->bulk_operation);
+			g_string_append_printf (sparql,
+			                        " WHERE { "
+			                        "  ?f nie:url ?u ."
+			                        "  FILTER (tracker:uri-is-descendant (%s, ?u))"
+			                        "} ",
+			                        children_string->str);
+			g_string_free (children_string, TRUE);
+		}
+
+		merge->sparql = g_string_free (sparql, FALSE);
+	}
+}
+/* Creates an empty merge for @bulk_operation; the string is not copied */
+static BulkOperationMerge *
+bulk_operation_merge_new (const gchar *bulk_operation)
+{
+	BulkOperationMerge *operation;
+
+	operation = g_slice_new0 (BulkOperationMerge);
+	operation->bulk_operation = bulk_operation;
+
+	return operation;
+}
+/* Frees the task list (not the tasks themselves) and the generated SPARQL */
+static void
+bulk_operation_merge_free (BulkOperationMerge *operation)
+{
+	g_list_free (operation->tasks);
+	g_free (operation->sparql);
+	g_slice_free (BulkOperationMerge, operation);
+}
+
 void
 tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
                                       const gchar           *reason)
 {
-	guint i;
-	gchar **sparql_array;
+	GPtrArray *bulk_ops;
+	GArray *sparql_array;
+	guint i, j;
 
 	if (!pool->sparql_buffer)
 		return;
 
 	/* Loop buffer and construct array of strings */
-	sparql_array = g_new (gchar *, pool->sparql_buffer->len);
+	sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
+	bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
+	/* Bulk operation tasks are merged per operation instead of added as-is */
 	for (i = 0; i < pool->sparql_buffer->len; i++) {
 		TrackerProcessingTask *task;
 
@@ -613,10 +763,44 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 		task->pool = pool;
 		g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING], task);
 
-		/* Add original string, not a duplicate */
-		sparql_array[i] = (task->sparql ?
-		                   (gchar *) tracker_sparql_builder_get_result (task->sparql) :
-		                   task->sparql_string);
+		if (task->content == CONTENT_SPARQL_STRING) {
+			g_array_append_val (sparql_array, task->data.string);
+		} else if (task->content == CONTENT_SPARQL_BUILDER) {
+			const gchar *str = tracker_sparql_builder_get_result (task->data.builder);
+			g_array_append_val (sparql_array, str);
+		} else if (task->content == CONTENT_BULK_OPERATION) {
+			BulkOperationMerge *bulk = NULL;
+			guint k;
+			/* Tasks are merged when they share the same bulk operation pointer */
+			for (k = 0; k < bulk_ops->len; k++) {
+				BulkOperationMerge *cur;
+
+				cur = g_ptr_array_index (bulk_ops, k);
+
+				if (cur->bulk_operation == task->data.bulk.bulk_operation) {
+					bulk = cur;
+					break;
+				}
+			}
+
+			if (!bulk) {
+				bulk = bulk_operation_merge_new (task->data.bulk.bulk_operation);
+				g_ptr_array_add (bulk_ops, bulk);
+			}
+
+			bulk->tasks = g_list_prepend (bulk->tasks, task);
+		}
+	}
+	/* Each merged operation is prepended, ahead of the per-task updates */
+	for (j = 0; j < bulk_ops->len; j++) {
+		BulkOperationMerge *bulk;
+
+		bulk = g_ptr_array_index (bulk_ops, j);
+		bulk_operation_merge_finish (bulk);
+
+		if (bulk->sparql) {
+			g_array_prepend_val (sparql_array, bulk->sparql);
+		}
 	}
 
 	trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
@@ -626,8 +810,8 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	       reason ? reason : "Unknown reason");
 
 	tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
-	                                              sparql_array,
-	                                              pool->sparql_buffer->len,
+	                                              (gchar **) sparql_array->data,
+	                                              sparql_array->len,
 	                                              G_PRIORITY_DEFAULT,
 	                                              NULL,
 	                                              tracker_processing_pool_sparql_update_array_cb,
@@ -640,7 +824,10 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	}
 
 	/* Clear temp buffer */
-	g_free (sparql_array);
+	g_array_free (sparql_array, TRUE);
+	/* Frees every merge, including the SPARQL generated for it */
+	g_ptr_array_free (bulk_ops, TRUE);
+
 	pool->sparql_buffer_start_time = 0;
 	/* Note the whole buffer is passed to the update_array callback,
 	 * so no need to free it. */
@@ -656,8 +843,8 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 {
 	GList *previous;
 
-	/* The task MUST have a proper SPARQL here */
-	g_assert (task->sparql != NULL || task->sparql_string != NULL);
+	/* The task MUST have some content (string, builder or bulk operation) here */
+	g_assert (task->content != CONTENT_NONE);
 
 	/* First, check if the task was already added as being WAITING */
 	previous = g_queue_find (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_WAIT], task);
@@ -677,6 +864,9 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 	/* If buffering not requested, OR the limit of READY tasks is actually 1,
 	 * flush previous buffer (if any) and then the new update */
 	if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
+		const gchar *sparql = NULL;
+		gchar *bulk_sparql = NULL;
+
 		trace ("(Processing Pool %s) Pushed READY/PROCESSING task %p for file '%s'",
 		       G_OBJECT_TYPE_NAME (pool->miner),
 		       task,
@@ -695,14 +885,40 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 		       task);
 
 		/* And update the new one */
-		tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
-		                                        (task->sparql ?
-		                                         tracker_sparql_builder_get_result (task->sparql) :
-		                                         task->sparql_string),
-		                                        G_PRIORITY_DEFAULT,
-		                                        NULL,
-		                                        tracker_processing_pool_sparql_update_cb,
-		                                        task);
+		if (task->content == CONTENT_SPARQL_STRING) {
+			sparql = task->data.string;
+		} else if (task->content == CONTENT_SPARQL_BUILDER) {
+			sparql = tracker_sparql_builder_get_result (task->data.builder);
+		} else if (task->content == CONTENT_BULK_OPERATION) {
+			BulkOperationMerge *operation;
+
+			operation = bulk_operation_merge_new (task->data.bulk.bulk_operation);
+			operation->tasks = g_list_prepend (NULL, task);
+			bulk_operation_merge_finish (operation);
+
+			/* Steal the generated SPARQL before freeing the merge:
+			 * bulk_operation_merge_free() releases merge->sparql, and
+			 * the string is still needed by the update call below. */
+			bulk_sparql = operation->sparql;
+			operation->sparql = NULL;
+			sparql = bulk_sparql;
+
+			bulk_operation_merge_free (operation);
+		}
+
+		if (sparql) {
+			tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
+			                                        sparql,
+			                                        G_PRIORITY_DEFAULT,
+			                                        NULL,
+			                                        tracker_processing_pool_sparql_update_cb,
+			                                        task);
+		}
+
+		/* Released right after the call, the same way the buffered
+		 * flush path frees its merged bulk strings once
+		 * update_array_async() has returned. */
+		g_free (bulk_sparql);
 
 		return TRUE;
 	} else {
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 9d0e09a..f5322e0 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -28,6 +28,11 @@
 
 G_BEGIN_DECLS
 
+typedef enum {
+	TRACKER_BULK_MATCH_EQUALS   = 1 << 0, /* Match the task's own file URI */
+	TRACKER_BULK_MATCH_CHILDREN = 1 << 1  /* Match URIs descending from it */
+} TrackerBulkMatchType;
+
 
 typedef struct _TrackerProcessingTask TrackerProcessingTask;
 typedef struct _TrackerProcessingPool TrackerProcessingPool;
@@ -48,6 +53,11 @@ void                   tracker_processing_task_set_sparql        (TrackerProcess
 void                   tracker_processing_task_set_sparql_string (TrackerProcessingTask *task,
                                                                   gchar                 *sparql_string);
 
+/* API for bulk operations: @sparql is stored by pointer, not copied */
+void                   tracker_processing_task_set_bulk_operation (TrackerProcessingTask *task,
+                                                                   const gchar           *sparql,
+                                                                   TrackerBulkMatchType   match);
+
 
 TrackerProcessingPool *tracker_processing_pool_new                   (TrackerMinerFS          *miner,
                                                                       guint                    limit_wait,



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]