[tracker/miner-fs-refactor] libtracker-miner: integrate multi-insert feature in the processing pool
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-refactor] libtracker-miner: integrate multi-insert feature in the processing pool
- Date: Thu, 14 Oct 2010 14:00:42 +0000 (UTC)
commit a789ead91b134c21d14691911703ccaff481faad
Author: Aleksander Morgado <aleksander lanedo com>
Date: Thu Oct 14 15:24:47 2010 +0200
libtracker-miner: integrate multi-insert feature in the processing pool
.../tracker-miner-fs-processing-pool.c | 180 ++++++++++++++++++--
.../tracker-miner-fs-processing-pool.h | 4 +-
src/libtracker-miner/tracker-miner-fs.c | 43 ++++--
3 files changed, 200 insertions(+), 27 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 3a2543c..5ccfd07 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -20,6 +20,9 @@
#include "config.h"
#include "tracker-miner-fs-processing-pool.h"
+/* Maximum time (seconds) before forcing a sparql buffer flush */
+#define MAX_SPARQL_BUFFER_TIME 15
+
/*------------------- PROCESSING TASK ----------------------*/
typedef enum {
@@ -122,6 +125,11 @@ struct _ProcessingPool {
GQueue *tasks[PROCESSING_TASK_STATUS_LAST];
/* The processing pool limits */
guint limit[PROCESSING_TASK_STATUS_LAST];
+
+ /* SPARQL buffer to pile up several UPDATEs */
+ GPtrArray *sparql_buffer;
+ GFile *sparql_buffer_current_parent;
+ time_t sparql_buffer_start_time;
};
static void
@@ -150,6 +158,14 @@ processing_pool_free (ProcessingPool *pool)
g_queue_free (pool->tasks[i]);
}
+ if (pool->sparql_buffer_current_parent) {
+ g_object_unref (pool->sparql_buffer_current_parent);
+ }
+
+ if (pool->sparql_buffer) {
+ g_ptr_array_free (pool->sparql_buffer, TRUE);
+ }
+
g_object_unref (pool->connection);
g_free (pool);
}
@@ -263,7 +279,7 @@ processing_pool_find_task (ProcessingPool *pool,
return NULL;
}
-gboolean
+void
processing_pool_wait_task (ProcessingPool *pool,
ProcessingTask *task)
{
@@ -276,14 +292,12 @@ processing_pool_wait_task (ProcessingPool *pool,
* and don't process it. */
g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
task->pool = pool;
-
- return (!processing_pool_wait_limit_reached (pool));
}
static void
-sparql_update_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+processing_pool_sparql_update_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
ProcessingTask *task;
GError *error = NULL;
@@ -308,9 +322,102 @@ sparql_update_cb (GObject *object,
g_clear_error (&error);
}
+static void
+processing_pool_sparql_update_array_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GError *global_error = NULL;
+ GPtrArray *sparql_array_errors;
+ GPtrArray *sparql_array;
+ guint i;
+
+ /* Get arrays of errors and queries */
+ sparql_array = user_data;
+ sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
+ result,
+ &global_error);
+ if (global_error) {
+ g_critical ("(Sparql buffer) Could not execute array-update with '%u' items: %s",
+ sparql_array->len,
+ global_error->message);
+ }
+
+ /* Report status on each task of the batch update */
+ for (i = 0; i < sparql_array->len; i++) {
+ ProcessingTask *task;
+
+ task = g_ptr_array_index (sparql_array, i);
+
+ /* Before calling user-provided callback, REMOVE the task from the pool;
+ * as the user-provided callback may actually modify the pool again */
+ processing_pool_remove_task (task->pool, task);
+
+ /* Call finished handler with the error, if any */
+ task->finished_handler (task, task->finished_user_data,
+ (global_error ?
+ global_error :
+ g_ptr_array_index (sparql_array_errors, i)));
+
+ /* No need to deallocate the task here, it will be done when
+ * unref-ing the GPtrArray below */
+ }
+
+ /* Unref the arrays of errors and queries */
+ if (sparql_array_errors)
+ g_ptr_array_unref (sparql_array_errors);
+ /* Note that tasks are actually deallocated here */
+ g_ptr_array_unref (sparql_array);
+ g_clear_error (&global_error);
+}
+
+void
+processing_pool_buffer_flush (ProcessingPool *pool)
+{
+ guint i;
+ GPtrArray *sparql_array;
+
+ if (!pool->sparql_buffer)
+ return;
+
+ /* Loop buffer and construct array of strings */
+ sparql_array = g_ptr_array_new ();
+ for (i = 0; i < pool->sparql_buffer->len; i++) {
+ ProcessingTask *task;
+
+ task = g_ptr_array_index (pool->sparql_buffer, i);
+ /* Add original string, not a duplicate */
+ g_ptr_array_add (sparql_array, task->sparql);
+ }
+
+ g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
+ pool->sparql_buffer->len);
+ tracker_sparql_connection_update_array_async (pool->connection,
+ (gchar **)(sparql_array->pdata),
+ sparql_array->len,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ processing_pool_sparql_update_array_cb,
+ pool->sparql_buffer);
+
+ /* Clear current parent */
+ if (pool->sparql_buffer_current_parent) {
+ g_object_unref (pool->sparql_buffer_current_parent);
+ pool->sparql_buffer_current_parent = NULL;
+ }
+
+ /* Clear temp buffer */
+ g_ptr_array_free (sparql_array, TRUE);
+ pool->sparql_buffer_start_time = 0;
+ /* Note the whole buffer is passed to the update_array callback,
+ * so no need to free it. */
+ pool->sparql_buffer = NULL;
+}
+
gboolean
processing_pool_process_task (ProcessingPool *pool,
ProcessingTask *task,
+ gboolean buffer,
ProcessingPoolTaskFinishedCallback finished_handler,
gpointer user_data)
{
@@ -339,15 +446,60 @@ processing_pool_process_task (ProcessingPool *pool,
task->finished_handler = finished_handler;
task->finished_user_data = user_data;
- /* Update in the store */
- tracker_sparql_connection_update_async (pool->connection,
- task->sparql,
- G_PRIORITY_DEFAULT,
- NULL,
- sparql_update_cb,
- task);
+ /* If buffering not requested, flush previous buffer and then the new update */
+ if (!buffer) {
+ /* Flush previous */
+ processing_pool_buffer_flush (pool);
+ /* And update the new one */
+ tracker_sparql_connection_update_async (pool->connection,
+ task->sparql,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ processing_pool_sparql_update_cb,
+ task);
+
+ return TRUE;
+ } else {
+ GFile *parent;
+ gboolean flushed = FALSE;
+
+ /* Get parent of this file we're updating/creating */
+ parent = g_file_get_parent (task->file);
- return (!processing_pool_process_limit_reached (pool));
+ /* Start buffer if not already done */
+ if (!pool->sparql_buffer) {
+ pool->sparql_buffer = g_ptr_array_new_with_free_func ((GDestroyNotify)processing_task_free);
+ pool->sparql_buffer_start_time = time (NULL);
+ }
+
+ /* Set current parent if not set already */
+ if (!pool->sparql_buffer_current_parent && parent) {
+ pool->sparql_buffer_current_parent = g_object_ref (parent);
+ }
+
+ /* Add task to array */
+ g_ptr_array_add (pool->sparql_buffer, task);
+
+ /* Flush buffer if:
+ * - Last item has no parent
+ * - Parent change was detected
+ * - 'limit_process' items reached
+ * - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+ */
+ if (!parent ||
+ !g_file_equal (parent, pool->sparql_buffer_current_parent) ||
+ processing_pool_process_limit_reached (pool) ||
+ (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME)) {
+ /* Flush! */
+ processing_pool_buffer_flush (pool);
+ flushed = TRUE;
+ }
+
+ if (parent)
+ g_object_unref (parent);
+
+ return flushed;
+ }
}
void
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index e2274ae..28c7c8a 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -65,10 +65,11 @@ gboolean processing_pool_process_limit_reached (ProcessingPool *
void processing_pool_remove_task (ProcessingPool *pool,
ProcessingTask *task);
-gboolean processing_pool_wait_task (ProcessingPool *pool,
+void processing_pool_wait_task (ProcessingPool *pool,
ProcessingTask *task);
gboolean processing_pool_process_task (ProcessingPool *pool,
ProcessingTask *task,
+ gboolean buffer,
ProcessingPoolTaskFinishedCallback finished_handler,
gpointer user_data);
guint processing_pool_get_wait_task_count (ProcessingPool *pool);
@@ -77,6 +78,7 @@ ProcessingTask *processing_pool_get_last_wait (ProcessingPool *
void processing_pool_foreach (ProcessingPool *pool,
GFunc func,
gpointer user_data);
+void processing_pool_buffer_flush (ProcessingPool *pool);
G_END_DECLS
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index f6c46e6..7aa0081 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1582,10 +1582,15 @@ item_add_or_update_cb (TrackerMinerFS *fs,
}
processing_task_set_sparql (task, full_sparql);
- processing_pool_process_task (fs->private->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs);
+ /* If process_task() returns FALSE, it means the actual db update was delayed,
+ * and in this case we need to setup queue handlers again */
+ if (!processing_pool_process_task (fs->private->processing_pool,
+ task,
+ TRUE, /* buffer! */
+ processing_pool_task_finished_cb,
+ fs)) {
+ item_queue_handlers_set_up (fs);
+ }
g_free (full_sparql);
}
@@ -1726,10 +1731,16 @@ item_remove (TrackerMinerFS *fs,
/* Add new task to processing pool */
task = processing_task_new (file);
processing_task_set_sparql (task, sparql->str);
- processing_pool_process_task (fs->private->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs);
+ /* If process_task() returns FALSE, it means the actual db update was delayed,
+ * and in this case we need to setup queue handlers again */
+ if (!processing_pool_process_task (fs->private->processing_pool,
+ task,
+ FALSE,
+ processing_pool_task_finished_cb,
+ fs)) {
+ item_queue_handlers_set_up (fs);
+ }
+
g_string_free (sparql, TRUE);
g_free (uri);
@@ -2041,10 +2052,15 @@ item_move (TrackerMinerFS *fs,
/* Add new task to processing pool */
task = processing_task_new (file);
processing_task_set_sparql (task, sparql->str);
- processing_pool_process_task (fs->private->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs);
+ /* If process_task() returns FALSE, it means the actual db update was delayed,
+ * and in this case we need to setup queue handlers again */
+ if (!processing_pool_process_task (fs->private->processing_pool,
+ task,
+ FALSE,
+ processing_pool_task_finished_cb,
+ fs)) {
+ item_queue_handlers_set_up (fs);
+ }
g_free (uri);
g_free (source_uri);
@@ -2447,6 +2463,9 @@ item_queue_handlers_cb (gpointer user_data)
process_stop (fs);
}
+ /* Flush any possible pending update here */
+ processing_pool_buffer_flush (fs->private->processing_pool);
+
tracker_thumbnailer_send ();
/* No more files left to process */
keep_processing = FALSE;
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]