[tracker/miner-fs-refactor-multi-insert: 1/16] libtracker-miner: New processing pool out of the TrackerMinerFS object



commit f802b884046568d7de1b3c79cc42e74179598ffa
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Thu Oct 14 16:17:03 2010 +0200

    libtracker-miner: New processing pool out of the TrackerMinerFS object

 src/libtracker-miner/Makefile.am                   |    2 +
 .../tracker-miner-fs-processing-pool.c             |  562 ++++++++++++++++++++
 .../tracker-miner-fs-processing-pool.h             |   85 +++
 3 files changed, 649 insertions(+), 0 deletions(-)
---
diff --git a/src/libtracker-miner/Makefile.am b/src/libtracker-miner/Makefile.am
index 2092292..595a155 100644
--- a/src/libtracker-miner/Makefile.am
+++ b/src/libtracker-miner/Makefile.am
@@ -33,6 +33,8 @@ libtracker_miner_ TRACKER_API_VERSION@_la_SOURCES = 	\
 	tracker-miner-dbus.h 				\
 	tracker-miner-object.c				\
 	tracker-miner-fs.c				\
+	tracker-miner-fs-processing-pool.h		\
+	tracker-miner-fs-processing-pool.c		\
 	tracker-miner-manager.c				\
 	tracker-miner-web.c				\
 	tracker-miner-web-dbus.h			\
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
new file mode 100644
index 0000000..5ccfd07
--- /dev/null
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -0,0 +1,562 @@
+/*
+ * Copyright (C) 2009, 2010 Nokia <ivan frade nokia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#include "config.h"
+#include "tracker-miner-fs-processing-pool.h"
+
+/* Maximum time (seconds) before forcing a sparql buffer flush */
+#define MAX_SPARQL_BUFFER_TIME  15
+
+/*------------------- PROCESSING TASK ----------------------*/
+
+typedef enum {
+	PROCESSING_TASK_STATUS_NO_POOL,
+	PROCESSING_TASK_STATUS_WAIT = 0,
+	PROCESSING_TASK_STATUS_PROCESS,
+	PROCESSING_TASK_STATUS_LAST
+} ProcessingTaskStatus;
+
+struct _ProcessingTask {
+	/* The file being processed */
+	GFile *file;
+	/* The FULL sparql to be updated in the store */
+	gchar *sparql;
+	/* The context of the task */
+	gpointer context;
+	/* The context deallocation method, if any */
+	GFreeFunc context_free_func;
+
+	/* Internal status of the task */
+	ProcessingTaskStatus status;
+	/* The pool where the task was added */
+	ProcessingPool *pool;
+
+	/* Handler and user_data to use when task is fully processed */
+	ProcessingPoolTaskFinishedCallback finished_handler;
+	gpointer                           finished_user_data;
+};
+
+ProcessingTask *
+processing_task_new (GFile *file)
+{
+	ProcessingTask *task;
+
+	task = g_slice_new0 (ProcessingTask);
+	task->file = g_object_ref (file);
+	task->status = PROCESSING_TASK_STATUS_NO_POOL;
+	return task;
+}
+
+void
+processing_task_free (ProcessingTask *task)
+{
+	if (!task)
+		return;
+
+	/* Free context if requested to do so */
+	if (task->context &&
+	    task->context_free_func) {
+		task->context_free_func (task->context);
+	}
+	g_free (task->sparql);
+	g_object_unref (task->file);
+	g_slice_free (ProcessingTask, task);
+}
+
+GFile *
+processing_task_get_file (ProcessingTask *task)
+{
+	return task->file;
+}
+
+gpointer
+processing_task_get_context (ProcessingTask *task)
+{
+	return task->context;
+}
+
+void
+processing_task_set_context (ProcessingTask *task,
+                             gpointer        context,
+                             GFreeFunc       context_free_func)
+{
+	/* Free previous context if any and if requested to do so */
+	if (task->context &&
+	    task->context_free_func) {
+		task->context_free_func (task->context);
+	}
+
+	task->context = context;
+	task->context_free_func = context_free_func;
+}
+
+void
+processing_task_set_sparql (ProcessingTask *task,
+                            gchar          *sparql)
+{
+	g_free (task->sparql);
+	task->sparql = g_strdup (sparql);
+}
+
+
+/*------------------- PROCESSING POOL ----------------------*/
+
+struct _ProcessingPool {
+	/* Connection to the Store */
+	TrackerSparqlConnection *connection;
+
+	/* The tasks currently in WAIT or PROCESS status */
+	GQueue *tasks[PROCESSING_TASK_STATUS_LAST];
+	/* The processing pool limits */
+	guint  limit[PROCESSING_TASK_STATUS_LAST];
+
+	/* SPARQL buffer to pile up several UPDATEs */
+	GPtrArray      *sparql_buffer;
+	GFile          *sparql_buffer_current_parent;
+	time_t          sparql_buffer_start_time;
+};
+
+static void
+pool_queue_free_foreach (gpointer data,
+                         gpointer user_data)
+{
+	processing_task_free (data);
+}
+
+void
+processing_pool_free (ProcessingPool *pool)
+{
+	guint i;
+
+	if (!pool)
+		return;
+
+	/* Free any pending task here... shouldn't really
+	 * be any */
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_LAST;
+	     i++) {
+		g_queue_foreach (pool->tasks[i],
+		                 pool_queue_free_foreach,
+		                 NULL);
+		g_queue_free (pool->tasks[i]);
+	}
+
+	if (pool->sparql_buffer_current_parent) {
+		g_object_unref (pool->sparql_buffer_current_parent);
+	}
+
+	if (pool->sparql_buffer) {
+		g_ptr_array_free (pool->sparql_buffer, TRUE);
+	}
+
+	g_object_unref (pool->connection);
+	g_free (pool);
+}
+
+ProcessingPool *
+processing_pool_new (TrackerSparqlConnection *connection,
+                     guint                    limit_wait,
+                     guint                    limit_process)
+{
+	ProcessingPool *pool;
+
+	pool = g_new0 (ProcessingPool, 1);
+
+	pool->connection = g_object_ref (connection);
+	pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit_wait;
+	pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit_process;
+
+	pool->tasks[PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
+	pool->tasks[PROCESSING_TASK_STATUS_PROCESS] = g_queue_new ();
+
+	g_debug ("Processing pool created with a limit of "
+	         "%u tasks in WAIT status and "
+	         "%u tasks in PROCESS status",
+	         limit_wait,
+	         limit_process);
+
+	return pool;
+}
+
+void
+processing_pool_set_wait_limit (ProcessingPool *pool,
+                                guint           limit)
+{
+	g_message ("Processing pool limit for WAIT tasks set to %u", limit);
+	pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit;
+}
+
+void
+processing_pool_set_process_limit (ProcessingPool *pool,
+                                   guint           limit)
+{
+	g_message ("Processing pool limit for PROCESS tasks set to %u", limit);
+	pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit;
+}
+
+guint
+processing_pool_get_wait_limit (ProcessingPool *pool)
+{
+	return pool->limit[PROCESSING_TASK_STATUS_WAIT];
+}
+
+guint
+processing_pool_get_process_limit (ProcessingPool *pool)
+{
+	return pool->limit[PROCESSING_TASK_STATUS_PROCESS];
+}
+
+gboolean
+processing_pool_wait_limit_reached (ProcessingPool *pool)
+{
+	return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_WAIT]) >=
+	         pool->limit[PROCESSING_TASK_STATUS_WAIT]) ?
+	        TRUE : FALSE);
+}
+
+gboolean
+processing_pool_process_limit_reached (ProcessingPool *pool)
+{
+	return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]) >=
+	         pool->limit[PROCESSING_TASK_STATUS_PROCESS]) ?
+	        TRUE : FALSE);
+}
+
+ProcessingTask *
+processing_pool_find_task (ProcessingPool *pool,
+                           GFile          *file,
+                           gboolean        path_search)
+{
+	guint i;
+
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_PROCESS;
+	     i++) {
+		GList *l;
+
+		for (l = pool->tasks[i]->head; l; l = g_list_next (l)) {
+			ProcessingTask *task = l->data;
+
+			if (!path_search) {
+				/* Different operations for the same file URI could be
+				 * piled up here, each being a different GFile object.
+				 * Miner implementations should really notify on the
+				 * same GFile object that's being passed, so we check for
+				 * pointer equality here, rather than doing path comparisons
+				 */
+				if(task->file == file)
+					return task;
+			} else {
+				/* Note that if there are different GFiles being
+				 * processed for the same file path, we are actually
+				 * returning the first one found, If you want exactly
+				 * the same GFile as the one as input, use the
+				 * process_data_find() method instead */
+				if (g_file_equal (task->file, file))
+					return task;
+			}
+		}
+	}
+
+	/* Not found... */
+	return NULL;
+}
+
+void
+processing_pool_wait_task (ProcessingPool *pool,
+                           ProcessingTask *task)
+{
+	g_assert (task->status == PROCESSING_TASK_STATUS_NO_POOL);
+
+	/* Set status of the task as WAIT */
+	task->status = PROCESSING_TASK_STATUS_WAIT;
+
+	/* Push a new task in WAIT status (so just add it to the tasks queue,
+	 * and don't process it. */
+	g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
+	task->pool = pool;
+}
+
+static void
+processing_pool_sparql_update_cb (GObject      *object,
+                                  GAsyncResult *result,
+                                  gpointer      user_data)
+{
+	ProcessingTask *task;
+	GError *error = NULL;
+
+	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+
+	/* If update was done when crawling finished, no task will be given */
+	if (!user_data)
+		return;
+
+	task = user_data;
+
+	/* Before calling user-provided callback, REMOVE the task from the pool;
+	 * as the user-provided callback may actually modify the pool again */
+	processing_pool_remove_task (task->pool, task);
+
+	/* Call finished handler with the error, if any */
+	task->finished_handler (task, task->finished_user_data, error);
+
+	/* Deallocate unneeded stuff */
+	processing_task_free (task);
+	g_clear_error (&error);
+}
+
+static void
+processing_pool_sparql_update_array_cb (GObject      *object,
+                                        GAsyncResult *result,
+                                        gpointer      user_data)
+{
+	GError *global_error = NULL;
+	GPtrArray *sparql_array_errors;
+	GPtrArray *sparql_array;
+	guint i;
+
+	/* Get arrays of errors and queries */
+	sparql_array = user_data;
+	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
+	                                                                     result,
+	                                                                     &global_error);
+	if (global_error) {
+		g_critical ("(Sparql buffer) Could not execute array-update with '%u' items: %s",
+		            sparql_array->len,
+		            global_error->message);
+	}
+
+	/* Report status on each task of the batch update */
+	for (i = 0; i < sparql_array->len; i++) {
+		ProcessingTask *task;
+
+		task = g_ptr_array_index (sparql_array, i);
+
+		/* Before calling user-provided callback, REMOVE the task from the pool;
+		 * as the user-provided callback may actually modify the pool again */
+		processing_pool_remove_task (task->pool, task);
+
+		/* Call finished handler with the error, if any */
+		task->finished_handler (task, task->finished_user_data,
+		                        (global_error ?
+		                         global_error :
+		                         g_ptr_array_index (sparql_array_errors, i)));
+
+		/* No need to deallocate the task here, it will be done when
+		 * unref-ing the GPtrArray below */
+	}
+
+	/* Unref the arrays of errors and queries */
+	if (sparql_array_errors)
+		g_ptr_array_unref (sparql_array_errors);
+	/* Note that tasks are actually deallocated here */
+	g_ptr_array_unref (sparql_array);
+	g_clear_error (&global_error);
+}
+
+void
+processing_pool_buffer_flush (ProcessingPool *pool)
+{
+	guint i;
+	GPtrArray *sparql_array;
+
+	if (!pool->sparql_buffer)
+		return;
+
+	/* Loop buffer and construct array of strings */
+	sparql_array = g_ptr_array_new ();
+	for (i = 0; i < pool->sparql_buffer->len; i++) {
+		ProcessingTask *task;
+
+		task = g_ptr_array_index (pool->sparql_buffer, i);
+		/* Add original string, not a duplicate */
+		g_ptr_array_add (sparql_array, task->sparql);
+	}
+
+	g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
+	         pool->sparql_buffer->len);
+	tracker_sparql_connection_update_array_async (pool->connection,
+	                                              (gchar **)(sparql_array->pdata),
+	                                              sparql_array->len,
+	                                              G_PRIORITY_DEFAULT,
+	                                              NULL,
+	                                              processing_pool_sparql_update_array_cb,
+	                                              pool->sparql_buffer);
+
+	/* Clear current parent */
+	if (pool->sparql_buffer_current_parent) {
+		g_object_unref (pool->sparql_buffer_current_parent);
+		pool->sparql_buffer_current_parent = NULL;
+	}
+
+	/* Clear temp buffer */
+	g_ptr_array_free (sparql_array, TRUE);
+	pool->sparql_buffer_start_time = 0;
+	/* Note the whole buffer is passed to the update_array callback,
+	 * so no need to free it. */
+	pool->sparql_buffer = NULL;
+}
+
+gboolean
+processing_pool_process_task (ProcessingPool                     *pool,
+                              ProcessingTask                     *task,
+                              gboolean                            buffer,
+                              ProcessingPoolTaskFinishedCallback  finished_handler,
+                              gpointer                            user_data)
+{
+	GList *previous;
+
+	/* The task MUST have a proper SPARQL here */
+	g_assert (task->sparql != NULL);
+
+	/* First, check if the task was already added as being WAITING */
+	previous = g_queue_find (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
+	if (!previous) {
+		/* Add it to the PROCESS queue */
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
+		task->pool = pool;
+	} else {
+		/* Make sure it was a WAIT task */
+		g_assert (task->status == PROCESSING_TASK_STATUS_WAIT);
+		/* Move task from WAIT queue to PROCESS queue */
+		g_queue_delete_link (pool->tasks[PROCESSING_TASK_STATUS_WAIT], previous);
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
+	}
+
+	/* Set status of the task as PROCESS */
+	task->status = PROCESSING_TASK_STATUS_PROCESS;
+
+	task->finished_handler = finished_handler;
+	task->finished_user_data = user_data;
+
+	/* If buffering not requested, flush previous buffer and then the new update */
+	if (!buffer) {
+		/* Flush previous */
+		processing_pool_buffer_flush (pool);
+		/* And update the new one */
+		tracker_sparql_connection_update_async (pool->connection,
+		                                        task->sparql,
+		                                        G_PRIORITY_DEFAULT,
+		                                        NULL,
+		                                        processing_pool_sparql_update_cb,
+		                                        task);
+
+		return TRUE;
+	} else {
+		GFile *parent;
+		gboolean flushed = FALSE;
+
+		/* Get parent of this file we're updating/creating */
+		parent = g_file_get_parent (task->file);
+
+		/* Start buffer if not already done */
+		if (!pool->sparql_buffer) {
+			pool->sparql_buffer = g_ptr_array_new_with_free_func ((GDestroyNotify)processing_task_free);
+			pool->sparql_buffer_start_time = time (NULL);
+		}
+
+		/* Set current parent if not set already */
+		if (!pool->sparql_buffer_current_parent && parent) {
+			pool->sparql_buffer_current_parent = g_object_ref (parent);
+		}
+
+		/* Add task to array */
+		g_ptr_array_add (pool->sparql_buffer, task);
+
+		/* Flush buffer if:
+		 *  - Last item has no parent
+		 *  - Parent change was detected
+		 *  - 'limit_process' items reached
+		 *  - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+		 */
+		if (!parent ||
+		    !g_file_equal (parent, pool->sparql_buffer_current_parent) ||
+		    processing_pool_process_limit_reached (pool) ||
+		    (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME)) {
+			/* Flush! */
+			processing_pool_buffer_flush (pool);
+			flushed = TRUE;
+		}
+
+		if (parent)
+			g_object_unref (parent);
+
+		return flushed;
+	}
+}
+
+void
+processing_pool_remove_task (ProcessingPool *pool,
+                             ProcessingTask *task)
+{
+	/* Remove from pool without freeing it */
+	GList *in_pool;
+
+	g_assert (pool == task->pool);
+
+	/* Make sure the task was in the pool */
+	in_pool = g_queue_find (pool->tasks[task->status], task);
+	g_assert (in_pool != NULL);
+
+	g_queue_delete_link (pool->tasks[task->status], in_pool);
+	task->pool = NULL;
+	task->status = PROCESSING_TASK_STATUS_NO_POOL;
+}
+
+guint
+processing_pool_get_wait_task_count (ProcessingPool *pool)
+{
+	return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_WAIT]);
+}
+
+guint
+processing_pool_get_process_task_count (ProcessingPool *pool)
+{
+	return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]);
+}
+
+ProcessingTask *
+processing_pool_get_last_wait (ProcessingPool *pool)
+{
+	GList *li;
+
+	for (li = pool->tasks[PROCESSING_TASK_STATUS_WAIT]->tail; li; li = g_list_previous (li)) {
+		ProcessingTask *task = li->data;
+
+		if (task->status == PROCESSING_TASK_STATUS_WAIT) {
+			return task;
+		}
+	}
+	return NULL;
+}
+
+void
+processing_pool_foreach (ProcessingPool *pool,
+                         GFunc           func,
+                         gpointer        user_data)
+{
+	guint i;
+
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_PROCESS;
+	     i++) {
+		g_queue_foreach (pool->tasks[i], func, user_data);
+	}
+}
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
new file mode 100644
index 0000000..28c7c8a
--- /dev/null
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2009, 2010 Nokia <ivan frade nokia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#ifndef __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__
+#define __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__
+
+#include <glib-object.h>
+#include <gio/gio.h>
+
+#include <libtracker-sparql/tracker-sparql.h>
+#include "tracker-miner-fs.h"
+
+G_BEGIN_DECLS
+
+
+typedef struct _ProcessingTask ProcessingTask;
+typedef struct _ProcessingPool ProcessingPool;
+typedef void  (* ProcessingPoolTaskFinishedCallback) (ProcessingTask *task,
+                                                      gpointer        user_data,
+                                                      const GError   *error);
+
+
+ProcessingTask *processing_task_new         (GFile          *file);
+void            processing_task_free        (ProcessingTask *task);
+GFile          *processing_task_get_file    (ProcessingTask *task);
+gpointer        processing_task_get_context (ProcessingTask *task);
+void            processing_task_set_context (ProcessingTask *task,
+                                             gpointer        context,
+                                             GFreeFunc       context_free_func);
+void            processing_task_set_sparql  (ProcessingTask *task,
+                                             gchar          *sparql);
+
+
+ProcessingPool *processing_pool_new                   (TrackerSparqlConnection *connection,
+                                                       guint                    limit_wait,
+                                                       guint                    limit_process);
+void            processing_pool_free                  (ProcessingPool          *pool);
+void            processing_pool_set_wait_limit        (ProcessingPool          *pool,
+                                                       guint                    limit);
+void            processing_pool_set_process_limit     (ProcessingPool          *pool,
+                                                       guint                    limit);
+guint           processing_pool_get_wait_limit        (ProcessingPool          *pool);
+guint           processing_pool_get_process_limit     (ProcessingPool          *pool);
+ProcessingTask *processing_pool_find_task             (ProcessingPool          *pool,
+                                                       GFile                   *file,
+                                                       gboolean                 path_search);
+gboolean        processing_pool_wait_limit_reached    (ProcessingPool          *pool);
+gboolean        processing_pool_process_limit_reached (ProcessingPool          *pool);
+
+void            processing_pool_remove_task           (ProcessingPool          *pool,
+                                                       ProcessingTask          *task);
+void            processing_pool_wait_task             (ProcessingPool          *pool,
+                                                       ProcessingTask          *task);
+gboolean        processing_pool_process_task          (ProcessingPool          *pool,
+                                                       ProcessingTask          *task,
+                                                       gboolean                 buffer,
+                                                       ProcessingPoolTaskFinishedCallback  finished_handler,
+                                                       gpointer                 user_data);
+guint           processing_pool_get_wait_task_count    (ProcessingPool         *pool);
+guint           processing_pool_get_process_task_count (ProcessingPool         *pool);
+ProcessingTask *processing_pool_get_last_wait          (ProcessingPool         *pool);
+void            processing_pool_foreach                (ProcessingPool         *pool,
+                                                        GFunc                   func,
+                                                        gpointer                user_data);
+void            processing_pool_buffer_flush           (ProcessingPool         *pool);
+
+G_END_DECLS
+
+#endif /* __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__ */



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]