[tracker/miner-fs-refactor-multi-insert] libtracker-miner: When QUEUE_WAIT, force a processing pool buffer flush
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-refactor-multi-insert] libtracker-miner: When QUEUE_WAIT, force a processing pool buffer flush
- Date: Tue, 19 Oct 2010 07:52:30 +0000 (UTC)
commit e56de525ecb1b9888dac3f8ab38bdfb9b8dceff7
Author: Aleksander Morgado <aleksander lanedo com>
Date: Tue Oct 19 09:50:55 2010 +0200
libtracker-miner: When QUEUE_WAIT, force a processing pool buffer flush
.../tracker-miner-fs-processing-pool.c | 28 +++++++++++++++++--
src/libtracker-miner/tracker-miner-fs.c | 7 +++++
2 files changed, 32 insertions(+), 3 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 0caa713..88b71fb 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -115,6 +115,8 @@ typedef enum {
struct _ProcessingTask {
/* The file being processed */
GFile *file;
+ /* File URI, useful for logs */
+ gchar *file_uri;
/* The FULL sparql to be updated in the store */
gchar *sparql;
/* The context of the task */
@@ -139,6 +141,7 @@ processing_task_new (GFile *file)
task = g_slice_new0 (ProcessingTask);
task->file = g_object_ref (file);
+ task->file_uri = g_file_get_uri (task->file);
task->status = PROCESSING_TASK_STATUS_NO_POOL;
return task;
}
@@ -155,6 +158,7 @@ processing_task_free (ProcessingTask *task)
task->context_free_func (task->context);
}
g_free (task->sparql);
+ g_free (task->file_uri);
g_object_unref (task->file);
g_slice_free (ProcessingTask, task);
}
@@ -374,6 +378,9 @@ processing_pool_wait_task (ProcessingPool *pool,
/* Set status of the task as WAIT */
task->status = PROCESSING_TASK_STATUS_WAIT;
+ g_debug ("(Processing Pool) Pushed WAIT task %p for file '%s'",
+ task, task->file_uri);
+
/* Push a new task in WAIT status (so just add it to the tasks queue,
* and don't process it. */
g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
@@ -396,6 +403,9 @@ processing_pool_sparql_update_cb (GObject *object,
task = user_data;
+ g_debug ("(Processing Pool) Finished update of task %p for file '%s'",
+ task, task->file_uri);
+
/* Before calling user-provided callback, REMOVE the task from the pool;
* as the user-provided callback may actually modify the pool again */
processing_pool_remove_task (task->pool, task);
@@ -420,11 +430,16 @@ processing_pool_sparql_update_array_cb (GObject *object,
/* Get arrays of errors and queries */
sparql_array = user_data;
+
+ g_debug ("(Processing Pool) Finished array-update of tasks %p",
+ sparql_array);
+
sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
result,
&global_error);
if (global_error) {
- g_critical ("(Sparql buffer) Could not execute array-update with '%u' items: %s",
+ g_critical ("(Sparql buffer) Could not execute array-update of tasks %p with '%u' items: %s",
+ sparql_array,
sparql_array->len,
global_error->message);
}
@@ -476,8 +491,9 @@ processing_pool_buffer_flush (ProcessingPool *pool)
g_ptr_array_add (sparql_array, task->sparql);
}
- g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
- pool->sparql_buffer->len);
+ g_debug ("(Processing Pool) Flushing array-update of tasks %p with %u items",
+ pool->sparql_buffer, pool->sparql_buffer->len);
+
tracker_sparql_connection_update_array_async (pool->connection,
(gchar **)(sparql_array->pdata),
sparql_array->len,
@@ -534,6 +550,9 @@ processing_pool_process_task (ProcessingPool *pool,
/* If buffering not requested, flush previous buffer and then the new update */
if (!buffer) {
+ g_debug ("(Processing Pool) Pushed PROCESS task %p for file '%s'",
+ task, task->file_uri);
+
/* Flush previous */
processing_pool_buffer_flush (pool);
/* And update the new one */
@@ -563,6 +582,9 @@ processing_pool_process_task (ProcessingPool *pool,
pool->sparql_buffer_current_parent = g_object_ref (parent);
}
+ g_debug ("(Processing Pool) Pushed PROCESS task %p for file '%s' into array %p",
+ task, task->file_uri, pool->sparql_buffer);
+
/* Add task to array */
g_ptr_array_add (pool->sparql_buffer, task);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 7aa0081..b15632f 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -2390,6 +2390,13 @@ item_queue_handlers_cb (gpointer user_data)
* the next directories batch.
*/
fs->private->item_queues_handler_id = 0;
+
+ /* We should flush the processing pool buffer here, because
+ * if there was a previous task on the same file we want to
+ * process now, we want it to get finished before we can go
+ * on with the queues... */
+ processing_pool_buffer_flush (fs->private->processing_pool);
+
return FALSE;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]