[tracker/miner-fs-refactor: 1/3] libtracker-miner: Added separate implementation of processing pool



commit 8a50f06598653129487ee8c212b600101f1f41ff
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Wed Oct 13 16:52:43 2010 +0200

    libtracker-miner: Added separate implementation of processing pool

 src/libtracker-miner/Makefile.am                   |    2 +
 .../tracker-miner-fs-processing-pool.c             |  344 ++++++++++++++++++++
 .../tracker-miner-fs-processing-pool.h             |   77 +++++
 3 files changed, 423 insertions(+), 0 deletions(-)
---
diff --git a/src/libtracker-miner/Makefile.am b/src/libtracker-miner/Makefile.am
index 2092292..595a155 100644
--- a/src/libtracker-miner/Makefile.am
+++ b/src/libtracker-miner/Makefile.am
@@ -33,6 +33,8 @@ libtracker_miner_ TRACKER_API_VERSION@_la_SOURCES = 	\
 	tracker-miner-dbus.h 				\
 	tracker-miner-object.c				\
 	tracker-miner-fs.c				\
+	tracker-miner-fs-processing-pool.h		\
+	tracker-miner-fs-processing-pool.c		\
 	tracker-miner-manager.c				\
 	tracker-miner-web.c				\
 	tracker-miner-web-dbus.h			\
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
new file mode 100644
index 0000000..ec18915
--- /dev/null
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -0,0 +1,344 @@
+/*
+ * Copyright (C) 2009, 2010 Nokia <ivan frade nokia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#include "config.h"
+#include "tracker-miner-fs-processing-pool.h"
+
+/*------------------- PROCESSING TASK ----------------------*/
+
+typedef enum {
+	PROCESSING_TASK_STATUS_NO_POOL, /* Not in any pool (status of a new task) */
+	PROCESSING_TASK_STATUS_WAIT,    /* Queued in a pool, store update not issued yet */
+	PROCESSING_TASK_STATUS_PROCESS, /* Queued in a pool, store update issued */
+} ProcessingTaskStatus;
+
+struct _ProcessingTask {
+	/* The file being processed (the task holds its own reference) */
+	GFile *file;
+	/* The FULL sparql to be updated in the store (private copy, NULL until set) */
+	gchar *sparql;
+	/* The context of the task */
+	gpointer context;
+	/* The context deallocation method, if any */
+	GFreeFunc context_free_func;
+
+	/* Internal status of the task */
+	ProcessingTaskStatus status;
+	/* The pool where the task was added, NULL while not in any pool */
+	ProcessingPool *pool;
+
+	/* Handler and user_data to use when task is fully processed */
+	ProcessingPoolTaskFinishedCallback finished_handler;
+	gpointer                           finished_user_data;
+};
+
+ProcessingTask *
+processing_task_new (GFile *file)
+{
+	/* Zero-filled allocation: no sparql, no context and no pool yet */
+	ProcessingTask *new_task = g_slice_new0 (ProcessingTask);
+
+	new_task->status = PROCESSING_TASK_STATUS_NO_POOL;
+	new_task->file = g_object_ref (file); /* the task keeps its own reference */
+	return new_task;
+}
+
+void
+processing_task_free (ProcessingTask *task)
+{
+	/* Freeing a NULL task is a no-op */
+	if (task == NULL) {
+		return;
+	}
+
+	/* Release the context only when a free function was provided for it */
+	if (task->context != NULL && task->context_free_func != NULL)
+		task->context_free_func (task->context);
+	g_free (task->sparql);
+	g_object_unref (task->file);
+	g_slice_free (ProcessingTask, task);
+}
+
+GFile *
+processing_task_get_file (ProcessingTask *task)
+{
+	return task->file; /* borrowed pointer: no extra reference is taken here */
+}
+
+gpointer
+processing_task_get_context (ProcessingTask *task)
+{
+	return task->context; /* NULL if no context was set; not copied */
+}
+
+void
+processing_task_set_context (ProcessingTask *task,
+                             gpointer        context,
+                             GFreeFunc       context_free_func)
+{
+	gpointer  previous = task->context;
+	GFreeFunc previous_free = task->context_free_func;
+
+	/* Dispose of the old context, but only if it came with a free function */
+	if (previous != NULL && previous_free != NULL)
+		previous_free (previous);
+	task->context = context;
+	task->context_free_func = context_free_func;
+}
+
+void
+processing_task_set_sparql (ProcessingTask *task,
+                            gchar          *sparql)
+{
+	g_free (task->sparql); /* drop any previously set query */
+	task->sparql = g_strdup (sparql); /* the task keeps its own copy */
+}
+
+
+/*------------------- PROCESSING POOL ----------------------*/
+
+struct _ProcessingPool {
+	/* Connection to the Store (the pool holds its own reference) */
+	TrackerSparqlConnection *connection;
+
+	/* All the tasks in the pool, both WAIT and PROCESS ones */
+	GQueue *tasks;
+	/* Number of queued tasks at which the pool counts as full */
+	guint  limit;
+};
+
+static void
+pool_queue_free_foreach (gpointer data,
+                         gpointer user_data)
+{
+	processing_task_free (data); /* data is a queued ProcessingTask; user_data is unused */
+}
+
+void
+processing_pool_free (ProcessingPool *pool)
+{
+	/* Freeing a NULL pool is a no-op */
+	if (pool == NULL) {
+		return;
+	}
+
+	/* Normally no task is left at this point; free any that remain
+	 * before releasing the queue itself */
+	g_queue_foreach (pool->tasks, pool_queue_free_foreach, NULL);
+	g_queue_free (pool->tasks);
+
+	g_object_unref (pool->connection);
+	g_free (pool);
+}
+
+ProcessingPool *
+processing_pool_new (TrackerSparqlConnection *connection,
+                     guint                    limit)
+{
+	/* Zero-filled allocation, then fill in every member */
+	ProcessingPool *new_pool = g_new0 (ProcessingPool, 1);
+
+	new_pool->tasks = g_queue_new ();
+	new_pool->limit = limit;
+	new_pool->connection = g_object_ref (connection);
+
+	g_debug ("Processing pool created with a limit of %u tasks",
+	         limit);
+
+	return new_pool;
+}
+
+void
+processing_pool_set_limit (ProcessingPool *pool,
+                           guint           limit)
+{
+	g_message ("Processing pool limit is set to %u", limit);
+	pool->limit = limit; /* the queue itself is not touched, whatever its length */
+}
+
+ProcessingTask *
+processing_pool_find_task (ProcessingPool *pool,
+                           GFile          *file,
+                           gboolean        path_search)
+{
+	GList *link;
+
+	/* Walk the queue from head to tail; the first match wins.
+	 *
+	 * Without path_search, a task only matches when it holds the very
+	 * same GFile object that was passed in. Several operations for the
+	 * same URI may be queued, each with its own GFile, and miner
+	 * implementations are expected to notify using the GFile they were
+	 * given, so pointer comparison picks out that exact operation.
+	 *
+	 * With path_search, any task whose GFile refers to the same location
+	 * matches, so when several tasks share a path the one nearest the
+	 * head of the queue is returned. Pass path_search as FALSE to look
+	 * up one specific GFile instead.
+	 */
+	for (link = pool->tasks->head; link != NULL; link = link->next) {
+		ProcessingTask *candidate = link->data;
+		gboolean matches;
+
+		matches = path_search ? g_file_equal (candidate->file, file) :
+		                        (candidate->file == file);
+		if (matches)
+			return candidate;
+	}
+
+	/* No task in the pool matches the given file */
+	return NULL;
+}
+
+gboolean
+processing_pool_limit_reached (ProcessingPool *pool)
+{
+	/* Full once the queue (WAIT and PROCESS tasks alike) hits the limit */
+	return g_queue_get_length (pool->tasks) >= pool->limit;
+}
+
+gboolean
+processing_pool_wait_task (ProcessingPool *pool,
+                           ProcessingTask *task)
+{
+	/* Only tasks that are not in any pool yet may be queued */
+	g_assert (task->status == PROCESSING_TASK_STATUS_NO_POOL);
+
+	/* Queue the task without processing it: it just sits in the
+	 * pool, flagged as WAIT, and counts towards the limit */
+	task->pool = pool;
+	task->status = PROCESSING_TASK_STATUS_WAIT;
+	g_queue_push_head (pool->tasks, task);
+
+	/* TRUE while there is still room for more tasks */
+	return !processing_pool_limit_reached (pool);
+}
+
+static void
+sparql_update_cb (GObject      *object,
+                  GAsyncResult *result,
+                  gpointer      user_data)
+{
+	ProcessingTask *task = user_data;
+	GError *error = NULL;
+
+	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+
+	/* Updates issued when crawling finished carry no task: just drop the error */
+	if (!task) {
+		g_clear_error (&error);
+		return;
+	}
+
+	/* Before calling user-provided callback, REMOVE the task from the pool;
+	 * as the user-provided callback may actually modify the pool again */
+	processing_pool_remove_task (task->pool, task);
+
+	/* Call finished handler with the error, if any */
+	task->finished_handler (task, task->finished_user_data, error);
+
+	/* Deallocate unneeded stuff */
+	processing_task_free (task);
+	g_clear_error (&error);
+}
+
+gboolean
+processing_pool_process_task (ProcessingPool                     *pool,
+                              ProcessingTask                     *task,
+                              ProcessingPoolTaskFinishedCallback  finished_handler,
+                              gpointer                            user_data)
+{
+	/* Sends the task's SPARQL to the store, adding the task to the
+	 * pool first if needed. A task without SPARQL cannot be pushed
+	 * to the store, so that is a programming error */
+	g_assert (task->sparql != NULL);
+
+	if (g_queue_find (pool->tasks, task) != NULL) {
+		/* Already in this pool: it must have been queued as WAIT */
+		g_assert (task->status == PROCESSING_TASK_STATUS_WAIT);
+	} else {
+		/* Not queued before: add it to the pool now */
+		task->pool = pool;
+		g_queue_push_head (pool->tasks, task);
+	}
+
+	/* From here on the task counts as being processed; remember who
+	 * to notify once the store has answered */
+	task->finished_user_data = user_data;
+	task->finished_handler = finished_handler;
+	task->status = PROCESSING_TASK_STATUS_PROCESS;
+
+	/* Issue the asynchronous update; sparql_update_cb() takes the task
+	 * out of the pool, runs the finished handler and frees the task */
+	tracker_sparql_connection_update_async (pool->connection,
+	                                        task->sparql,
+	                                        G_PRIORITY_DEFAULT,
+	                                        NULL,
+	                                        sparql_update_cb,
+	                                        task);
+
+	/* TRUE while there is still room for more tasks */
+	return !processing_pool_limit_reached (pool);
+}
+
+void
+processing_pool_remove_task (ProcessingPool *pool,
+                             ProcessingTask *task)
+{
+	/* Remove from pool without freeing it */
+	GList *in_pool;
+
+	g_assert (pool == task->pool);
+	/* Make sure the task was in the pool */
+	in_pool = g_queue_find (pool->tasks, task);
+	g_assert (in_pool != NULL);
+
+	g_queue_delete_link (pool->tasks, in_pool);
+	task->pool = NULL;
+	task->status = PROCESSING_TASK_STATUS_NO_POOL; /* so the task may be queued again */
+}
+
+guint
+processing_pool_get_task_count (ProcessingPool *pool)
+{
+	return g_queue_get_length (pool->tasks); /* counts WAIT and PROCESS tasks alike */
+}
+
+ProcessingTask *
+processing_pool_get_last_wait  (ProcessingPool *pool)
+{
+	GList *link = pool->tasks->tail;
+
+	/* Scan from the tail backwards: tasks are pushed at the head, so
+	 * the first WAIT task found this way is the oldest one queued */
+	while (link != NULL) {
+		if (((ProcessingTask *) link->data)->status == PROCESSING_TASK_STATUS_WAIT)
+			return link->data;
+		link = link->prev;
+	}
+	return NULL;
+}
+
+void
+processing_pool_foreach (ProcessingPool *pool,
+                         GFunc           func,
+                         gpointer        user_data)
+{
+	g_queue_foreach (pool->tasks, func, user_data); /* func is called with each queued ProcessingTask */
+}
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
new file mode 100644
index 0000000..979ab96
--- /dev/null
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2009, 2010 Nokia <ivan frade nokia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#ifndef __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__
+#define __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__
+
+#include <glib-object.h>
+#include <gio/gio.h>
+
+#include <libtracker-sparql/tracker-sparql.h>
+#include "tracker-miner-fs.h"
+
+G_BEGIN_DECLS
+
+
+typedef struct _ProcessingTask ProcessingTask;
+typedef struct _ProcessingPool ProcessingPool;
+typedef void  (* ProcessingPoolTaskFinishedCallback) (ProcessingTask *task,
+                                                      gpointer        user_data,
+                                                      const GError   *error); /* NULL when the update succeeded */
+
+
+ProcessingTask *processing_task_new         (GFile          *file);
+void            processing_task_free        (ProcessingTask *task);
+GFile          *processing_task_get_file    (ProcessingTask *task);
+gpointer        processing_task_get_context (ProcessingTask *task);
+void            processing_task_set_context (ProcessingTask *task,
+                                             gpointer        context,
+                                             GFreeFunc       context_free_func);
+void            processing_task_set_sparql  (ProcessingTask *task,
+                                             gchar          *sparql);
+
+
+ProcessingPool *processing_pool_new           (TrackerSparqlConnection *connection,
+                                               guint                    limit);
+void            processing_pool_free          (ProcessingPool          *pool);
+void            processing_pool_set_limit     (ProcessingPool          *pool,
+                                               guint                    limit);
+ProcessingTask *processing_pool_find_task     (ProcessingPool          *pool,
+                                               GFile                   *file,
+                                               gboolean                 path_search);
+gboolean        processing_pool_limit_reached (ProcessingPool          *pool);
+void            processing_pool_remove_task   (ProcessingPool          *pool,
+                                               ProcessingTask          *task);
+gboolean        processing_pool_wait_task     (ProcessingPool          *pool,
+                                               ProcessingTask          *task);
+gboolean        processing_pool_process_task  (ProcessingPool          *pool,
+                                               ProcessingTask          *task,
+                                               ProcessingPoolTaskFinishedCallback  finished_handler,
+                                               gpointer                 user_data);
+
+guint           processing_pool_get_task_count (ProcessingPool *pool);
+ProcessingTask *processing_pool_get_last_wait  (ProcessingPool          *pool);
+
+void            processing_pool_foreach        (ProcessingPool *pool,
+                                                GFunc           func,
+                                                gpointer        user_data);
+
+G_END_DECLS
+
+#endif /* __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__ */



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]