[tracker/miner-fs-refactor: 1/3] libtracker-miner: Added separate implementation of processing pool
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-refactor: 1/3] libtracker-miner: Added separate implementation of processing pool
- Date: Wed, 13 Oct 2010 17:15:29 +0000 (UTC)
commit 8a50f06598653129487ee8c212b600101f1f41ff
Author: Aleksander Morgado <aleksander lanedo com>
Date: Wed Oct 13 16:52:43 2010 +0200
libtracker-miner: Added separate implementation of processing pool
src/libtracker-miner/Makefile.am | 2 +
.../tracker-miner-fs-processing-pool.c | 344 ++++++++++++++++++++
.../tracker-miner-fs-processing-pool.h | 77 +++++
3 files changed, 423 insertions(+), 0 deletions(-)
---
diff --git a/src/libtracker-miner/Makefile.am b/src/libtracker-miner/Makefile.am
index 2092292..595a155 100644
--- a/src/libtracker-miner/Makefile.am
+++ b/src/libtracker-miner/Makefile.am
@@ -33,6 +33,8 @@ libtracker_miner_ TRACKER_API_VERSION@_la_SOURCES = \
tracker-miner-dbus.h \
tracker-miner-object.c \
tracker-miner-fs.c \
+ tracker-miner-fs-processing-pool.h \
+ tracker-miner-fs-processing-pool.c \
tracker-miner-manager.c \
tracker-miner-web.c \
tracker-miner-web-dbus.h \
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
new file mode 100644
index 0000000..ec18915
--- /dev/null
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -0,0 +1,344 @@
+/*
+ * Copyright (C) 2009, 2010 Nokia <ivan frade nokia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "config.h"
+#include "tracker-miner-fs-processing-pool.h"
+
+/*------------------- PROCESSING TASK ----------------------*/
+
+typedef enum { /* Where a task stands relative to a processing pool */
+ PROCESSING_TASK_STATUS_NO_POOL, /* Initial state set by processing_task_new() */
+ PROCESSING_TASK_STATUS_WAIT, /* Queued by processing_pool_wait_task(), store update not issued yet */
+ PROCESSING_TASK_STATUS_PROCESS, /* Store update issued by processing_pool_process_task() */
+} ProcessingTaskStatus;
+
+struct _ProcessingTask {
+ /* The file being processed; the task holds its own reference on it */
+ GFile *file;
+ /* The FULL sparql to be updated in the store; private copy owned by the task */
+ gchar *sparql;
+ /* The context of the task; opaque pointer supplied by the caller */
+ gpointer context;
+ /* The context deallocation method, if any; when NULL the context is never freed here */
+ GFreeFunc context_free_func;
+
+ /* Internal status of the task */
+ ProcessingTaskStatus status;
+ /* The pool where the task was added; NULL while the task is in no pool */
+ ProcessingPool *pool;
+
+ /* Handler and user_data to use when task is fully processed */
+ ProcessingPoolTaskFinishedCallback finished_handler;
+ gpointer finished_user_data;
+};
+
+/* Allocate a new task for @file.
+ *
+ * The task takes its own reference on @file and starts out in the
+ * NO_POOL status. Release it with processing_task_free().
+ */
+ProcessingTask *
+processing_task_new (GFile *file)
+{
+	ProcessingTask *new_task = g_slice_new0 (ProcessingTask);
+
+	new_task->status = PROCESSING_TASK_STATUS_NO_POOL;
+	new_task->file = g_object_ref (file);
+
+	return new_task;
+}
+
+/* Destroy @task together with everything it owns: the context (only
+ * when a free function was registered for it), the sparql copy and
+ * the reference on the file. Passing NULL is a no-op.
+ */
+void
+processing_task_free (ProcessingTask *task)
+{
+	if (task == NULL) {
+		return;
+	}
+
+	/* Without a registered free function the context is left untouched */
+	if (task->context_free_func != NULL && task->context != NULL) {
+		(* task->context_free_func) (task->context);
+	}
+
+	g_free (task->sparql);
+	g_object_unref (task->file);
+
+	g_slice_free (ProcessingTask, task);
+}
+
+/* Return the GFile @task was created for. No additional reference is
+ * taken; the pointer is the one stored in the task.
+ */
+GFile *
+processing_task_get_file (ProcessingTask *task)
+{
+	GFile *task_file = task->file;
+
+	return task_file;
+}
+
+/* Return the context pointer currently stored in @task (NULL if none
+ * was set). Ownership is not transferred.
+ */
+gpointer
+processing_task_get_context (ProcessingTask *task)
+{
+	gpointer current = task->context;
+
+	return current;
+}
+
+/* Replace the context stored in @task.
+ *
+ * @context:           new opaque context pointer (may be NULL)
+ * @context_free_func: function used later to release @context, or NULL
+ *                     if the task must never free it
+ *
+ * The previous context is released first, but only if it was set
+ * together with a free function.
+ */
+void
+processing_task_set_context (ProcessingTask *task,
+                             gpointer        context,
+                             GFreeFunc       context_free_func)
+{
+	GFreeFunc release_old = task->context_free_func;
+
+	if (release_old != NULL && task->context != NULL) {
+		release_old (task->context);
+	}
+
+	task->context_free_func = context_free_func;
+	task->context = context;
+}
+
+/* Store a private copy of @sparql in @task, replacing any previous one.
+ *
+ * @sparql may be NULL, which simply clears the stored string. The
+ * caller keeps ownership of the string it passes in.
+ */
+void
+processing_task_set_sparql (ProcessingTask *task,
+                            gchar          *sparql)
+{
+	gchar *copy;
+
+	/* Duplicate BEFORE releasing the old string: if the caller hands
+	 * back the task's own string, freeing first would make g_strdup()
+	 * read freed memory. */
+	copy = g_strdup (sparql);
+	g_free (task->sparql);
+	task->sparql = copy;
+}
+
+
+/*------------------- PROCESSING POOL ----------------------*/
+
+struct _ProcessingPool {
+ /* Connection to the Store; the pool holds its own reference on it */
+ TrackerSparqlConnection *connection;
+
+ /* The tasks currently in the pool, both in WAIT and in PROCESS status */
+ GQueue *tasks;
+ /* The processing pool limit: task count from which the pool reports itself as full */
+ guint limit;
+};
+
+/* GFunc adapter that lets g_queue_foreach() free each queued task;
+ * @user_data is unused.
+ */
+static void
+pool_queue_free_foreach (gpointer data,
+                         gpointer user_data)
+{
+	ProcessingTask *queued = data;
+
+	processing_task_free (queued);
+}
+
+/* Destroy @pool: free every task still queued in it, drop the
+ * reference on the store connection and release the pool itself.
+ * Passing NULL is a no-op.
+ */
+void
+processing_pool_free (ProcessingPool *pool)
+{
+	if (pool != NULL) {
+		GQueue *leftover = pool->tasks;
+
+		/* Normally the queue is already empty at this point; anything
+		 * left over is freed so it does not leak. */
+		g_queue_foreach (leftover, pool_queue_free_foreach, NULL);
+		g_queue_free (leftover);
+
+		g_object_unref (pool->connection);
+		g_free (pool);
+	}
+}
+
+/* Create an empty processing pool.
+ *
+ * @connection: store connection used for the SPARQL updates; the pool
+ *              takes its own reference on it
+ * @limit:      task count from which the pool reports itself as full
+ *
+ * Release the returned pool with processing_pool_free().
+ */
+ProcessingPool *
+processing_pool_new (TrackerSparqlConnection *connection,
+                     guint                    limit)
+{
+	ProcessingPool *new_pool = g_new0 (ProcessingPool, 1);
+
+	new_pool->tasks = g_queue_new ();
+	new_pool->limit = limit;
+	new_pool->connection = g_object_ref (connection);
+
+	g_debug ("Processing pool created with a limit of %u tasks", limit);
+
+	return new_pool;
+}
+
+/* Change the task limit of @pool and log the new value. Tasks already
+ * in the pool are not touched.
+ */
+void
+processing_pool_set_limit (ProcessingPool *pool,
+                           guint           limit)
+{
+	guint new_limit = limit;
+
+	g_message ("Processing pool limit is set to %u", new_limit);
+	pool->limit = new_limit;
+}
+
+/* Look up the task for @file in @pool, scanning from the queue head.
+ *
+ * @path_search: when FALSE, a task matches only if it holds the very
+ *               same GFile object (pointer equality). Several
+ *               operations for one URI may be queued as distinct GFile
+ *               objects, and miner implementations are expected to
+ *               notify with the object they were given.
+ *               When TRUE, tasks are compared with g_file_equal() and
+ *               the first match is returned, which may be a different
+ *               GFile object than @file.
+ *
+ * Returns the matching task (still owned by the pool) or NULL.
+ */
+ProcessingTask *
+processing_pool_find_task (ProcessingPool *pool,
+                           GFile          *file,
+                           gboolean        path_search)
+{
+	GList *node = pool->tasks->head;
+
+	while (node != NULL) {
+		ProcessingTask *candidate = node->data;
+		gboolean matches;
+
+		if (path_search) {
+			matches = g_file_equal (candidate->file, file);
+		} else {
+			matches = (candidate->file == file);
+		}
+
+		if (matches) {
+			return candidate;
+		}
+
+		node = node->next;
+	}
+
+	/* Nothing in the pool refers to this file */
+	return NULL;
+}
+
+/* Tell whether @pool already holds at least as many tasks as its
+ * limit. Returns TRUE when full, FALSE otherwise.
+ */
+gboolean
+processing_pool_limit_reached (ProcessingPool *pool)
+{
+	guint in_pool = g_queue_get_length (pool->tasks);
+
+	if (in_pool < pool->limit) {
+		return FALSE;
+	}
+
+	return TRUE;
+}
+
+/* Add @task to @pool in WAIT status: it is queued, but no store update
+ * is issued for it. The task must not belong to a pool yet.
+ *
+ * Returns TRUE while the pool still has room after the insertion,
+ * FALSE once its limit has been reached.
+ */
+gboolean
+processing_pool_wait_task (ProcessingPool *pool,
+                           ProcessingTask *task)
+{
+	gboolean pool_full;
+
+	g_assert (task->status == PROCESSING_TASK_STATUS_NO_POOL);
+
+	/* Bind the task to the pool and mark it as waiting */
+	task->pool = pool;
+	task->status = PROCESSING_TASK_STATUS_WAIT;
+	g_queue_push_head (pool->tasks, task);
+
+	pool_full = processing_pool_limit_reached (pool);
+
+	return !pool_full;
+}
+
+/* GAsyncReadyCallback for the store update started by
+ * processing_pool_process_task().
+ *
+ * @object:    the TrackerSparqlConnection the update ran on
+ * @result:    async result to finish
+ * @user_data: the ProcessingTask the update belongs to, or NULL
+ *
+ * Removes the task from its pool, reports the outcome through the
+ * task's finished handler (if one was set) and then frees the task.
+ */
+static void
+sparql_update_cb (GObject      *object,
+                  GAsyncResult *result,
+                  gpointer      user_data)
+{
+	ProcessingTask *task = user_data;
+	GError *error = NULL;
+
+	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+
+	/* If update was done when crawling finished, no task will be given.
+	 * There is nobody to report the error to, but it still has to be
+	 * released or it would leak. */
+	if (!task) {
+		g_clear_error (&error);
+		return;
+	}
+
+	/* Before calling user-provided callback, REMOVE the task from the pool;
+	 * as the user-provided callback may actually modify the pool again */
+	processing_pool_remove_task (task->pool, task);
+
+	/* Call finished handler with the error, if any. The handler is
+	 * optional: processing_pool_process_task() does not require one. */
+	if (task->finished_handler) {
+		task->finished_handler (task, task->finished_user_data, error);
+	}
+
+	/* Deallocate unneeded stuff */
+	processing_task_free (task);
+	g_clear_error (&error);
+}
+
+/* Send the SPARQL of @task to the store asynchronously.
+ *
+ * @pool:             pool the task is (or gets) attached to
+ * @task:             task to process; its sparql must already be set
+ * @finished_handler: called with the task once the update finishes
+ * @user_data:        passed through to @finished_handler
+ *
+ * A task previously added with processing_pool_wait_task() is promoted
+ * in place; any other task is pushed into the pool first. When the
+ * update completes the task is removed from the pool and freed.
+ *
+ * Returns TRUE while the pool still has room, FALSE once its limit has
+ * been reached.
+ */
+gboolean
+processing_pool_process_task (ProcessingPool                     *pool,
+                              ProcessingTask                     *task,
+                              ProcessingPoolTaskFinishedCallback  finished_handler,
+                              gpointer                            user_data)
+{
+	/* Without SPARQL there is nothing to send to the store */
+	g_assert (task->sparql != NULL);
+
+	if (g_queue_find (pool->tasks, task) != NULL) {
+		/* Already queued: only a WAIT task may be promoted */
+		g_assert (task->status == PROCESSING_TASK_STATUS_WAIT);
+	} else {
+		/* Not known to the pool yet, so attach it now */
+		g_queue_push_head (pool->tasks, task);
+		task->pool = pool;
+	}
+
+	task->status = PROCESSING_TASK_STATUS_PROCESS;
+	task->finished_user_data = user_data;
+	task->finished_handler = finished_handler;
+
+	/* Hand the update over to the store; sparql_update_cb() finishes it */
+	tracker_sparql_connection_update_async (pool->connection,
+	                                        task->sparql,
+	                                        G_PRIORITY_DEFAULT,
+	                                        NULL,
+	                                        sparql_update_cb,
+	                                        task);
+
+	return !processing_pool_limit_reached (pool);
+}
+
+/* Detach @task from @pool without freeing it.
+ *
+ * The task must currently belong to @pool; both conditions are
+ * asserted. After the call the task is back in the NO_POOL state, so
+ * it can be freed by the caller or added to a pool again.
+ */
+void
+processing_pool_remove_task (ProcessingPool *pool,
+                             ProcessingTask *task)
+{
+	GList *in_pool;
+
+	g_assert (pool == task->pool);
+
+	/* Make sure the task was in the pool */
+	in_pool = g_queue_find (pool->tasks, task);
+	g_assert (in_pool != NULL);
+
+	g_queue_delete_link (pool->tasks, in_pool);
+	task->pool = NULL;
+
+	/* Reset the status as well: processing_pool_wait_task() asserts
+	 * NO_POOL, so a stale WAIT/PROCESS status would abort on re-add. */
+	task->status = PROCESSING_TASK_STATUS_NO_POOL;
+}
+
+/* Return how many tasks are currently in @pool, whatever their status. */
+guint
+processing_pool_get_task_count (ProcessingPool *pool)
+{
+	guint n_tasks = g_queue_get_length (pool->tasks);
+
+	return n_tasks;
+}
+
+/* Return the WAIT task closest to the tail of the queue, or NULL if no
+ * task in @pool is waiting. Tasks are pushed at the head, so the tail
+ * end holds the ones that were added first.
+ */
+ProcessingTask *
+processing_pool_get_last_wait (ProcessingPool *pool)
+{
+	GList *node = pool->tasks->tail;
+
+	while (node != NULL) {
+		ProcessingTask *candidate = node->data;
+
+		if (candidate->status == PROCESSING_TASK_STATUS_WAIT) {
+			return candidate;
+		}
+
+		node = node->prev;
+	}
+
+	return NULL;
+}
+
+/* Call @func (task, @user_data) for every task in @pool, from the head
+ * of the queue to its tail.
+ */
+void
+processing_pool_foreach (ProcessingPool *pool,
+                         GFunc           func,
+                         gpointer        user_data)
+{
+	GQueue *queued = pool->tasks;
+
+	g_queue_foreach (queued, func, user_data);
+}
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
new file mode 100644
index 0000000..979ab96
--- /dev/null
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2009, 2010 Nokia <ivan frade nokia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__
+#define __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__
+
+#include <glib-object.h>
+#include <gio/gio.h>
+
+#include <libtracker-sparql/tracker-sparql.h>
+#include "tracker-miner-fs.h"
+
+G_BEGIN_DECLS
+
+
+typedef struct _ProcessingTask ProcessingTask; /* Opaque; layout is private to tracker-miner-fs-processing-pool.c */
+typedef struct _ProcessingPool ProcessingPool; /* Opaque; layout is private to tracker-miner-fs-processing-pool.c */
+typedef void (* ProcessingPoolTaskFinishedCallback) (ProcessingTask *task, /* Finished task; already out of the pool, freed after the callback returns */
+ gpointer user_data, /* user_data given to processing_pool_process_task() */
+ const GError *error); /* Error from the store update, NULL if none was reported */
+
+
+ProcessingTask *processing_task_new (GFile *file); /* Takes its own reference on file */
+void processing_task_free (ProcessingTask *task); /* NULL is a no-op */
+GFile *processing_task_get_file (ProcessingTask *task); /* No new reference is taken */
+gpointer processing_task_get_context (ProcessingTask *task); /* NULL if no context was set */
+void processing_task_set_context (ProcessingTask *task,
+ gpointer context,
+ GFreeFunc context_free_func); /* Used to free context; NULL means it is never freed by the task */
+void processing_task_set_sparql (ProcessingTask *task,
+ gchar *sparql); /* Copied; the caller keeps ownership */
+
+
+ProcessingPool *processing_pool_new (TrackerSparqlConnection *connection, /* Pool takes its own reference */
+ guint limit);
+void processing_pool_free (ProcessingPool *pool); /* Also frees tasks still queued; NULL is a no-op */
+void processing_pool_set_limit (ProcessingPool *pool,
+ guint limit);
+ProcessingTask *processing_pool_find_task (ProcessingPool *pool, /* Returns NULL when no task matches */
+ GFile *file,
+ gboolean path_search); /* TRUE: compare with g_file_equal(); FALSE: same GFile pointer only */
+gboolean processing_pool_limit_reached (ProcessingPool *pool); /* TRUE when task count >= limit */
+void processing_pool_remove_task (ProcessingPool *pool, /* Detaches the task without freeing it */
+ ProcessingTask *task);
+gboolean processing_pool_wait_task (ProcessingPool *pool, /* Queues task in WAIT status; FALSE once the limit is reached */
+ ProcessingTask *task);
+gboolean processing_pool_process_task (ProcessingPool *pool, /* Issues the async store update; FALSE once the limit is reached */
+ ProcessingTask *task,
+ ProcessingPoolTaskFinishedCallback finished_handler, /* Called when the update has finished */
+ gpointer user_data);
+
+guint processing_pool_get_task_count (ProcessingPool *pool); /* Number of tasks in the pool, any status */
+ProcessingTask *processing_pool_get_last_wait (ProcessingPool *pool); /* WAIT task closest to the queue tail, or NULL */
+
+void processing_pool_foreach (ProcessingPool *pool, /* Calls func (task, user_data) on every task */
+ GFunc func,
+ gpointer user_data);
+
+G_END_DECLS
+
+#endif /* __LIBTRACKER_MINER_MINER_FS_PROCESSING_POOL_H__ */
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]