[tracker/tracker-store] Introduce TrackerStore, a queue for updating



commit a3d8a58019659273071cfb3a6311377fbd946068
Author: Philip Van Hoof <philip codeminded be>
Date:   Thu May 21 10:40:37 2009 +0200

    Introduce TrackerStore, a queue for updating
    
    TrackerStoreQueue is or will be the internal API for most data access and
    storage. This implementation uses the GMainLoop and a GQueue for all batch
    requests.
    
    Due to its use of the GMainLoop the API is going to block the mainloop, but it
    wont block the caller's code. It will instead find a nice slot during the
    GMainLoop the execute itself on. When finished it'll execute first your
    callback and then your GDestroyNotify. All parameters that you receive in the
    callback that aren't user_data must not be freed (they are to be considered
    read-only in your callback).
---
 src/tracker-store/Makefile.am         |    4 +-
 src/tracker-store/tracker-main.c      |    3 +
 src/tracker-store/tracker-resources.c |  102 ++++++-----
 src/tracker-store/tracker-store.c     |  328 +++++++++++++++++++++++++++++++++
 src/tracker-store/tracker-store.h     |   60 ++++++
 5 files changed, 448 insertions(+), 49 deletions(-)

diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am
index 3435d76..10e2c7a 100644
--- a/src/tracker-store/Makefile.am
+++ b/src/tracker-store/Makefile.am
@@ -51,7 +51,9 @@ tracker_store_SOURCES =							\
 	tracker-push-registrar.c					\
 	tracker-push-registrar.h					\
 	tracker-resource-class.c					\
-	tracker-resource-class.h
+	tracker-resource-class.h					\
+	tracker-store.c							\
+	tracker-store.h
 
 if OS_WIN32
 tracker_store_win_libs = -lws2_32 -lkernel32
diff --git a/src/tracker-store/tracker-main.c b/src/tracker-store/tracker-main.c
index fe885da..5c23125 100644
--- a/src/tracker-store/tracker-main.c
+++ b/src/tracker-store/tracker-main.c
@@ -72,6 +72,7 @@
 #include "tracker-volume-cleanup.h"
 #include "tracker-backup.h"
 #include "tracker-daemon.h"
+#include "tracker-store.h"
 
 #ifdef G_OS_WIN32
 #include <windows.h>
@@ -898,6 +899,7 @@ main (gint argc, gchar *argv[])
 			  NULL);
 #endif /* HAVE_HAL */
 
+	tracker_store_init ();
 	tracker_status_init (config, hal_power);
 
 	tracker_module_config_init ();
@@ -1009,6 +1011,7 @@ main (gint argc, gchar *argv[])
 	tracker_nfs_lock_shutdown ();
 	tracker_status_shutdown ();
 	tracker_turtle_shutdown ();
+	tracker_store_shutdown ();
 	tracker_thumbnailer_shutdown ();
 	tracker_log_shutdown ();
 
diff --git a/src/tracker-store/tracker-resources.c b/src/tracker-store/tracker-resources.c
index 700ddcc..dcb4367 100644
--- a/src/tracker-store/tracker-resources.c
+++ b/src/tracker-store/tracker-resources.c
@@ -42,12 +42,11 @@
 #include "tracker-resources.h"
 #include "tracker-resource-class.h"
 #include "tracker-events.h"
+#include "tracker-store.h"
 
 #define RDF_PREFIX TRACKER_RDF_PREFIX
 #define RDF_TYPE RDF_PREFIX "type"
 
-/* Transaction every 'x' batch items */
-#define TRACKER_STORE_TRANSACTION_MAX	4000
 
 G_DEFINE_TYPE(TrackerResources, tracker_resources, G_TYPE_OBJECT)
 
@@ -55,11 +54,14 @@ G_DEFINE_TYPE(TrackerResources, tracker_resources, G_TYPE_OBJECT)
 
 
 typedef struct {
-	gboolean    batch_mode;
-	gint        batch_count;
 	GSList     *event_sources;
 } TrackerResourcesPrivate;
 
+typedef struct {
+	DBusGMethodInvocation *context;
+	guint request_id;
+} TrackerDBusMethodInfo;
+
 static void
 free_event_sources (TrackerResourcesPrivate *priv)
 {
@@ -112,6 +114,13 @@ tracker_resources_new (void)
  * Functions
  */
 
+static void
+destroy_method_info (gpointer user_data)
+{
+	g_slice_free (TrackerDBusMethodInfo, user_data);
+}
+
+
 void
 tracker_resources_insert (TrackerResources	     *self,
 			  const gchar                *subject,
@@ -133,7 +142,7 @@ tracker_resources_insert (TrackerResources	     *self,
 				  "'%s' '%s' '%s'",
 				  subject, predicate, object);
 
-	tracker_data_insert_statement (subject, predicate, object);
+	tracker_store_insert_statement (subject, predicate, object);
 
 	dbus_g_method_return (context);
 
@@ -161,7 +170,7 @@ tracker_resources_delete (TrackerResources	     *self,
 				  "'%s' '%s' '%s'",
 				  subject, predicate, object);
 
-	tracker_data_delete_statement (subject, predicate, object);
+	tracker_store_delete_statement (subject, predicate, object);
 
 	dbus_g_method_return (context);
 
@@ -230,7 +239,7 @@ tracker_resources_sparql_query (TrackerResources	 *self,
 				  "query:'%s'",
 				  query);
 
-	result_set = tracker_data_query_sparql (query, &actual_error);
+	result_set = tracker_store_sparql_query (query, &actual_error);
 
 	if (actual_error) {
 		tracker_dbus_request_failed (request_id,
@@ -270,19 +279,12 @@ tracker_resources_sparql_update (TrackerResources	 *self,
 
 	tracker_dbus_async_return_if_fail (update != NULL, context);
 
-	if (priv->batch_mode) {
-		/* commit pending batch items */
-		tracker_data_commit_transaction ();
-		priv->batch_mode = FALSE;
-		priv->batch_count = 0;
-	}
-
 	tracker_dbus_request_new (request_id,
 				  "DBus request for SPARQL Update, "
 				  "update:'%s'",
 				  update);
 
-	tracker_data_update_sparql (update, &actual_error);
+	tracker_store_sparql_update (update, &actual_error);
 
 	if (actual_error) {
 		tracker_dbus_request_failed (request_id,
@@ -298,14 +300,32 @@ tracker_resources_sparql_update (TrackerResources	 *self,
 	tracker_dbus_request_success (request_id);
 }
 
+static void
+batch_update_callback (GError *error, gpointer user_data)
+{
+	TrackerDBusMethodInfo *info = user_data;
+
+	if (error) {
+		tracker_dbus_request_failed (info->request_id,
+					     &error,
+					     NULL);
+		dbus_g_method_return_error (info->context, error);
+		return;
+	}
+
+	dbus_g_method_return (info->context);
+
+	tracker_dbus_request_success (info->request_id);
+}
+
 void
 tracker_resources_batch_sparql_update (TrackerResources          *self,
 				       const gchar	         *update,
 				       DBusGMethodInvocation	 *context,
 				       GError			**error)
 {
+	TrackerDBusMethodInfo   *info;
 	TrackerResourcesPrivate *priv;
-	GError 		     *actual_error = NULL;
 	guint		      request_id;
 
 	priv = TRACKER_RESOURCES_GET_PRIVATE (self);
@@ -319,35 +339,23 @@ tracker_resources_batch_sparql_update (TrackerResources          *self,
 				  "update:'%s'",
 				  update);
 
-	if (!priv->batch_mode) {
-		/* switch to batch mode
-		   delays database commits to improve performance */
-		priv->batch_mode = TRUE;
-		priv->batch_count = 0;
-		tracker_data_begin_transaction ();
-	}
+	info = g_slice_new (TrackerDBusMethodInfo);
 
-	tracker_data_update_sparql (update, &actual_error);
+	info->request_id = request_id;
+	info->context = context;
 
-	if (actual_error) {
-		tracker_dbus_request_failed (request_id,
-					     &actual_error,
-					     NULL);
-		dbus_g_method_return_error (context, actual_error);
-		g_error_free (actual_error);
-		return;
-	}
+	tracker_store_queue_sparql_update (update, batch_update_callback,
+	                                   info, destroy_method_info);
 
-	if (++priv->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
-		/* commit pending batch items */
-		tracker_data_commit_transaction ();
-		priv->batch_mode = FALSE;
-		priv->batch_count = 0;
-	}
+}
 
-	dbus_g_method_return (context);
+static void
+batch_commit_callback (gpointer user_data)
+{
+	TrackerDBusMethodInfo *info = user_data;
 
-	tracker_dbus_request_success (request_id);
+	dbus_g_method_return (info->context);
+	tracker_dbus_request_success (info->request_id);
 }
 
 void
@@ -355,6 +363,7 @@ tracker_resources_batch_commit (TrackerResources	 *self,
 				DBusGMethodInvocation	 *context,
 				GError			**error)
 {
+	TrackerDBusMethodInfo *info;
 	TrackerResourcesPrivate *priv;
 	guint		      request_id;
 
@@ -365,16 +374,13 @@ tracker_resources_batch_commit (TrackerResources	 *self,
 	tracker_dbus_request_new (request_id,
 				  "DBus request for batch commit");
 
-	if (priv->batch_mode) {
-		/* commit pending batch items */
-		tracker_data_commit_transaction ();
-		priv->batch_mode = FALSE;
-		priv->batch_count = 0;
-	}
+	info = g_slice_new (TrackerDBusMethodInfo);
 
-	dbus_g_method_return (context);
+	info->request_id = request_id;
+	info->context = context;
 
-	tracker_dbus_request_success (request_id);
+	tracker_store_queue_commit (batch_commit_callback, info,
+	                            destroy_method_info);
 }
 
 
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
new file mode 100644
index 0000000..8295284
--- /dev/null
+++ b/src/tracker-store/tracker-store.c
@@ -0,0 +1,328 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2009, Nokia
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ *
+ * Author: Philip Van Hoof <philip codeminded be>
+ */
+
+#include "config.h"
+
+#include <libtracker-common/tracker-dbus.h>
+#include <libtracker-db/tracker-db-dbus.h>
+
+#include <libtracker-data/tracker-data-update.h>
+#include <libtracker-data/tracker-data-query.h>
+
+#include "tracker-store.h"
+
+#define TRACKER_STORE_TRANSACTION_MAX	4000
+
+typedef struct {
+	gboolean  have_handler;
+	gboolean  batch_mode;
+	guint     batch_count;
+	GQueue   *queue;
+} TrackerStorePrivate;
+
+typedef enum {
+	TRACKER_STORE_TASK_TYPE_UPDATE = 0,
+	TRACKER_STORE_TASK_TYPE_COMMIT = 1
+} TrackerStoreTaskType;
+
+typedef struct {
+	TrackerStoreTaskType  type;
+	union {
+	  gchar                   *query;
+	} data;
+	gpointer                   user_data;
+	GDestroyNotify             destroy;
+	union {
+		TrackerStoreSparqlUpdateCallback update_callback;
+		TrackerStoreCommitCallback       commit_callback;
+	} callback;
+} TrackerStoreTask;
+
+static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
+
+static void
+private_free (gpointer data)
+{
+	TrackerStorePrivate *private = data;
+
+	g_queue_free (private->queue);
+	g_free (private);
+}
+
+static void
+tracker_store_task_free (TrackerStoreTask *task)
+{
+	g_free (task->data.query);
+	g_slice_free (TrackerStoreTask, task);
+}
+
+
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+	TrackerStorePrivate *private = user_data;
+	TrackerStoreTask    *task;
+	GError              *error = NULL;
+
+	task = g_queue_pop_head (private->queue);
+
+	if (!task) {
+		return FALSE;
+	}
+
+	/* Implicit transaction start */
+
+	if (!private->batch_mode && task->type != TRACKER_STORE_TASK_TYPE_COMMIT) {
+		/* switch to batch mode
+		   delays database commits to improve performance */
+		tracker_data_begin_transaction ();
+		private->batch_mode = TRUE;
+		private->batch_count = 0;
+	}
+
+	switch (task->type) {
+		case TRACKER_STORE_TASK_TYPE_COMMIT:
+			if (private->batch_mode) {
+				/* commit pending batch items */
+				tracker_data_commit_transaction ();
+				private->batch_mode = FALSE;
+				private->batch_count = 0;
+			}
+
+			if (task->callback.commit_callback) {
+				task->callback.commit_callback (task->user_data);
+			}
+			break;
+
+		case TRACKER_STORE_TASK_TYPE_UPDATE:
+			tracker_data_update_sparql (task->data.query, &error);
+			if (task->callback.update_callback) {
+				task->callback.update_callback (error, task->user_data);
+			}
+
+			if (!error) {
+				private->batch_count++;
+			}
+			break;
+	}
+
+	if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+		/* commit pending batch items */
+		tracker_data_commit_transaction ();
+		private->batch_mode = FALSE;
+		private->batch_count = 0;
+	}
+
+	if (task->destroy) {
+		task->destroy (task->user_data);
+	}
+
+	if (error) {
+		g_clear_error (&error);
+	}
+
+	tracker_store_task_free (task);
+
+	return TRUE;
+}
+
+static void
+queue_idle_destroy (gpointer user_data)
+{
+	TrackerStorePrivate *private = user_data;
+
+	private->have_handler = FALSE;
+}
+
+void
+tracker_store_init (void)
+{
+	TrackerStorePrivate *private;
+
+	private = g_new0 (TrackerStorePrivate, 1);
+
+	private->queue = g_queue_new ();
+
+	g_static_private_set (&private_key,
+	                      private,
+	                      private_free);
+}
+
+void
+tracker_store_shutdown (void)
+{
+	TrackerStorePrivate *private;
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	if (private->have_handler) {
+		g_debug ("Can't exit until store-queue is finished ...");
+		while (private->have_handler) {
+			g_main_context_iteration (NULL, TRUE);
+		}
+		g_debug ("Store-queue finished");
+	}
+
+	g_static_private_set (&private_key, NULL, NULL);
+}
+
+static void
+start_handler (TrackerStorePrivate *private)
+{
+	private->have_handler = TRUE;
+
+	g_idle_add_full (G_PRIORITY_DEFAULT,
+	                 queue_idle_handler,
+	                 private,
+	                 queue_idle_destroy);
+}
+
+void
+tracker_store_queue_commit (TrackerStoreCommitCallback callback,
+                            gpointer user_data,
+                            GDestroyNotify destroy)
+{
+	TrackerStorePrivate *private;
+	TrackerStoreTask    *task;
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	task = g_slice_new0 (TrackerStoreTask);
+	task->type = TRACKER_STORE_TASK_TYPE_COMMIT;
+	task->user_data = user_data;
+	task->callback.commit_callback = callback;
+	task->destroy = destroy;
+
+	g_queue_push_tail (private->queue, task);
+
+	if (!private->have_handler) {
+		start_handler (private);
+	}
+}
+
+
+void
+tracker_store_queue_sparql_update (const gchar *sparql,
+                                   TrackerStoreSparqlUpdateCallback callback,
+                                   gpointer user_data,
+                                   GDestroyNotify destroy)
+{
+	TrackerStorePrivate *private;
+	TrackerStoreTask    *task;
+
+	g_return_if_fail (sparql != NULL);
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	task = g_slice_new0 (TrackerStoreTask);
+	task->type = TRACKER_STORE_TASK_TYPE_UPDATE;
+	task->data.query = g_strdup (sparql);
+	task->user_data = user_data;
+	task->callback.update_callback = callback;
+	task->destroy = destroy;
+
+	g_queue_push_tail (private->queue, task);
+
+	if (!private->have_handler) {
+		start_handler (private);
+	}
+}
+
+void
+tracker_store_sparql_update (const gchar *sparql,
+                             GError     **error)
+{
+	TrackerStorePrivate *private;
+
+	g_return_if_fail (sparql != NULL);
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	if (private->batch_mode) {
+		/* commit pending batch items */
+		tracker_data_commit_transaction ();
+		private->batch_mode = FALSE;
+		private->batch_count = 0;
+	}
+
+	tracker_data_update_sparql (sparql, error);
+}
+
+TrackerDBResultSet*
+tracker_store_sparql_query (const gchar *sparql,
+                            GError     **error)
+{
+	return tracker_data_query_sparql (sparql, error);
+}
+
+void
+tracker_store_insert_statement (const gchar   *subject,
+                                const gchar   *predicate,
+                                const gchar   *object)
+{
+	TrackerStorePrivate *private;
+
+	g_return_if_fail (subject != NULL);
+	g_return_if_fail (predicate != NULL);
+	g_return_if_fail (object != NULL);
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	if (private->batch_mode) {
+		/* commit pending batch items */
+		tracker_data_commit_transaction ();
+		private->batch_mode = FALSE;
+		private->batch_count = 0;
+	}
+
+	tracker_data_insert_statement (subject, predicate, object);
+}
+
+void
+tracker_store_delete_statement (const gchar   *subject,
+                                const gchar   *predicate,
+                                const gchar   *object)
+{
+	TrackerStorePrivate *private;
+
+	g_return_if_fail (subject != NULL);
+	g_return_if_fail (predicate != NULL);
+	g_return_if_fail (object != NULL);
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	if (private->batch_mode) {
+		/* commit pending batch items */
+		tracker_data_commit_transaction ();
+		private->batch_mode = FALSE;
+		private->batch_count = 0;
+	}
+
+	tracker_data_delete_statement (subject, predicate, object);
+}
+
diff --git a/src/tracker-store/tracker-store.h b/src/tracker-store/tracker-store.h
new file mode 100644
index 0000000..c0253e4
--- /dev/null
+++ b/src/tracker-store/tracker-store.h
@@ -0,0 +1,60 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2009, Nokia
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ *
+ * Author: Philip Van Hoof <philip codeminded be>
+ */
+
+#ifndef __TRACKER_STORE_H__
+#define __TRACKER_STORE_H__
+
+#include <stdio.h>
+
+#include <libtracker-common/tracker-common.h>
+#include <libtracker-db/tracker-db-interface.h>
+
+G_BEGIN_DECLS
+
+typedef void (* TrackerStoreSparqlUpdateCallback)  (GError          *error,
+                                                    gpointer         user_data);
+typedef void (* TrackerStoreCommitCallback)        (gpointer         user_data);
+
+void         tracker_store_init                   (void);
+void         tracker_store_shutdown               (void);
+void         tracker_store_queue_commit           (TrackerStoreCommitCallback       callback,
+                                                   gpointer       user_data,
+                                                   GDestroyNotify destroy);
+void         tracker_store_queue_sparql_update    (const gchar   *sparql,
+                                                   TrackerStoreSparqlUpdateCallback callback,
+                                                   gpointer       user_data,
+                                                   GDestroyNotify destroy);
+void         tracker_store_sparql_update          (const gchar   *sparql,
+                                                   GError       **error);
+void         tracker_store_insert_statement       (const gchar   *subject,
+                                                   const gchar   *predicate,
+                                                   const gchar   *object);
+void         tracker_store_delete_statement       (const gchar   *subject,
+                                                   const gchar   *predicate,
+                                                   const gchar   *object);
+TrackerDBResultSet*
+             tracker_store_sparql_query           (const gchar   *sparql,
+                                                   GError       **error);
+
+G_END_DECLS
+
+#endif /* __TRACKER_STORE_H__ */



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]