[tracker/miner-fs-refactor-multi-insert: 6/14] libtracker-miner: When QUEUE_WAIT, force a processing pool buffer flush



commit 0179a7c45fba2120586d16e89037fd5a76a05f70
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Tue Oct 19 09:50:55 2010 +0200

    libtracker-miner: When QUEUE_WAIT, force a processing pool buffer flush

 .../tracker-miner-fs-processing-pool.c             |   28 +++++++++++++++++--
 src/libtracker-miner/tracker-miner-fs.c            |    7 +++++
 2 files changed, 32 insertions(+), 3 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 0caa713..88b71fb 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -115,6 +115,8 @@ typedef enum {
 struct _ProcessingTask {
 	/* The file being processed */
 	GFile *file;
+	/* File URI, useful for logs */
+	gchar *file_uri;
 	/* The FULL sparql to be updated in the store */
 	gchar *sparql;
 	/* The context of the task */
@@ -139,6 +141,7 @@ processing_task_new (GFile *file)
 
 	task = g_slice_new0 (ProcessingTask);
 	task->file = g_object_ref (file);
+	task->file_uri = g_file_get_uri (task->file);
 	task->status = PROCESSING_TASK_STATUS_NO_POOL;
 	return task;
 }
@@ -155,6 +158,7 @@ processing_task_free (ProcessingTask *task)
 		task->context_free_func (task->context);
 	}
 	g_free (task->sparql);
+	g_free (task->file_uri);
 	g_object_unref (task->file);
 	g_slice_free (ProcessingTask, task);
 }
@@ -374,6 +378,9 @@ processing_pool_wait_task (ProcessingPool *pool,
 	/* Set status of the task as WAIT */
 	task->status = PROCESSING_TASK_STATUS_WAIT;
 
+	g_debug ("(Processing Pool) Pushed WAIT task %p for file '%s'",
+	         task, task->file_uri);
+
 	/* Push a new task in WAIT status (so just add it to the tasks queue,
 	 * and don't process it. */
 	g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
@@ -396,6 +403,9 @@ processing_pool_sparql_update_cb (GObject      *object,
 
 	task = user_data;
 
+	g_debug ("(Processing Pool) Finished update of task %p for file '%s'",
+	         task, task->file_uri);
+
 	/* Before calling user-provided callback, REMOVE the task from the pool;
 	 * as the user-provided callback may actually modify the pool again */
 	processing_pool_remove_task (task->pool, task);
@@ -420,11 +430,16 @@ processing_pool_sparql_update_array_cb (GObject      *object,
 
 	/* Get arrays of errors and queries */
 	sparql_array = user_data;
+
+	g_debug ("(Processing Pool) Finished array-update of tasks %p",
+	         sparql_array);
+
 	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
 	                                                                     result,
 	                                                                     &global_error);
 	if (global_error) {
-		g_critical ("(Sparql buffer) Could not execute array-update with '%u' items: %s",
+		g_critical ("(Sparql buffer) Could not execute array-update of tasks %p with '%u' items: %s",
+		            sparql_array,
 		            sparql_array->len,
 		            global_error->message);
 	}
@@ -476,8 +491,9 @@ processing_pool_buffer_flush (ProcessingPool *pool)
 		g_ptr_array_add (sparql_array, task->sparql);
 	}
 
-	g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
-	         pool->sparql_buffer->len);
+	g_debug ("(Processing Pool) Flushing array-update of tasks %p with %u items",
+	         pool->sparql_buffer, pool->sparql_buffer->len);
+
 	tracker_sparql_connection_update_array_async (pool->connection,
 	                                              (gchar **)(sparql_array->pdata),
 	                                              sparql_array->len,
@@ -534,6 +550,9 @@ processing_pool_process_task (ProcessingPool                     *pool,
 
 	/* If buffering not requested, flush previous buffer and then the new update */
 	if (!buffer) {
+		g_debug ("(Processing Pool) Pushed PROCESS task %p for file '%s'",
+		         task, task->file_uri);
+
 		/* Flush previous */
 		processing_pool_buffer_flush (pool);
 		/* And update the new one */
@@ -563,6 +582,9 @@ processing_pool_process_task (ProcessingPool                     *pool,
 			pool->sparql_buffer_current_parent = g_object_ref (parent);
 		}
 
+		g_debug ("(Processing Pool) Pushed PROCESS task %p for file '%s' into array %p",
+		         task, task->file_uri, pool->sparql_buffer);
+
 		/* Add task to array */
 		g_ptr_array_add (pool->sparql_buffer, task);
 
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 7aa0081..b15632f 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -2390,6 +2390,13 @@ item_queue_handlers_cb (gpointer user_data)
 		 * the next directories batch.
 		 */
 		fs->private->item_queues_handler_id = 0;
+
+		/* We should flush the processing pool buffer here, because
+		 * if there was a previous task on the same file we want to
+		 * process now, we want it to get finished before we can go
+		 * on with the queues... */
+		processing_pool_buffer_flush (fs->private->processing_pool);
+
 		return FALSE;
 	}
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]