[tracker/miner-fs-refactor] libtracker-miner: integrate multi-insert feature in the processing pool



commit a789ead91b134c21d14691911703ccaff481faad
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Thu Oct 14 15:24:47 2010 +0200

    libtracker-miner: integrate multi-insert feature in the processing pool

 .../tracker-miner-fs-processing-pool.c             |  180 ++++++++++++++++++--
 .../tracker-miner-fs-processing-pool.h             |    4 +-
 src/libtracker-miner/tracker-miner-fs.c            |   43 ++++--
 3 files changed, 200 insertions(+), 27 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 3a2543c..5ccfd07 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -20,6 +20,9 @@
 #include "config.h"
 #include "tracker-miner-fs-processing-pool.h"
 
+/* Maximum time (seconds) before forcing a sparql buffer flush */
+#define MAX_SPARQL_BUFFER_TIME  15
+
 /*------------------- PROCESSING TASK ----------------------*/
 
 typedef enum {
@@ -122,6 +125,11 @@ struct _ProcessingPool {
 	GQueue *tasks[PROCESSING_TASK_STATUS_LAST];
 	/* The processing pool limits */
 	guint  limit[PROCESSING_TASK_STATUS_LAST];
+
+	/* SPARQL buffer to pile up several UPDATEs */
+	GPtrArray      *sparql_buffer;
+	GFile          *sparql_buffer_current_parent;
+	time_t          sparql_buffer_start_time;
 };
 
 static void
@@ -150,6 +158,14 @@ processing_pool_free (ProcessingPool *pool)
 		g_queue_free (pool->tasks[i]);
 	}
 
+	if (pool->sparql_buffer_current_parent) {
+		g_object_unref (pool->sparql_buffer_current_parent);
+	}
+
+	if (pool->sparql_buffer) {
+		g_ptr_array_free (pool->sparql_buffer, TRUE);
+	}
+
 	g_object_unref (pool->connection);
 	g_free (pool);
 }
@@ -263,7 +279,7 @@ processing_pool_find_task (ProcessingPool *pool,
 	return NULL;
 }
 
-gboolean
+void
 processing_pool_wait_task (ProcessingPool *pool,
                            ProcessingTask *task)
 {
@@ -276,14 +292,12 @@ processing_pool_wait_task (ProcessingPool *pool,
 	 * and don't process it. */
 	g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
 	task->pool = pool;
-
-	return (!processing_pool_wait_limit_reached (pool));
 }
 
 static void
-sparql_update_cb (GObject      *object,
-                  GAsyncResult *result,
-                  gpointer      user_data)
+processing_pool_sparql_update_cb (GObject      *object,
+                                  GAsyncResult *result,
+                                  gpointer      user_data)
 {
 	ProcessingTask *task;
 	GError *error = NULL;
@@ -308,9 +322,102 @@ sparql_update_cb (GObject      *object,
 	g_clear_error (&error);
 }
 
+static void
+processing_pool_sparql_update_array_cb (GObject      *object,
+                                        GAsyncResult *result,
+                                        gpointer      user_data)
+{
+	GError *global_error = NULL;
+	GPtrArray *sparql_array_errors;
+	GPtrArray *sparql_array;
+	guint i;
+
+	/* Completion of a batch update: user_data is the array of ProcessingTask items sent in it; fetch the result */
+	sparql_array = user_data;
+	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
+	                                                                     result,
+	                                                                     &global_error);
+	if (global_error) {
+		g_critical ("(Sparql buffer) Could not execute array-update with '%u' items: %s",
+		            sparql_array->len,
+		            global_error->message);
+	}
+
+	/* Report the outcome to every task of the batch, in array order */
+	for (i = 0; i < sparql_array->len; i++) {
+		ProcessingTask *task;
+
+		task = g_ptr_array_index (sparql_array, i);
+
+		/* Take the task out of the pool BEFORE running the user-provided
+		 * callback, because that callback may itself modify the pool again */
+		processing_pool_remove_task (task->pool, task);
+
+		/* A batch-wide error, if set, goes to every task; otherwise the errors-array entry at this task's index */
+		task->finished_handler (task, task->finished_user_data,
+		                        (global_error ?
+		                         global_error :
+		                         g_ptr_array_index (sparql_array_errors, i)));
+
+		/* The task is not deallocated here: sparql_array still holds it, and
+		 * that happens when the GPtrArray is unref-ed below */
+	}
+
+	/* Drop the errors array; it is only unref-ed when finish() actually returned one */
+	if (sparql_array_errors)
+		g_ptr_array_unref (sparql_array_errors);
+	/* Unref the task array; the tasks themselves are deallocated through its free func when the array is freed */
+	g_ptr_array_unref (sparql_array);
+	g_clear_error (&global_error);
+}
+
+void
+processing_pool_buffer_flush (ProcessingPool *pool)
+{
+	guint i;
+	GPtrArray *sparql_array;
+
+	if (!pool->sparql_buffer)
+		return;
+
+	/* Flush: gather the SPARQL string of every buffered task into a temporary array, to send them as one array update */
+	sparql_array = g_ptr_array_new ();
+	for (i = 0; i < pool->sparql_buffer->len; i++) {
+		ProcessingTask *task;
+
+		task = g_ptr_array_index (pool->sparql_buffer, i);
+		/* Borrow the task's own string (no copy); sparql_array has no free func, so it never frees the string */
+		g_ptr_array_add (sparql_array, task->sparql);
+	}
+
+	g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
+	         pool->sparql_buffer->len);
+	tracker_sparql_connection_update_array_async (pool->connection,
+	                                              (gchar **)(sparql_array->pdata),
+	                                              sparql_array->len,
+	                                              G_PRIORITY_DEFAULT,
+	                                              NULL,
+	                                              processing_pool_sparql_update_array_cb,
+	                                              pool->sparql_buffer);
+
+	/* Drop our reference to the current parent and reset it to NULL */
+	if (pool->sparql_buffer_current_parent) {
+		g_object_unref (pool->sparql_buffer_current_parent);
+		pool->sparql_buffer_current_parent = NULL;
+	}
+
+	/* Free the temporary pointer array only. NOTE(review): assumes the async call above no longer reads pdata once it has returned - confirm */
+	g_ptr_array_free (sparql_array, TRUE);
+	pool->sparql_buffer_start_time = 0;
+	/* The task buffer itself was handed to the update_array callback as its
+	 * user_data, so it is not freed here; the pool just forgets it. */
+	pool->sparql_buffer = NULL;
+}
+
 gboolean
 processing_pool_process_task (ProcessingPool                     *pool,
                               ProcessingTask                     *task,
+                              gboolean                            buffer,
                               ProcessingPoolTaskFinishedCallback  finished_handler,
                               gpointer                            user_data)
 {
@@ -339,15 +446,60 @@ processing_pool_process_task (ProcessingPool                     *pool,
 	task->finished_handler = finished_handler;
 	task->finished_user_data = user_data;
 
-	/* Update in the store */
-	tracker_sparql_connection_update_async (pool->connection,
-	                                        task->sparql,
-	                                        G_PRIORITY_DEFAULT,
-	                                        NULL,
-	                                        sparql_update_cb,
-	                                        task);
+	/* If buffering not requested, flush previous buffer and then the new update */
+	if (!buffer) {
+		/* Flush previous */
+		processing_pool_buffer_flush (pool);
+		/* And update the new one */
+		tracker_sparql_connection_update_async (pool->connection,
+		                                        task->sparql,
+		                                        G_PRIORITY_DEFAULT,
+		                                        NULL,
+		                                        processing_pool_sparql_update_cb,
+		                                        task);
+
+		return TRUE;
+	} else {
+		GFile *parent;
+		gboolean flushed = FALSE;
+
+		/* Get parent of this file we're updating/creating */
+		parent = g_file_get_parent (task->file);
 
-	return (!processing_pool_process_limit_reached (pool));
+		/* Start buffer if not already done */
+		if (!pool->sparql_buffer) {
+			pool->sparql_buffer = g_ptr_array_new_with_free_func ((GDestroyNotify)processing_task_free);
+			pool->sparql_buffer_start_time = time (NULL);
+		}
+
+		/* Set current parent if not set already */
+		if (!pool->sparql_buffer_current_parent && parent) {
+			pool->sparql_buffer_current_parent = g_object_ref (parent);
+		}
+
+		/* Add task to array */
+		g_ptr_array_add (pool->sparql_buffer, task);
+
+		/* Flush buffer if:
+		 *  - Last item has no parent
+		 *  - Parent change was detected
+		 *  - 'limit_process' items reached
+		 *  - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+		 */
+		if (!parent ||
+		    !g_file_equal (parent, pool->sparql_buffer_current_parent) ||
+		    processing_pool_process_limit_reached (pool) ||
+		    (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME)) {
+			/* Flush! */
+			processing_pool_buffer_flush (pool);
+			flushed = TRUE;
+		}
+
+		if (parent)
+			g_object_unref (parent);
+
+		return flushed;
+	}
 }
 
 void
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index e2274ae..28c7c8a 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -65,10 +65,11 @@ gboolean        processing_pool_process_limit_reached (ProcessingPool          *
 
 void            processing_pool_remove_task           (ProcessingPool          *pool,
                                                        ProcessingTask          *task);
-gboolean        processing_pool_wait_task             (ProcessingPool          *pool,
+void            processing_pool_wait_task             (ProcessingPool          *pool,
                                                        ProcessingTask          *task);
 gboolean        processing_pool_process_task          (ProcessingPool          *pool,
                                                        ProcessingTask          *task,
+                                                       gboolean                 buffer,
                                                        ProcessingPoolTaskFinishedCallback  finished_handler,
                                                        gpointer                 user_data);
 guint           processing_pool_get_wait_task_count    (ProcessingPool         *pool);
@@ -77,6 +78,7 @@ ProcessingTask *processing_pool_get_last_wait          (ProcessingPool         *
 void            processing_pool_foreach                (ProcessingPool         *pool,
                                                         GFunc                   func,
                                                         gpointer                user_data);
+void            processing_pool_buffer_flush           (ProcessingPool         *pool);
 
 G_END_DECLS
 
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index f6c46e6..7aa0081 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1582,10 +1582,15 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		}
 
 		processing_task_set_sparql (task, full_sparql);
-		processing_pool_process_task (fs->private->processing_pool,
-		                              task,
-		                              processing_pool_task_finished_cb,
-		                              fs);
+		/* If process_task() returns FALSE, it means the actual db update was delayed,
+		 * and in this case we need to setup queue handlers again */
+		if (!processing_pool_process_task (fs->private->processing_pool,
+		                                   task,
+		                                   TRUE, /* buffer! */
+		                                   processing_pool_task_finished_cb,
+		                                   fs)) {
+			item_queue_handlers_set_up (fs);
+		}
 		g_free (full_sparql);
 	}
 
@@ -1726,10 +1731,16 @@ item_remove (TrackerMinerFS *fs,
 	/* Add new task to processing pool */
 	task = processing_task_new (file);
 	processing_task_set_sparql (task, sparql->str);
-	processing_pool_process_task (fs->private->processing_pool,
-	                              task,
-	                              processing_pool_task_finished_cb,
-	                              fs);
+	/* If process_task() returns FALSE, it means the actual db update was delayed,
+	 * and in this case we need to setup queue handlers again */
+	if (!processing_pool_process_task (fs->private->processing_pool,
+	                                   task,
+	                                   FALSE,
+	                                   processing_pool_task_finished_cb,
+	                                   fs)) {
+		item_queue_handlers_set_up (fs);
+	}
+
 
 	g_string_free (sparql, TRUE);
 	g_free (uri);
@@ -2041,10 +2052,15 @@ item_move (TrackerMinerFS *fs,
 	/* Add new task to processing pool */
 	task = processing_task_new (file);
 	processing_task_set_sparql (task, sparql->str);
-	processing_pool_process_task (fs->private->processing_pool,
-	                              task,
-	                              processing_pool_task_finished_cb,
-	                              fs);
+	/* If process_task() returns FALSE, it means the actual db update was delayed,
+	 * and in this case we need to setup queue handlers again */
+	if (!processing_pool_process_task (fs->private->processing_pool,
+	                                   task,
+	                                   FALSE,
+	                                   processing_pool_task_finished_cb,
+	                                   fs)) {
+		item_queue_handlers_set_up (fs);
+	}
 
 	g_free (uri);
 	g_free (source_uri);
@@ -2447,6 +2463,9 @@ item_queue_handlers_cb (gpointer user_data)
 			process_stop (fs);
 		}
 
+		/* Flush any possible pending update here */
+		processing_pool_buffer_flush (fs->private->processing_pool);
+
 		tracker_thumbnailer_send ();
 		/* No more files left to process */
 		keep_processing = FALSE;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]