[tracker/dbus-fd-experiment: 1/3] Add Tracker Steroids interface



commit 9baaf793aeb63974df117af38877909fe40d2594
Author: Adrien Bustany <abustany gnome org>
Date:   Thu May 20 17:55:31 2010 -0400

    Add Tracker Steroids interface
    
    The Steroids interface uses a local pipe to transfer query results,
    instead of sending them over DBus.
    It is composed of two methods:
    PrepareQuery <- SPARQL query
                 -> UNIX file descriptor (read end of the pipe the results are written to)
                 -> Query identifier
    The PrepareQuery call initiates a query in Tracker. The query is not
    really prepared, in the sense that a PrepareQuery with an erroneous
    query will succeed. It's the Fetch call which will fail in that case.
    The returned query identifier can be used to initiate the Fetch.
    
    Fetch        <- Query identifier
    The Fetch call executes the query with the given identifier, and returns
    the results over the pipe returned by PrepareQuery.

 data/dbus/tracker-steroids.xml       |   17 ++
 src/tracker-store/Makefile.am        |    7 +-
 src/tracker-store/tracker-dbus.c     |   16 ++
 src/tracker-store/tracker-steroids.c |  457 ++++++++++++++++++++++++++++++++++
 src/tracker-store/tracker-steroids.h |   69 +++++
 5 files changed, 564 insertions(+), 2 deletions(-)
---
diff --git a/data/dbus/tracker-steroids.xml b/data/dbus/tracker-steroids.xml
new file mode 100644
index 0000000..e7af30b
--- /dev/null
+++ b/data/dbus/tracker-steroids.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<node name="/org/freedesktop/Tracker1">
+  <interface name="org.freedesktop.Tracker1.Steroids">
+    <!-- SPARQL Query without updates -->
+    <!-- PrepareQuery: takes the SPARQL text and returns a UNIX file
+         descriptor (type "h") for the pipe that will carry the results,
+         plus the identifier to hand to Fetch. -->
+    <method name="PrepareQuery">
+      <annotation name="org.freedesktop.DBus.GLib.Async" value="true"/>
+      <arg type="s" name="query" direction="in" />
+      <arg type="h" name="pipe" direction="out" />
+      <arg type="u" name="query_id" direction="out" />
+    </method>
+    <!-- Fetch: runs the query registered under query_id; the rows are
+         delivered over the pipe, the method itself returns no values. -->
+    <method name="Fetch">
+      <annotation name="org.freedesktop.DBus.GLib.Async" value="true"/>
+      <arg type="u" name="query_id" direction="in" />
+    </method>
+  </interface>
+</node>
diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am
index edf3ed8..8bc399e 100644
--- a/src/tracker-store/Makefile.am
+++ b/src/tracker-store/Makefile.am
@@ -52,7 +52,9 @@ tracker_store_SOURCES =							\
 	tracker-store.c							\
 	tracker-store.h							\
 	tracker-status.c						\
-	tracker-status.h
+	tracker-status.h						\
+	tracker-steroids.c						\
+	tracker-steroids.h
 
 if OS_WIN32
 tracker_store_win_libs = -lws2_32 -lkernel32
@@ -84,7 +86,8 @@ dbus_sources = 								\
 	tracker-resources-glue.h					\
 	tracker-statistics-glue.h					\
 	tracker-resources-class-glue.h					\
-	tracker-status-glue.h
+	tracker-status-glue.h						\
+	tracker-steroids-glue.h
 
 tracker-marshal.h: tracker-marshal.list
 	$(AM_V_GEN)$(GLIB_GENMARSHAL) $< --prefix=tracker_marshal --header > $@
diff --git a/src/tracker-store/tracker-dbus.c b/src/tracker-store/tracker-dbus.c
index 440c48e..1280a2b 100644
--- a/src/tracker-store/tracker-dbus.c
+++ b/src/tracker-store/tracker-dbus.c
@@ -42,6 +42,8 @@
 #include "tracker-backup.h"
 #include "tracker-backup-glue.h"
 #include "tracker-marshal.h"
+#include "tracker-steroids.h"
+#include "tracker-steroids-glue.h"
 
 static DBusGConnection *connection;
 static DBusGProxy      *gproxy;
@@ -289,6 +291,20 @@ tracker_dbus_register_objects (void)
 	                      TRACKER_RESOURCES_PATH);
 	objects = g_slist_prepend (objects, object);
 
+	/* Add org.freedesktop.Tracker1.Steroids */
+	object = tracker_steroids_new ();
+	if (!object) {
+		g_critical ("Could net create TrackerSteroids object to register");
+		return FALSE;
+	}
+
+	dbus_register_object (connection,
+	                      gproxy,
+	                      G_OBJECT (object),
+	                      &dbus_glib_tracker_steroids_object_info,
+	                      TRACKER_STEROIDS_PATH);
+	objects = g_slist_prepend (objects, object);
+
 	/* Reverse list since we added objects at the top each time */
 	objects = g_slist_reverse (objects);
 
diff --git a/src/tracker-store/tracker-steroids.c b/src/tracker-store/tracker-steroids.c
new file mode 100644
index 0000000..3b38302
--- /dev/null
+++ b/src/tracker-store/tracker-steroids.c
@@ -0,0 +1,457 @@
+/*
+ * Copyright (C) 2010, Adrien Bustany <abustany gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "tracker-dbus.h"
+#include "tracker-steroids.h"
+#include "tracker-store.h"
+
+/**
+ * /!\ IMPORTANT WARNING /!\
+ *
+ * DBus 1.3 is required for this feature to work, since UNIX FD passing is not
+ * present in earlier versions. However, using UNIX FD passing with DBus will
+ * make Valgrind stop, since the fcntl command F_DUPFD_CLOEXEC is not supported
+ * as of version 3.5.
+ * This has been reported here: https://bugs.kde.org/show_bug.cgi?id=238696
+ */
+
+G_DEFINE_TYPE (TrackerSteroids, tracker_steroids, G_TYPE_OBJECT)
+
+#define TRACKER_STEROIDS_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), TRACKER_TYPE_STEROIDS, TrackerSteroidsPrivate))
+
+/* Instance-private data of TrackerSteroids. */
+typedef struct {
+	/* Prepared queries: maps GUINT_TO_POINTER (query id) to the
+	 * ClientInfo created by tracker_steroids_prepare_query(). The table
+	 * is created without destroy notifiers. */
+	GHashTable *clients;
+} TrackerSteroidsPrivate;
+
+/* State of one prepared query, from PrepareQuery until its Fetch is done. */
+typedef struct {
+	/* Steroids object the query was prepared on */
+	TrackerSteroids *parent;
+	/* Copy of the SPARQL text (g_strdup'ed in PrepareQuery) */
+	char *query;
+	/* Write end of the pipe the results are sent through */
+	int fd;
+	/* Number of bytes currently used in send_buffer */
+	unsigned int send_buffer_index;
+	/* Page being assembled; always sent as STEROIDS_BUFFER_SIZE bytes */
+	char send_buffer[STEROIDS_BUFFER_SIZE];
+	/* Request id of the Fetch call (set in tracker_steroids_fetch) */
+	guint request_id;
+	/* Invocation context of the Fetch call, used to send its reply */
+	DBusGMethodInvocation *context;
+} ClientInfo;
+
+/* Value returned by query_inthread() and handed to query_callback(). */
+typedef struct {
+	/* Error met while running the query, or NULL on success */
+	GError *error;
+	/* Not assigned by the code in this file */
+	gpointer user_data;
+} InThreadPtr;
+
+static void tracker_steroids_finalize (GObject *object);
+
+/* Class initializer: reserves the instance-private area and installs the
+ * finalize handler. */
+static void
+tracker_steroids_class_init (TrackerSteroidsClass *klass)
+{
+	GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+	g_type_class_add_private (gobject_class, sizeof (TrackerSteroidsPrivate));
+
+	gobject_class->finalize = tracker_steroids_finalize;
+}
+
+/* Instance initializer: creates the (initially empty) table of prepared
+ * queries, keyed by pointer-sized integers. */
+static void
+tracker_steroids_init (TrackerSteroids *object)
+{
+	TrackerSteroidsPrivate *priv;
+
+	priv = TRACKER_STEROIDS_GET_PRIVATE (object);
+	priv->clients = g_hash_table_new (g_direct_hash, g_direct_equal);
+}
+
+/* Creates a new TrackerSteroids instance with no construction properties. */
+TrackerSteroids*
+tracker_steroids_new (void)
+{
+	TrackerSteroids *steroids;
+
+	steroids = g_object_new (TRACKER_TYPE_STEROIDS, NULL);
+
+	return steroids;
+}
+
+/* Destroy notifier for a ClientInfo: releases the query text, closes the
+ * pipe's write end if it is still open, and frees the structure itself.
+ *
+ * A negative fd means "already closed". 0 is a valid descriptor and must
+ * be closed like any other, so the test is ">= 0" rather than "!= 0".
+ */
+static void
+destroy_client_info (gpointer user_data)
+{
+	ClientInfo *info = user_data;
+
+	/* g_free() accepts NULL, no guard needed */
+	g_free (info->query);
+
+	if (info->fd >= 0) {
+		close (info->fd);
+	}
+
+	g_slice_free (ClientInfo, info);
+}
+
+/* Stores the four low-order bytes of @value at @dst, least significant
+ * byte first. @dst must have room for four bytes. */
+static void
+buffer_write_int (char *dst, int value)
+{
+	unsigned char *out = (unsigned char *) dst;
+	int shift;
+
+	for (shift = 0; shift < 32; shift += 8) {
+		*out++ = (unsigned char) ((value >> shift) & 0xff);
+	}
+}
+
+/* Appends @value to the client's page buffer at the current write position
+ * and advances that position by sizeof (int). No bounds check is done here;
+ * the caller must make sure the page has room. */
+static void
+client_write_int (ClientInfo *info, int value)
+{
+	buffer_write_int (&info->send_buffer[info->send_buffer_index], value);
+
+	info->send_buffer_index += sizeof (int);
+}
+
+/* Writes exactly @size bytes from @buf to @fd, looping over short writes.
+ *
+ * A write() interrupted by a signal (EINTR) is retried. Any other failure
+ * is logged and aborts the transfer: the failed call's -1 must not be added
+ * to the byte count, and retrying a persistent error would spin forever.
+ */
+static void
+buffer_send (int fd, char *buf, int size)
+{
+	ssize_t sent = 0;
+
+	while (sent < size) {
+		ssize_t ret = write (fd,
+		                     buf + sent,
+		                     size - sent);
+
+		if (ret < 0) {
+			if (errno == EINTR) {
+				continue;
+			}
+
+			g_critical ("Could not send buffer");
+			return;
+		}
+
+		sent += ret;
+	}
+}
+
+/* Sends the client's page and rewinds the write position.
+ *
+ * The whole STEROIDS_BUFFER_SIZE bytes are always sent. Unless the page is
+ * exactly full, the byte at the current write position is set to the
+ * one-byte "end of page" marker first.
+ */
+static void
+page_flush (ClientInfo *info)
+{
+	unsigned int used = info->send_buffer_index;
+
+	if (used != STEROIDS_BUFFER_SIZE) {
+		unsigned char *marker = (unsigned char *) info->send_buffer + used;
+
+		*marker = STEROIDS_EOP;
+	}
+
+	buffer_send (info->fd, info->send_buffer, STEROIDS_BUFFER_SIZE);
+
+	info->send_buffer_index = 0;
+}
+
+/* Completion callback given to tracker_store_sparql_query().
+ *
+ * @inthread_data: the InThreadPtr returned by query_inthread(), or NULL
+ * @error:         error reported by the store itself, or NULL
+ * @user_data:     the ClientInfo of the query
+ *
+ * Answers the pending Fetch call (error or empty success reply), frees the
+ * InThreadPtr and drops the query from the table of prepared queries. The
+ * ClientInfo itself is not freed here; destroy_client_info() is registered
+ * with the store for that.
+ */
+static void
+query_callback (gpointer  inthread_data,
+                GError   *error,
+                gpointer  user_data)
+{
+	InThreadPtr *ptr  = inthread_data;
+	ClientInfo  *info = user_data;
+	TrackerSteroidsPrivate *priv = TRACKER_STEROIDS_GET_PRIVATE (info->parent);
+	DBusMessage *reply;
+	GHashTableIter iter;
+	gpointer value;
+
+	if (ptr && ptr->error) {
+		/* Client is still there, but query failed */
+		tracker_dbus_request_failed (info->request_id,
+		                             info->context,
+		                             &ptr->error,
+		                             NULL);
+		dbus_g_method_return_error (info->context, ptr->error);
+		g_error_free (ptr->error);
+	} else if (error) {
+		/* Client has disappeared */
+		tracker_dbus_request_failed (info->request_id,
+		                             info->context,
+		                             &error,
+		                             NULL);
+		dbus_g_method_return_error (info->context, error);
+	} else {
+		tracker_dbus_request_success (info->request_id,
+		                              info->context);
+		reply = dbus_g_method_get_reply (info->context);
+		dbus_g_method_send_reply (info->context, reply);
+	}
+
+	if (ptr) {
+		g_slice_free (InThreadPtr, ptr);
+	}
+
+	/* The table is keyed by the query id, not by the ClientInfo pointer,
+	 * and info->request_id now holds the Fetch request id. Look the
+	 * entry up by value so that it really gets removed and the table
+	 * does not keep a pointer to a structure about to be freed. */
+	g_hash_table_iter_init (&iter, priv->clients);
+	while (g_hash_table_iter_next (&iter, NULL, &value)) {
+		if (value == info) {
+			g_hash_table_iter_remove (&iter);
+			break;
+		}
+	}
+}
+
+/* Worker given to tracker_store_sparql_query(): streams the rows of
+ * @cursor through the client's pipe.
+ *
+ * @cursor:    result cursor of the query
+ * @error:     error reported for the query, or NULL
+ * @user_data: the ClientInfo of the query
+ *
+ * Rows are packed into pages of STEROIDS_BUFFER_SIZE bytes. Each row is
+ * written as: STEROIDS_RC_ROW, the column count, one end offset per column,
+ * then every column's text followed by a NUL byte. A row that cannot fit
+ * in one page is sent on its own with a STEROIDS_RC_LARGEROW header that
+ * also carries the row size. The stream ends with a page holding
+ * STEROIDS_RC_DONE, or STEROIDS_RC_ERROR when @error is set.
+ *
+ * Return codes and counts go through buffer_write_int() (least significant
+ * byte first), whereas the offset array is copied with memcpy() in host
+ * byte order.
+ *
+ * Returns a newly allocated InThreadPtr whose error field is set when the
+ * query or the iteration failed; query_callback() frees it.
+ */
+static gpointer
+query_inthread (TrackerDBCursor *cursor,
+                GError          *error,
+                gpointer         user_data)
+{
+	InThreadPtr *ptr  = g_slice_new0 (InThreadPtr);
+	ClientInfo  *info = user_data;
+	GError *loop_error = NULL;
+	guint n_columns;
+	int *column_sizes;
+	int *column_offsets;
+	const gchar **column_data;
+
+	if (error) {
+		client_write_int (info, STEROIDS_RC_ERROR);
+		page_flush (info);
+		ptr->error = g_error_copy (error);
+		return ptr;
+	}
+
+	n_columns = tracker_db_cursor_get_n_columns (cursor);
+
+	column_sizes = g_malloc (n_columns * sizeof (int));
+	column_offsets = g_malloc (n_columns * sizeof (int));
+	column_data = g_malloc (n_columns * sizeof (char*));
+
+	while (tracker_db_cursor_iter_next (cursor, &loop_error)) {
+		guint i;
+		/* Starts at "-1" (unsigned wrap-around) so that after adding
+		 * length + 1 it is the offset of the column's NUL byte */
+		guint last_offset = -1;
+		guint row_size;
+
+		if (loop_error != NULL) {
+			break;
+		}
+
+		for (i = 0; i < n_columns ; i++) {
+			const gchar *str;
+
+			str = tracker_db_cursor_get_string (cursor, i);
+
+			column_sizes[i] = str ? strlen (str) : 0;
+			column_data[i]  = str;
+
+			last_offset += column_sizes[i] + 1;
+			column_offsets[i] = last_offset;
+		}
+
+		/* NOTE(review): last_offset is the offset of the last NUL
+		 * byte, i.e. the data length minus one, and the NUL bytes are
+		 * then counted a second time. The LARGEROW header below also
+		 * holds one more int than is counted here. row_size may
+		 * therefore differ from the number of bytes assembled for a
+		 * large row; confirm against the reader before changing it. */
+		row_size = sizeof (int)                   /* return code */
+		         + sizeof (int)                   /* column count */
+		         + n_columns * sizeof (int)    /* column offsets */
+		         + last_offset                    /* columns data */
+		         + n_columns * sizeof (char);  /* trailing \0 for each column */
+
+		if (row_size > STEROIDS_BUFFER_SIZE) {
+			char *buffer = g_malloc (sizeof (char) * row_size + sizeof (int));
+			char *dst = buffer;
+
+			/* Flush the current page, if there was something in it */
+			if (info->send_buffer_index) {
+				page_flush (info);
+			}
+
+			buffer_write_int (dst, STEROIDS_RC_LARGEROW);
+			dst += sizeof (int);
+			buffer_write_int (dst, row_size);
+			dst += sizeof (int);
+			buffer_write_int (dst, n_columns);
+			dst += sizeof (int);
+			memcpy (dst,
+			        column_offsets,
+			        n_columns * sizeof (int));
+			dst += n_columns * sizeof (int);
+
+			for (i = 0; i < n_columns; i++) {
+				memcpy (dst,
+				        column_data[i],
+				        column_sizes[i]);
+				dst += column_sizes[i];
+				memset (dst, 0, 1);
+				dst ++;
+			}
+
+			buffer_send (info->fd, buffer, row_size);
+
+			g_free (buffer);
+			continue;
+		}
+
+		/* Start a new page when the row does not fit in what is left */
+		if (row_size > STEROIDS_BUFFER_SIZE - info->send_buffer_index) {
+			page_flush (info);
+		}
+
+		client_write_int (info, STEROIDS_RC_ROW);
+		client_write_int (info, n_columns);
+		memcpy (info->send_buffer + info->send_buffer_index,
+		        column_offsets,
+		        n_columns * sizeof (int));
+		info->send_buffer_index += n_columns * sizeof (int);
+
+		for (i = 0; i < n_columns; i++) {
+			memcpy (info->send_buffer + info->send_buffer_index,
+			        column_data[i],
+			        column_sizes[i]);
+			info->send_buffer_index += column_sizes[i];
+			memset (info->send_buffer + info->send_buffer_index, 0, 1);
+			info->send_buffer_index ++;
+		}
+	}
+
+	/* We flush to ensure we'll have enough space to write the DONE code */
+	if (info->send_buffer_index) {
+		page_flush (info);
+	}
+	client_write_int (info, STEROIDS_RC_DONE);
+	page_flush (info);
+
+	close (info->fd);
+	/* Mark the descriptor as closed: the ClientInfo destroy notifier
+	 * closes info->fd as well, and a second close() could hit an
+	 * unrelated descriptor that reused the same number. */
+	info->fd = -1;
+
+	if (loop_error) {
+		ptr->error = loop_error;
+	}
+
+	g_free (column_sizes);
+	g_free (column_offsets);
+	g_free (column_data);
+
+	return ptr;
+}
+
+/* D-Bus method PrepareQuery.
+ *
+ * @steroids: the Steroids object
+ * @query:    SPARQL text; must not be NULL
+ * @context:  invocation context, used to send the reply or the error
+ * @error:    receives the error when the call fails
+ *
+ * Creates a pipe, stores the query text and the pipe's write end in a new
+ * ClientInfo registered under the request id, and replies with the pipe's
+ * read end and that id. The query is not run here; that happens in
+ * tracker_steroids_fetch().
+ */
+void
+tracker_steroids_prepare_query (TrackerSteroids        *steroids,
+                                const char             *query,
+                                DBusGMethodInvocation  *context,
+                                GError                **error)
+{
+	TrackerSteroidsPrivate *priv = TRACKER_STEROIDS_GET_PRIVATE (steroids);
+	guint                   request_id;
+	int                     pipefd[2];
+	DBusMessage            *reply;
+	DBusMessageIter         iter;
+	GError                 *inner_error = NULL;
+	ClientInfo             *info;
+	dbus_bool_t             fd_appended;
+
+	request_id = tracker_dbus_get_next_request_id ();
+
+	tracker_dbus_async_return_if_fail (query != NULL, context);
+
+	tracker_dbus_request_new (request_id,
+	                          context,
+	                          "%s(): '%s'",
+	                          __FUNCTION__,
+	                          query);
+
+	if (pipe (pipefd) < 0) {
+		g_set_error (&inner_error, TRACKER_DBUS_ERROR, 0, "Cannot open pipe");
+
+		tracker_dbus_request_failed (request_id,
+		                             context,
+		                             &inner_error,
+		                             NULL);
+		dbus_g_method_return_error (context, inner_error);
+		g_propagate_error (error, inner_error);
+
+		return;
+	}
+
+	info = g_slice_new0 (ClientInfo);
+	info->parent = steroids;
+	info->fd = pipefd[1];
+	info->query = g_strdup (query);
+
+	reply = dbus_g_method_get_reply (context);
+
+	dbus_message_iter_init_append (reply, &iter);
+
+	fd_appended = dbus_message_iter_append_basic (&iter,
+	                                              DBUS_TYPE_UNIX_FD,
+	                                              &pipefd[0]);
+
+	/* libdbus duplicates the descriptor when it is appended, so our copy
+	 * of the read end is no longer needed whatever the outcome; keeping
+	 * it would leak one descriptor per prepared query. */
+	close (pipefd[0]);
+
+	if (!fd_appended) {
+		g_critical ("FD passing not supported");
+
+		g_set_error (&inner_error, TRACKER_DBUS_ERROR, 0, "FD passing not supported");
+
+		tracker_dbus_request_failed (request_id,
+		                             context,
+		                             &inner_error,
+		                             NULL);
+		dbus_g_method_return_error (context, inner_error);
+		g_propagate_error (error, inner_error);
+
+		/* Nothing was registered: release everything created above */
+		dbus_message_unref (reply);
+		close (info->fd);
+		g_free (info->query);
+		g_slice_free (ClientInfo, info);
+		return;
+	}
+
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UINT32, &request_id);
+
+	/* The request id doubles as the query id returned to the client */
+	g_hash_table_insert (priv->clients,
+	                     GUINT_TO_POINTER (request_id),
+	                     info);
+
+	tracker_dbus_request_success (request_id, context);
+	dbus_g_method_send_reply (context, reply);
+}
+
+/* D-Bus method Fetch.
+ *
+ * @steroids: the Steroids object
+ * @query_id: identifier returned by PrepareQuery
+ * @context:  invocation context, used to send the reply or the error
+ * @error:    receives the error when the call fails
+ *
+ * Looks up the prepared query and queues it in the store with high
+ * priority. The rows are written to the pipe by query_inthread() and the
+ * D-Bus reply is sent by query_callback() once the query has finished.
+ * An unknown id, or an id whose Fetch was already started, is answered
+ * with an error.
+ */
+void
+tracker_steroids_fetch (TrackerSteroids        *steroids,
+                        guint                   query_id,
+                        DBusGMethodInvocation  *context,
+                        GError                **error)
+{
+	TrackerSteroidsPrivate *priv = TRACKER_STEROIDS_GET_PRIVATE (steroids);
+	ClientInfo             *info;
+	guint                   request_id;
+	gchar                  *sender;
+	GError                 *inner_error = NULL;
+
+	request_id = tracker_dbus_get_next_request_id ();
+
+	tracker_dbus_request_new (request_id,
+	                          context,
+	                          "%s(): %u",
+	                          __FUNCTION__,
+	                          query_id);
+
+	info = g_hash_table_lookup (priv->clients,
+	                            GUINT_TO_POINTER (query_id));
+
+	if (!info) {
+		g_set_error (&inner_error, TRACKER_DBUS_ERROR, 0, "Wrong query id");
+		tracker_dbus_request_failed (request_id,
+		                             context,
+		                             &inner_error,
+		                             NULL);
+		/* info is NULL here: answer on the context of this call */
+		dbus_g_method_return_error (context, inner_error);
+		g_propagate_error (error, inner_error);
+
+		return;
+	}
+
+	if (info->context != NULL) {
+		/* A Fetch is already running for this query. Queueing it a
+		 * second time would make the store destroy the same
+		 * ClientInfo twice. */
+		g_set_error (&inner_error, TRACKER_DBUS_ERROR, 0, "Query already fetched");
+		tracker_dbus_request_failed (request_id,
+		                             context,
+		                             &inner_error,
+		                             NULL);
+		dbus_g_method_return_error (context, inner_error);
+		g_propagate_error (error, inner_error);
+
+		return;
+	}
+
+	info->request_id = request_id;
+	info->context = context;
+
+	sender = dbus_g_method_get_sender (context);
+
+	tracker_store_sparql_query (info->query, TRACKER_STORE_PRIORITY_HIGH,
+	                            query_inthread, query_callback, sender,
+	                            info, destroy_client_info);
+
+	g_free (sender);
+}
+
+/* GObject finalize handler: drops the table of prepared queries and then
+ * chains up, as every finalize implementation must, so that the parent
+ * class can release its own part of the instance.
+ *
+ * The table has no value destroy notifier, so ClientInfo entries still in
+ * it are not freed here.
+ */
+static void
+tracker_steroids_finalize (GObject *object)
+{
+	TrackerSteroidsPrivate *priv = TRACKER_STEROIDS_GET_PRIVATE (object);
+
+	g_hash_table_unref (priv->clients);
+
+	G_OBJECT_CLASS (tracker_steroids_parent_class)->finalize (object);
+}
diff --git a/src/tracker-store/tracker-steroids.h b/src/tracker-store/tracker-steroids.h
new file mode 100644
index 0000000..f95083a
--- /dev/null
+++ b/src/tracker-store/tracker-steroids.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2010, Adrien Bustany <abustany gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#ifndef __TRACKER_STEROIDS_H__
+#define __TRACKER_STEROIDS_H__
+
+#include <glib-object.h>
+
+#define TRACKER_STEROIDS_SERVICE   "org.freedesktop.Tracker1"
+#define TRACKER_STEROIDS_PATH      "/org/freedesktop/Tracker1/Steroids"
+#define TRACKER_STEROIDS_INTERFACE "org.freedesktop.Tracker1.Steroids"
+
+G_BEGIN_DECLS
+
+/* Standard GObject type, cast and check macros for TrackerSteroids.
+ * All of them refer to TRACKER_TYPE_STEROIDS, the only type macro this
+ * header defines. */
+#define TRACKER_TYPE_STEROIDS            (tracker_steroids_get_type ())
+#define TRACKER_STEROIDS(object)         (G_TYPE_CHECK_INSTANCE_CAST ((object), TRACKER_TYPE_STEROIDS, TrackerSteroids))
+#define TRACKER_STEROIDS_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), TRACKER_TYPE_STEROIDS, TrackerSteroidsClass))
+#define TRACKER_IS_STEROIDS(object)      (G_TYPE_CHECK_INSTANCE_TYPE ((object), TRACKER_TYPE_STEROIDS))
+#define TRACKER_IS_STEROIDS_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), TRACKER_TYPE_STEROIDS))
+#define TRACKER_STEROIDS_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), TRACKER_TYPE_STEROIDS, TrackerSteroidsClass))
+
+/* Size in bytes of one page sent through the pipe */
+#define STEROIDS_BUFFER_SIZE 65536
+
+/* Return codes written in front of each item sent through the pipe */
+/* The query failed */
+#define STEROIDS_RC_ERROR 0
+/* A result row follows */
+#define STEROIDS_RC_ROW   1
+/* No more rows */
+#define STEROIDS_RC_DONE  2
+/* A row too large for one page follows, prefixed with its size */
+#define STEROIDS_RC_LARGEROW 3
+
+/* One-byte "end of page" marker written after the last item of a page
+ * that is not completely filled */
+#define STEROIDS_EOP      0xFF
+
+typedef struct TrackerSteroids      TrackerSteroids;
+typedef struct TrackerSteroidsClass TrackerSteroidsClass;
+
+/* Instance structure; it has no public members beyond the parent GObject */
+struct TrackerSteroids {
+	GObject parent;
+};
+
+/* Class structure; it adds no virtual methods or signals */
+struct TrackerSteroidsClass {
+	GObjectClass parent;
+};
+
+/* Returns the GType of TrackerSteroids */
+GType            tracker_steroids_get_type      (void) G_GNUC_CONST;
+/* Creates a new TrackerSteroids object */
+TrackerSteroids* tracker_steroids_new           (void);
+/* PrepareQuery handler: registers @query and replies through @context with
+ * a pipe file descriptor and the query identifier */
+void             tracker_steroids_prepare_query (TrackerSteroids        *steroids,
+                                                 const gchar            *query,
+                                                 DBusGMethodInvocation  *context,
+                                                 GError                **error);
+/* Fetch handler: runs the query registered under @query_id; the results
+ * are written to the pipe handed out by PrepareQuery */
+void             tracker_steroids_fetch         (TrackerSteroids        *steroids,
+                                                 guint                   query_id,
+                                                 DBusGMethodInvocation  *context,
+                                                 GError                **error);
+
+#endif /* __TRACKER_STEROIDS_H__ */



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]