[tracker/writeback] Use a separate thread to dispatch DBus writeback signals.



commit 40d410bdb8562998d8064ec5dbc7fea21e324939
Author: Carlos Garnacho <carlos lanedo com>
Date:   Thu Nov 26 11:56:16 2009 +0100

    Use a separate thread to dispatch DBus writeback signals.
    
    TrackerWritebackDispatcher has been split into dispatcher/consumer.
    The former runs in a separate thread, listening for writeback signals
    from tracker-store and notifying the main thread about them. The consumer
    just has these petitions queued and processes them one by one.
    
    This way, operations performed by the consumer may be blocking, while
    the dispatcher will be able to receive DBus signals at any time.

 src/tracker-writeback/Makefile.am                  |    2 +
 src/tracker-writeback/tracker-main.c               |  114 +++++-
 src/tracker-writeback/tracker-marshal.list         |    1 +
 src/tracker-writeback/tracker-writeback-consumer.c |  317 ++++++++++++
 src/tracker-writeback/tracker-writeback-consumer.h |   58 +++
 .../tracker-writeback-dispatcher.c                 |  517 ++++++++++----------
 .../tracker-writeback-dispatcher.h                 |   13 +-
 src/tracker-writeback/tracker-writeback-file.c     |    2 +
 8 files changed, 745 insertions(+), 279 deletions(-)
---
diff --git a/src/tracker-writeback/Makefile.am b/src/tracker-writeback/Makefile.am
index ff0c7b6..ae99e11 100644
--- a/src/tracker-writeback/Makefile.am
+++ b/src/tracker-writeback/Makefile.am
@@ -45,6 +45,8 @@ libexec_PROGRAMS = tracker-writeback
 tracker_writeback_SOURCES = 						\
 	$(marshal_sources)						\
 	$(dbus_sources)							\
+	tracker-writeback-consumer.c					\
+	tracker-writeback-consumer.h					\
 	tracker-writeback-dispatcher.c					\
 	tracker-writeback-dispatcher.h					\
 	tracker-writeback-file.c					\
diff --git a/src/tracker-writeback/tracker-main.c b/src/tracker-writeback/tracker-main.c
index 5629061..b8185c8 100644
--- a/src/tracker-writeback/tracker-main.c
+++ b/src/tracker-writeback/tracker-main.c
@@ -23,22 +23,128 @@
 #include <stdlib.h>
 
 #include "tracker-writeback-dispatcher.h"
+#include "tracker-writeback-consumer.h"
+
+typedef struct {
+	gchar *subject;
+	GStrv  rdf_types;
+} WritebackData;
+
+static TrackerWritebackConsumer *consumer = NULL;
+static TrackerWritebackDispatcher *dispatcher = NULL;
+static GMainContext *dispatcher_context = NULL;
+
+static WritebackData *
+writeback_data_new (const gchar *subject,
+		    const GStrv  rdf_types)
+{
+	WritebackData *data;
+
+	data = g_slice_new (WritebackData);
+	data->subject = g_strdup (subject);
+	data->rdf_types = g_strdupv (rdf_types);
+
+	return data;
+}
+
+static void
+writeback_data_free (WritebackData *data)
+{
+	g_free (data->subject);
+	g_strfreev (data->rdf_types);
+	g_slice_free (WritebackData, data);
+}
+
+/* This function will be executed in the main thread */
+static gboolean
+on_writeback_idle_cb (gpointer user_data)
+{
+	WritebackData *data = user_data;
+
+	g_message ("Main thread (%p) got signaled of writeback petition", g_thread_self ());
+
+	tracker_writeback_consumer_add_subject (consumer, data->subject, data->rdf_types);
+
+	writeback_data_free (data);
+
+	return FALSE;
+}
+
+/* This callback runs in the dispatcher thread */
+static void
+on_writeback_cb (TrackerWritebackDispatcher *dispatcher,
+		 const gchar                *subject,
+		 const GStrv                 rdf_types)
+{
+	WritebackData *data;
+
+	g_message ("Got writeback petition on thread '%p' for subject '%s'",
+		   g_thread_self (), subject);
+
+	data = writeback_data_new (subject, rdf_types);
+	g_idle_add_full (G_PRIORITY_HIGH_IDLE,
+			 on_writeback_idle_cb,
+			 data, NULL);
+}
+
+static gpointer
+dispatcher_thread_func (gpointer data)
+{
+	GMainLoop *loop;
+
+	g_message ("DBus Dispatcher thread created: %p", g_thread_self ());
+
+	g_signal_connect (dispatcher, "writeback",
+			  G_CALLBACK (on_writeback_cb), NULL);
+
+	loop = g_main_loop_new (dispatcher_context, FALSE);
+	g_main_loop_run (loop);
+
+	g_object_unref (dispatcher);
+	g_main_loop_unref (loop);
+	g_main_context_unref (dispatcher_context);
+
+	return NULL;
+}
 
 int
 main (int   argc,
       char *argv[])
 {
-        TrackerWritebackDispatcher *dispatcher;
         GMainLoop *loop;
+	GError *error = NULL;
+
+	g_thread_init (NULL);
+	dbus_g_thread_init ();
 
         g_type_init ();
 
-        loop = g_main_loop_new (NULL, FALSE);
-        dispatcher = tracker_writeback_dispatcher_new ();
+        consumer = tracker_writeback_consumer_new ();
+
+	/* Create dispatcher thread data here: GType
+	 * initialization for boxed types doesn't seem
+	 * to be thread-safe, which is troublesome for
+	 * signal initialization.
+	 */
+	dispatcher_context = g_main_context_new ();
+	dispatcher = tracker_writeback_dispatcher_new (dispatcher_context);
+
+	g_thread_create (dispatcher_thread_func, dispatcher, FALSE, &error);
 
+	if (error) {
+		g_critical ("Error creating dispatcher thread: %s", error->message);
+		g_error_free (error);
+
+		return EXIT_FAILURE;
+	}
+
+	g_message ("Main thread is: %p", g_thread_self ());
+
+        loop = g_main_loop_new (NULL, FALSE);
         g_main_loop_run (loop);
 
-        g_object_unref (dispatcher);
+        g_object_unref (consumer);
+	g_main_loop_unref (loop);
 
         return EXIT_SUCCESS;
 }
diff --git a/src/tracker-writeback/tracker-marshal.list b/src/tracker-writeback/tracker-marshal.list
index 374d4b7..fbb2c47 100644
--- a/src/tracker-writeback/tracker-marshal.list
+++ b/src/tracker-writeback/tracker-marshal.list
@@ -1 +1,2 @@
 VOID:BOXED
+VOID:STRING,BOXED
diff --git a/src/tracker-writeback/tracker-writeback-consumer.c b/src/tracker-writeback/tracker-writeback-consumer.c
new file mode 100644
index 0000000..ab75d13
--- /dev/null
+++ b/src/tracker-writeback/tracker-writeback-consumer.c
@@ -0,0 +1,317 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2009, Nokia
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#include "config.h"
+
+#include <stdlib.h>
+
+#include <libtracker-common/tracker-dbus.h>
+#include <libtracker-client/tracker.h>
+
+#include "tracker-writeback-consumer.h"
+#include "tracker-writeback-dbus.h"
+#include "tracker-writeback-glue.h"
+#include "tracker-writeback-module.h"
+#include "tracker-marshal.h"
+
+#define TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_WRITEBACK_CONSUMER, TrackerWritebackConsumerPrivate))
+
+#define TRACKER_SERVICE		    "org.freedesktop.Tracker1"
+#define TRACKER_RESOURCES_OBJECT    "/org/freedesktop/Tracker1/Resources"
+#define TRACKER_INTERFACE_RESOURCES "org.freedesktop.Tracker1.Resources"
+
+typedef struct {
+	gchar *subject;
+	GStrv rdf_types;
+} QueryData;
+
+typedef struct {
+	GHashTable *modules;
+	TrackerClient *client;
+	TrackerMinerManager *manager;
+	GQueue *process_queue;
+	guint idle_id;
+	guint state;
+} TrackerWritebackConsumerPrivate;
+
+enum {
+	STATE_IDLE,
+	STATE_PROCESSING
+};
+
+static void tracker_writeback_consumer_finalize    (GObject                    *object);
+static void tracker_writeback_consumer_constructed (GObject                    *object);
+
+static gboolean process_queue_cb                   (gpointer user_data);
+
+
+G_DEFINE_TYPE (TrackerWritebackConsumer, tracker_writeback_consumer, G_TYPE_OBJECT)
+
+static void
+tracker_writeback_consumer_class_init (TrackerWritebackConsumerClass *klass)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+	object_class->finalize = tracker_writeback_consumer_finalize;
+	object_class->constructed = tracker_writeback_consumer_constructed;
+
+	g_type_class_add_private (object_class, sizeof (TrackerWritebackConsumerPrivate));
+}
+
+static void
+tracker_writeback_consumer_init (TrackerWritebackConsumer *consumer)
+{
+	TrackerWritebackConsumerPrivate *priv;
+
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (consumer);
+
+	priv->client = tracker_connect (TRUE, 0);
+	priv->modules = g_hash_table_new_full (g_str_hash,
+	                                       g_str_equal,
+	                                       (GDestroyNotify) g_free,
+	                                       NULL);
+	priv->process_queue = g_queue_new ();
+
+	priv->manager = tracker_writeback_get_miner_manager ();
+	priv->state = STATE_IDLE;
+}
+
+static void
+tracker_writeback_consumer_finalize (GObject *object)
+{
+	TrackerWritebackConsumerPrivate *priv;
+
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (object);
+
+	if (priv->client) {
+		tracker_disconnect (priv->client);
+	}
+
+	g_object_unref (priv->manager);
+
+	G_OBJECT_CLASS (tracker_writeback_consumer_parent_class)->finalize (object);
+}
+
+static void
+tracker_writeback_consumer_constructed (GObject *object)
+{
+	TrackerWritebackConsumerPrivate *priv;
+	GList *modules;
+
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (object);
+	modules = tracker_writeback_modules_list ();
+
+	while (modules) {
+		TrackerWritebackModule *module;
+		const gchar *path;
+
+		path = modules->data;
+		module = tracker_writeback_module_get (path);
+
+		g_hash_table_insert (priv->modules, g_strdup (path), module);
+		modules = modules->next;
+	}
+}
+
+TrackerWritebackConsumer *
+tracker_writeback_consumer_new (void)
+{
+	return g_object_new (TRACKER_TYPE_WRITEBACK_CONSUMER, NULL);
+}
+
+static gboolean
+sparql_rdf_types_match (const gchar * const *module_types, 
+                        const gchar * const *rdf_types)
+{
+	guint n;
+
+	for (n = 0; rdf_types[n] != NULL; n++) {
+		guint i;
+
+		for (i = 0; module_types[i] != NULL; i++) {
+			if (g_strcmp0 (module_types[i], rdf_types[n]) == 0) {
+				return TRUE;
+			}
+		}
+	}
+
+	return FALSE;
+}
+
+static void
+sparql_query_cb (GPtrArray *result,
+                 GError    *error,
+                 gpointer   user_data)
+{
+	TrackerWritebackConsumerPrivate *priv;
+	TrackerWritebackConsumer *consumer;
+	QueryData *data;
+
+	consumer = TRACKER_WRITEBACK_CONSUMER (user_data);
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (consumer);
+	data = g_queue_pop_head (priv->process_queue);
+
+	if (result && result->len > 0) {
+		GHashTableIter iter;
+		gpointer key, value;
+		GStrv rdf_types;
+
+
+		rdf_types = data->rdf_types;
+
+		g_hash_table_iter_init (&iter, priv->modules);
+
+		while (g_hash_table_iter_next (&iter, &key, &value)) {
+			TrackerWritebackModule *module;
+			const gchar * const *module_types;
+
+			module = value;
+			module_types = tracker_writeback_module_get_rdf_types (module);
+
+			if (sparql_rdf_types_match (module_types, (const gchar * const *) rdf_types)) {
+				TrackerWriteback *writeback;
+
+				g_message ("  Updating metadata for file:'%s' using module:'%s'",
+				           "unknown",
+				           module->name);
+
+				writeback = tracker_writeback_module_create (module);
+				tracker_writeback_update_metadata (writeback, result, priv->client);
+				g_object_unref (writeback);
+			}
+		}
+	} else {
+		g_message ("  No files qualify for updates");
+	}
+
+	g_free (data->subject);
+	g_strfreev (data->rdf_types);
+	g_slice_free (QueryData, data);
+
+	priv->idle_id = g_idle_add (process_queue_cb, consumer);
+}
+
+#if 0
+static void
+sparql_writeback_cb (DBusGProxy               *proxy,
+                     GHashTable               *subjects,
+                     TrackerWritebackConsumer *object)
+{
+	TrackerWritebackConsumerPrivate *priv;
+	QueryData *data;
+	GHashTableIter iter;
+	gpointer key, value;
+
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (object);
+
+	g_message ("Writeback signalled with %d subjects...",
+		   g_hash_table_size (subjects));
+
+	g_hash_table_iter_init (&iter, subjects);
+
+	while (g_hash_table_iter_next (&iter, &key, &value)) {
+		const gchar *subject = key;
+		const gchar * const *rdf_types = value;
+		gchar *query;
+
+		query = g_strdup_printf ("SELECT ?url ?predicate ?object {"
+		                         "  <%s> ?predicate ?object ;"
+		                         "  nie:isStoredAs ?url ."
+		                         "  ?predicate tracker:writeback true "
+		                         "}", 
+		                         subject);
+
+		data = g_slice_new (QueryData);
+		data->consumer = object;
+		data->rdf_types = g_strdupv ((gchar **) rdf_types);
+
+		tracker_resources_sparql_query_async (priv->client,
+		                                      query,
+		                                      sparql_query_cb,
+		                                      data);
+
+		g_free (query);
+	}
+}
+#endif
+
+static gboolean
+process_queue_cb (gpointer user_data)
+{
+	TrackerWritebackConsumerPrivate *priv;
+	TrackerWritebackConsumer *consumer;
+	QueryData *data;
+	gchar *query;
+
+	consumer = TRACKER_WRITEBACK_CONSUMER (user_data);
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (consumer);
+	data = g_queue_peek_head (priv->process_queue);
+
+	if (!data) {
+		/* No more data left, back to idle state */
+		priv->state = STATE_IDLE;
+		priv->idle_id = 0;
+		return FALSE;
+	}
+
+	query = g_strdup_printf ("SELECT ?url ?predicate ?object {"
+				 "  <%s> ?predicate ?object ;"
+				 "  nie:isStoredAs ?url ."
+				 "  ?predicate tracker:writeback true "
+				 "}",
+				 data->subject);
+
+	tracker_resources_sparql_query_async (priv->client,
+					      query,
+					      sparql_query_cb,
+					      consumer);
+
+	g_free (query);
+
+	/* Keep "processing" state */
+	priv->idle_id = 0;
+	return FALSE;
+}
+
+void
+tracker_writeback_consumer_add_subject (TrackerWritebackConsumer *consumer,
+					const gchar              *subject,
+					const GStrv               rdf_types)
+{
+	TrackerWritebackConsumerPrivate *priv;
+	QueryData *data;
+
+	g_return_if_fail (TRACKER_IS_WRITEBACK_CONSUMER (consumer));
+	g_return_if_fail (subject != NULL);
+	g_return_if_fail (rdf_types != NULL);
+
+	priv = TRACKER_WRITEBACK_CONSUMER_GET_PRIVATE (consumer);
+
+	data = g_slice_new (QueryData);
+	data->subject = g_strdup (subject);
+	data->rdf_types = g_strdupv (rdf_types);
+
+	g_queue_push_tail (priv->process_queue, data);
+
+	if (priv->state != STATE_PROCESSING && priv->idle_id == 0) {
+		priv->state = STATE_PROCESSING;
+		priv->idle_id = g_idle_add (process_queue_cb, consumer);
+	}
+}
diff --git a/src/tracker-writeback/tracker-writeback-consumer.h b/src/tracker-writeback/tracker-writeback-consumer.h
new file mode 100644
index 0000000..78c1ac6
--- /dev/null
+++ b/src/tracker-writeback/tracker-writeback-consumer.h
@@ -0,0 +1,58 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2009, Nokia
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#ifndef __TRACKER_WRITEBACK_CONSUMER_H__
+#define __TRACKER_WRITEBACK_CONSUMER_H__
+
+#include <glib-object.h>
+#include <gio/gio.h>
+
+#include <libtracker-client/tracker.h>
+
+G_BEGIN_DECLS
+
+#define TRACKER_TYPE_WRITEBACK_CONSUMER         (tracker_writeback_consumer_get_type())
+#define TRACKER_WRITEBACK_CONSUMER(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), TRACKER_TYPE_WRITEBACK_CONSUMER, TrackerWritebackConsumer))
+#define TRACKER_WRITEBACK_CONSUMER_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), TRACKER_TYPE_WRITEBACK_CONSUMER, TrackerWritebackConsumerClass))
+#define TRACKER_IS_WRITEBACK_CONSUMER(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), TRACKER_TYPE_WRITEBACK_CONSUMER))
+#define TRACKER_IS_WRITEBACK_CONSUMER_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c),  TRACKER_TYPE_WRITEBACK_CONSUMER))
+#define TRACKER_WRITEBACK_CONSUMER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TRACKER_TYPE_WRITEBACK_CONSUMER, TrackerWritebackConsumerClass))
+
+typedef struct TrackerWritebackConsumer TrackerWritebackConsumer;
+typedef struct TrackerWritebackConsumerClass TrackerWritebackConsumerClass;
+
+struct TrackerWritebackConsumer {
+        GObject parent_instance;
+};
+
+struct TrackerWritebackConsumerClass {
+        GObjectClass parent_class;
+};
+
+GType                      tracker_writeback_consumer_get_type (void) G_GNUC_CONST;
+TrackerWritebackConsumer * tracker_writeback_consumer_new      (void);
+
+void tracker_writeback_consumer_add_subject (TrackerWritebackConsumer *consumer,
+					     const gchar              *subject,
+					     const GStrv               rdf_types);
+
+G_END_DECLS
+
+#endif /* __TRACKER_WRITEBACK_CONSUMER_H__ */
diff --git a/src/tracker-writeback/tracker-writeback-dispatcher.c b/src/tracker-writeback/tracker-writeback-dispatcher.c
index d560689..74b712b 100644
--- a/src/tracker-writeback/tracker-writeback-dispatcher.c
+++ b/src/tracker-writeback/tracker-writeback-dispatcher.c
@@ -1,6 +1,6 @@
 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
 /*
- * Copyright (C) 2009, Nokia
+ * Copyright (C) 2008, Nokia
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public
@@ -18,365 +18,340 @@
  * Boston, MA  02110-1301, USA.
  */
 
-#include "config.h"
-
-#include <stdlib.h>
-
 #include <libtracker-common/tracker-dbus.h>
 #include <libtracker-client/tracker.h>
 
 #include "tracker-writeback-dispatcher.h"
 #include "tracker-writeback-dbus.h"
-#include "tracker-writeback-glue.h"
-#include "tracker-writeback-module.h"
 #include "tracker-marshal.h"
 
 #define TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_WRITEBACK_DISPATCHER, TrackerWritebackDispatcherPrivate))
 
-#define TRACKER_SERVICE		    "org.freedesktop.Tracker1"
-#define TRACKER_RESOURCES_OBJECT    "/org/freedesktop/Tracker1/Resources"
-#define TRACKER_INTERFACE_RESOURCES "org.freedesktop.Tracker1.Resources"
-
-typedef struct {
-	DBusGConnection *connection;
-	DBusGProxy *proxy_dbus;
-	DBusGProxy *proxy_resources;
-} DBusData;
+#define TRACKER_SERVICE			"org.freedesktop.Tracker1"
+#define TRACKER_RESOURCES_OBJECT	"/org/freedesktop/Tracker1/Resources"
+#define TRACKER_INTERFACE_RESOURCES	"org.freedesktop.Tracker1.Resources"
 
-typedef struct {
-	TrackerWritebackDispatcher *dispatcher;
-	GStrv rdf_types;
-} QueryData;
+#define DBUS_MATCH_STR \
+        "type='signal', " \
+        "sender='" TRACKER_SERVICE "', " \
+        "path='" TRACKER_RESOURCES_OBJECT "', " \
+        "interface='" TRACKER_INTERFACE_RESOURCES "'"
 
 typedef struct {
-	GHashTable *modules;
-	DBusData *dbus_data;
-	TrackerClient *client;
-	TrackerMinerManager *manager;
+        GMainContext *context;
+	DBusConnection *connection;
 } TrackerWritebackDispatcherPrivate;
 
-static void tracker_writeback_dispatcher_finalize    (GObject                    *object);
-static void tracker_writeback_dispatcher_constructed (GObject                    *object);
-static void sparql_writeback_cb                      (DBusGProxy                 *proxy,
-                                                      GHashTable                 *subjects,
-                                                      TrackerWritebackDispatcher *object);
+enum {
+        PROP_0,
+        PROP_MAIN_CONTEXT
+};
+
+enum {
+        WRITEBACK,
+        LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+
+static void tracker_writeback_dispatcher_finalize     (GObject      *object);
+static void tracker_writeback_dispatcher_constructed  (GObject      *object);
+
+static void tracker_writeback_dispatcher_get_property (GObject      *object,
+                                                       guint         param_id,
+                                                       GValue       *value,
+                                                       GParamSpec   *pspec);
+static void tracker_writeback_dispatcher_set_property (GObject      *object,
+                                                       guint         param_id,
+                                                       const GValue *value,
+                                                       GParamSpec   *pspec);
+
 
 G_DEFINE_TYPE (TrackerWritebackDispatcher, tracker_writeback_dispatcher, G_TYPE_OBJECT)
 
 static void
 tracker_writeback_dispatcher_class_init (TrackerWritebackDispatcherClass *klass)
 {
-	GObjectClass *object_class = G_OBJECT_CLASS (klass);
-
-	object_class->finalize = tracker_writeback_dispatcher_finalize;
-	object_class->constructed = tracker_writeback_dispatcher_constructed;
-
-	g_type_class_add_private (object_class, sizeof (TrackerWritebackDispatcherPrivate));
+        GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+        object_class->finalize = tracker_writeback_dispatcher_finalize;
+        object_class->constructed = tracker_writeback_dispatcher_constructed;
+        object_class->get_property = tracker_writeback_dispatcher_get_property;
+        object_class->set_property = tracker_writeback_dispatcher_set_property;
+
+        g_object_class_install_property (object_class,
+					 PROP_MAIN_CONTEXT,
+					 g_param_spec_pointer ("context",
+                                                               "Main context",
+                                                               "Main context to run the DBus service on",
+                                                               G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+
+        signals[WRITEBACK] =
+		g_signal_new ("writeback",
+			      G_OBJECT_CLASS_TYPE (object_class),
+			      G_SIGNAL_RUN_LAST,
+			      G_STRUCT_OFFSET (TrackerWritebackDispatcherClass, writeback),
+			      NULL, NULL,
+                              tracker_marshal_VOID__STRING_BOXED,
+			      G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_STRV);
+
+        g_type_class_add_private (object_class, sizeof (TrackerWritebackDispatcherPrivate));
 }
 
-static gboolean
-dbus_register_service (DBusGProxy  *proxy,
-                       const gchar *name)
+static void
+handle_writeback_signal (TrackerWritebackDispatcher *dispatcher,
+                         DBusMessage                *message)
 {
-	GError *error = NULL;
-	guint result;
+        DBusMessageIter iter;
+        gchar *signature;
+        int arg_type;
 
-	g_message ("Registering D-Bus service '%s'...", name);
+        g_message ("Got writeback DBus signal");
 
-	if (!org_freedesktop_DBus_request_name (proxy,
-	                                        name,
-	                                        DBUS_NAME_FLAG_DO_NOT_QUEUE,
-	                                        &result, &error)) {
-		g_critical ("Could not acquire name:'%s', %s",
-		            name,
-		            error ? error->message : "no error given");
-		g_error_free (error);
+        if (!dbus_message_iter_init (message, &iter)) {
+                g_critical ("  Message had no arguments");
+                return;
+        }
 
-		return FALSE;
-	}
+        signature = dbus_message_iter_get_signature (&iter);
 
-	if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
-		g_critical ("D-Bus service name:'%s' is already taken, "
-		            "perhaps the application is already running?",
-		            name);
-		return FALSE;
-	}
+        if (g_strcmp0 (signature, "a{sas}") != 0) {
+                g_critical ("  Unexpected message signature '%s'", signature);
+                g_free (signature);
+                return;
+        }
 
-	return TRUE;
-}
+        g_free (signature);
 
-static gboolean
-dbus_register_object (GObject		    *object,
-                      DBusGConnection	    *connection,
-                      DBusGProxy	    *proxy,
-                      const DBusGObjectInfo *info,
-                      const gchar	    *path)
-{
-	g_message ("Registering D-Bus object...");
-	g_message ("  Path:'%s'", path);
-	g_message ("  Object Type:'%s'", G_OBJECT_TYPE_NAME (object));
+        while ((arg_type = dbus_message_iter_get_arg_type (&iter)) != DBUS_TYPE_INVALID) {
+                DBusMessageIter arr, dict, types_arr;
+                const gchar *subject;
+                GArray *rdf_types;
 
-	dbus_g_object_type_install_info (G_OBJECT_TYPE (object), info);
-	dbus_g_connection_register_g_object (connection, path, object);
+                rdf_types = g_array_new (TRUE, TRUE, sizeof (gchar *));
 
-	return TRUE;
-}
-
-static DBusData *
-dbus_data_new (GObject *object)
-{
-	DBusData *data;
-	DBusGConnection *connection;
-	DBusGProxy *gproxy;
-	GError *error = NULL;
+                dbus_message_iter_recurse (&iter, &arr);
+                dbus_message_iter_recurse (&arr, &dict);
 
-	connection = dbus_g_bus_get (DBUS_BUS_SESSION, &error);
+                dbus_message_iter_get_basic (&dict, &subject);
 
-	if (!connection) {
-		g_critical ("Could not connect to the D-Bus session bus, %s",
-		            error ? error->message : "no error given.");
-		g_error_free (error);
-		return NULL;
-	}
+                dbus_message_iter_next (&dict);
+                dbus_message_iter_recurse (&dict, &types_arr);
 
-	gproxy = dbus_g_proxy_new_for_name (connection,
-	                                    DBUS_SERVICE_DBUS,
-	                                    DBUS_PATH_DBUS,
-	                                    DBUS_INTERFACE_DBUS);
+                while ((arg_type = dbus_message_iter_get_arg_type (&types_arr)) != DBUS_TYPE_INVALID) {
+                        const gchar *type;
 
-	if (!dbus_register_service (gproxy,
-	                            TRACKER_WRITEBACK_DBUS_NAME)) {
-		g_object_unref (gproxy);
-		return NULL;
-	}
+                        dbus_message_iter_get_basic (&types_arr, &type);
 
-	if (!dbus_register_object (object,
-	                           connection, gproxy,
-	                           &dbus_glib_tracker_writeback_object_info,
-	                           TRACKER_WRITEBACK_DBUS_PATH)) {
-		g_object_unref (gproxy);
-		return NULL;
-	}
+                        g_array_append_val (rdf_types, type);
 
-	dbus_g_object_register_marshaller (tracker_marshal_VOID__BOXED,
-	                                   G_TYPE_NONE,
-	                                   TRACKER_TYPE_STR_STRV_MAP,
-	                                   G_TYPE_INVALID);
+                        dbus_message_iter_next (&types_arr);
+                }
 
-	/* Now we're successfully connected and registered, create the data */
-	data = g_new0 (DBusData, 1);
-	data->connection = dbus_g_connection_ref (connection);
-	data->proxy_dbus = gproxy;
+                g_signal_emit (dispatcher, signals[WRITEBACK], 0, subject, rdf_types->data);
+                g_array_free (rdf_types, TRUE);
 
-	return data;
+                dbus_message_iter_next (&iter);
+        }
 }
 
-static void
-dbus_data_free (DBusData *data)
+static DBusHandlerResult
+message_filter (DBusConnection *connection,
+                DBusMessage    *message,
+                gpointer        user_data)
 {
-	dbus_g_connection_unref (data->connection);
+        if (dbus_message_is_signal (message,
+                                    TRACKER_INTERFACE_RESOURCES,
+                                    "Writeback")) {
+                TrackerWritebackDispatcher *dispatcher = user_data;
 
-	if (data->proxy_dbus) {
-		g_object_unref (data->proxy_dbus);
-	}
+                handle_writeback_signal (dispatcher, message);
 
-	if (data->proxy_resources) {
-		g_object_unref (data->proxy_resources);
-	}
+                return DBUS_HANDLER_RESULT_HANDLED;
+        }
 
-	g_free (data);
+        return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
 }
 
-static void
-tracker_writeback_dispatcher_init (TrackerWritebackDispatcher *dispatcher)
+static DBusConnection *
+setup_dbus_connection (TrackerWritebackDispatcher *dispatcher,
+                       GMainContext               *context)
 {
-	TrackerWritebackDispatcherPrivate *priv;
+        DBusConnection *connection;
+        DBusError error;
+        gint result;
 
-	priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (dispatcher);
+        dbus_error_init (&error);
 
-	priv->client = tracker_connect (TRUE, 0);
-	priv->dbus_data = dbus_data_new (G_OBJECT (dispatcher));
+        /* Create DBus connection */
+        connection = dbus_bus_get_private (DBUS_BUS_SESSION, &error);
 
-	if (!priv->dbus_data) {
-		tracker_disconnect (priv->client);
-		exit (EXIT_FAILURE);
-	}
+        if (dbus_error_is_set (&error)) {
+                g_critical ("Could not connect to the D-Bus session bus, %s",
+                            error.message);
+                dbus_error_free (&error);
+                return NULL;
+        }
 
-	priv->modules = g_hash_table_new_full (g_str_hash,
-	                                       g_str_equal,
-	                                       (GDestroyNotify) g_free,
-	                                       NULL);
-
-	priv->dbus_data->proxy_resources = 
-		dbus_g_proxy_new_for_name (priv->dbus_data->connection,
-		                           TRACKER_SERVICE,
-		                           TRACKER_RESOURCES_OBJECT,
-		                           TRACKER_INTERFACE_RESOURCES);
-
-	dbus_g_proxy_add_signal (priv->dbus_data->proxy_resources, 
-	                         "Writeback",
-	                         TRACKER_TYPE_STR_STRV_MAP,
-	                         G_TYPE_INVALID);
-
-	dbus_g_proxy_connect_signal (priv->dbus_data->proxy_resources, 
-	                             "Writeback",
-	                             G_CALLBACK (sparql_writeback_cb),
-	                             dispatcher,
-	                             NULL);
-
-	priv->manager = tracker_writeback_get_miner_manager ();
-}
+        dbus_connection_set_exit_on_disconnect (connection, FALSE);
 
-static void
-tracker_writeback_dispatcher_finalize (GObject *object)
-{
-	TrackerWritebackDispatcherPrivate *priv;
+        /* Request writeback service name */
+	g_message ("Registering D-Bus service '%s'...", TRACKER_WRITEBACK_DBUS_NAME);
+        result = dbus_bus_request_name (connection,
+                                        TRACKER_WRITEBACK_DBUS_NAME, 0,
+                                        &error);
 
-	priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (object);
+        if (dbus_error_is_set (&error)) {
+		g_critical ("Could not acquire name:'%s', %s",
+                            TRACKER_WRITEBACK_DBUS_NAME, error.message);
+                dbus_error_free (&error);
+		dbus_connection_close (connection);
+                dbus_connection_unref (connection);
 
-	if (priv->client) {
-		tracker_disconnect (priv->client);
-	}
+                return NULL;
+        }
 
-	g_object_unref (priv->manager);
+        if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
+		g_critical ("D-Bus service name:'%s' is already taken, "
+		            "perhaps the application is already running?",
+                            TRACKER_WRITEBACK_DBUS_NAME);
+		dbus_connection_close (connection);
+                dbus_connection_unref (connection);
 
-	dbus_data_free (priv->dbus_data);
+		return NULL;
+	}
 
-	G_OBJECT_CLASS (tracker_writeback_dispatcher_parent_class)->finalize (object);
-}
+        /* Add message filter function */
+        if (!dbus_connection_add_filter (connection, message_filter, dispatcher, NULL)) {
+                g_critical ("Could not add message filter");
+		dbus_connection_close (connection);
+                dbus_connection_unref (connection);
 
-static void
-tracker_writeback_dispatcher_constructed (GObject *object)
-{
-	TrackerWritebackDispatcherPrivate *priv;
-	GList *modules;
+                return NULL;
+        }
 
-	priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (object);
-	modules = tracker_writeback_modules_list ();
+        /* Add match to receive writeback signals */
+        dbus_bus_add_match (connection, DBUS_MATCH_STR, &error);
 
-	while (modules) {
-		TrackerWritebackModule *module;
-		const gchar *path;
+        if (dbus_error_is_set (&error)) {
+                g_critical ("Could not add match rules, %s", error.message);
+                dbus_error_free (&error);
+		dbus_connection_close (connection);
+                dbus_connection_unref (connection);
 
-		path = modules->data;
-		module = tracker_writeback_module_get (path);
+                return NULL;
+        }
 
-		g_hash_table_insert (priv->modules, g_strdup (path), module);
-		modules = modules->next;
-	}
-}
+        /* Set up with the thread context */
+        dbus_connection_setup_with_g_main (connection, context);
 
-TrackerWritebackDispatcher *
-tracker_writeback_dispatcher_new (void)
-{
-	return g_object_new (TRACKER_TYPE_WRITEBACK_DISPATCHER, NULL);
+        return connection;
 }
 
-static gboolean
-sparql_rdf_types_match (const gchar * const *module_types, 
-                        const gchar * const *rdf_types)
+static void
+tracker_writeback_dispatcher_init (TrackerWritebackDispatcher *dispatcher)
 {
-	guint n;
-
-	for (n = 0; rdf_types[n] != NULL; n++) {
-		guint i;
-
-		for (i = 0; module_types[i] != NULL; i++) {
-			if (g_strcmp0 (module_types[i], rdf_types[n]) == 0) {
-				return TRUE;
-			}
-		}
-	}
-
-	return FALSE;
 }
 
 static void
-sparql_query_cb (GPtrArray *result,
-                 GError    *error,
-                 gpointer   user_data)
+tracker_writeback_dispatcher_finalize (GObject *object)
 {
-	TrackerWritebackDispatcherPrivate *priv;
-	QueryData *data;
-
-	data = user_data;
+        TrackerWritebackDispatcherPrivate *priv;
+        DBusError error;
 
-	if (result && result->len > 0) {
-		GHashTableIter iter;
-		gpointer key, value;
-		GStrv rdf_types;
+        priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (object);
+        dbus_error_init (&error);
 
-		priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (data->dispatcher);
+        dbus_bus_remove_match (priv->connection, DBUS_MATCH_STR, &error);
 
-		rdf_types = data->rdf_types;
+        if (dbus_error_is_set (&error)) {
+                g_critical ("Could not remove match rules, %s", error.message);
+                dbus_error_free (&error);
+        }
 
-		g_hash_table_iter_init (&iter, priv->modules);
+        dbus_connection_remove_filter (priv->connection, message_filter, object);
+        dbus_connection_unref (priv->connection);
 
-		while (g_hash_table_iter_next (&iter, &key, &value)) {
-			TrackerWritebackModule *module;
-			const gchar * const *module_types;
-
-			module = value;
-			module_types = tracker_writeback_module_get_rdf_types (module);
+        G_OBJECT_CLASS (tracker_writeback_dispatcher_parent_class)->finalize (object);
+}
 
-			if (sparql_rdf_types_match (module_types, (const gchar * const *) rdf_types)) {
-				TrackerWriteback *writeback;
+static void
+tracker_writeback_dispatcher_constructed (GObject *object)
+{
+        TrackerWritebackDispatcherPrivate *priv;
+        TrackerWritebackDispatcher *dispatcher;
+        DBusConnection *connection;
 
-				g_message ("  Updating metadata for file:'%s' using module:'%s'",
-				           "unknown",
-				           module->name);
+        dispatcher = TRACKER_WRITEBACK_DISPATCHER (object);
+	priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (dispatcher);
 
-				writeback = tracker_writeback_module_create (module);
-				tracker_writeback_update_metadata (writeback, result, priv->client);
-				g_object_unref (writeback);
-			}
-		}
-	} else {
-		g_message ("  No files qualify for updates");
-	}
+        connection = setup_dbus_connection (dispatcher, priv->context);
+        g_assert (connection != NULL);
 
-	g_strfreev (data->rdf_types);
-	g_slice_free (QueryData, data);
+        priv->connection = connection;
 }
 
 static void
-sparql_writeback_cb (DBusGProxy                 *proxy,
-                     GHashTable                 *subjects,
-                     TrackerWritebackDispatcher *object)
+tracker_writeback_dispatcher_get_property (GObject    *object,
+                                           guint       param_id,
+                                           GValue     *value,
+                                           GParamSpec *pspec)
 {
-	TrackerWritebackDispatcherPrivate *priv;
-	QueryData *data;
-	GHashTableIter iter;
-	gpointer key, value;
+        TrackerWritebackDispatcherPrivate *priv;
 
 	priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (object);
 
-	g_message ("Writeback signalled with %d subjects...",
-		   g_hash_table_size (subjects));
-
-	g_hash_table_iter_init (&iter, subjects);
-
-	while (g_hash_table_iter_next (&iter, &key, &value)) {
-		const gchar *subject = key;
-		const gchar * const *rdf_types = value;
-		gchar *query;
+        switch (param_id) {
+        case PROP_MAIN_CONTEXT:
+                g_value_set_pointer (value, priv->context);
+                break;
+        default:
+                G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
+		break;
+        }
+}
 
-		query = g_strdup_printf ("SELECT ?url ?predicate ?object {"
-		                         "  <%s> ?predicate ?object ;"
-		                         "  nie:isStoredAs ?url ."
-		                         "  ?predicate tracker:writeback true "
-		                         "}", 
-		                         subject);
+static void
+tracker_writeback_dispatcher_set_property (GObject      *object,
+                                           guint         param_id,
+                                           const GValue *value,
+                                           GParamSpec   *pspec)
+{
+        TrackerWritebackDispatcherPrivate *priv;
 
-		data = g_slice_new (QueryData);
-		data->dispatcher = object;
-		data->rdf_types = g_strdupv ((gchar **) rdf_types);
+	priv = TRACKER_WRITEBACK_DISPATCHER_GET_PRIVATE (object);
 
-		tracker_resources_sparql_query_async (priv->client,
-		                                      query,
-		                                      sparql_query_cb,
-		                                      data);
+        switch (param_id) {
+        case PROP_MAIN_CONTEXT:
+                {
+                        GMainContext *context;
+
+                        context = g_value_get_pointer (value);
+
+                        if (context != priv->context) {
+                                if (priv->context) {
+                                        g_main_context_unref (priv->context);
+                                        priv->context = NULL;
+                                }
+
+                                if (context) {
+                                        priv->context = g_main_context_ref (context);
+                                }
+                        }
+                }
+
+                break;
+        default:
+                G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
+		break;
+        }
+}
 
-		g_free (query);
-	}
+TrackerWritebackDispatcher *
+tracker_writeback_dispatcher_new (GMainContext *context)
+{
+	return g_object_new (TRACKER_TYPE_WRITEBACK_DISPATCHER,
+                             "context", context,
+                             NULL);
 }
diff --git a/src/tracker-writeback/tracker-writeback-dispatcher.h b/src/tracker-writeback/tracker-writeback-dispatcher.h
index 2ebecdf..4682ec1 100644
--- a/src/tracker-writeback/tracker-writeback-dispatcher.h
+++ b/src/tracker-writeback/tracker-writeback-dispatcher.h
@@ -1,6 +1,6 @@
 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
 /*
- * Copyright (C) 2009, Nokia
+ * Copyright (C) 2008, Nokia
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public
@@ -21,9 +21,9 @@
 #ifndef __TRACKER_WRITEBACK_DISPATCHER_H__
 #define __TRACKER_WRITEBACK_DISPATCHER_H__
 
+
 #include <glib-object.h>
 #include <gio/gio.h>
-
 #include <libtracker-client/tracker.h>
 
 G_BEGIN_DECLS
@@ -44,10 +44,15 @@ struct TrackerWritebackDispatcher {
 
 struct TrackerWritebackDispatcherClass {
         GObjectClass parent_class;
+
+        void (* writeback) (TrackerWritebackDispatcher *dispatcher,
+                            const gchar                *subject,
+                            const GStrv                 rdf_types);
 };
 
-GType                        tracker_writeback_dispatcher_get_type (void) G_GNUC_CONST;
-TrackerWritebackDispatcher * tracker_writeback_dispatcher_new      (void);
+GType          tracker_writeback_dispatcher_get_type (void) G_GNUC_CONST;
+
+TrackerWritebackDispatcher * tracker_writeback_dispatcher_new (GMainContext *context);
 
 G_END_DECLS
 
diff --git a/src/tracker-writeback/tracker-writeback-file.c b/src/tracker-writeback/tracker-writeback-file.c
index ad51c39..532185a 100644
--- a/src/tracker-writeback/tracker-writeback-file.c
+++ b/src/tracker-writeback/tracker-writeback-file.c
@@ -123,6 +123,8 @@ tracker_writeback_file_update_metadata (TrackerWriteback *writeback,
 	g_object_unref (file_info);
 
 	if (retval) {
+		g_message ("Locking file '%s' in order to write metadata", row[0]);
+
 		tracker_file_lock (file);
 
 		subjects[0] = row[0];



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]