[tracker/dbus-fd-experiment-gio: 33/41] Add gio based update



commit d26ceca249d6987f61426db379fe8e04241aaf7e
Author: Adrien Bustany <abustany gnome org>
Date:   Thu Jun 10 18:26:10 2010 -0400

    Add gio based update

 src/libtracker-client/tracker.c |  283 ++++++++++++++++++++++++++++++++++++---
 1 files changed, 264 insertions(+), 19 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 18c5299..aa42b5b 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -183,7 +183,7 @@ typedef enum {
 
 typedef struct {
 	FastOperationType      operation;
-	gchar                 *query;
+	const gchar           *query;
 	gpointer               user_data;
 	GInputStream          *input_stream;
 	GOutputStream         *output_stream;
@@ -815,6 +815,30 @@ find_conversion (const char  *format,
 }
 
 #ifdef HAVE_DBUS_FD_PASSING
+static GHashTable*
+unmarshall_hash_table (DBusMessageIter *iter) {
+	GHashTable *result;
+	DBusMessageIter subiter, subsubiter;
+
+	result = g_hash_table_new (g_str_hash, g_str_equal);
+
+	dbus_message_iter_recurse (iter, &subiter);
+
+	while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
+		const gchar *key, *value;
+
+		dbus_message_iter_recurse (&subiter, &subsubiter);
+		dbus_message_iter_get_basic (&subsubiter, &key);
+		dbus_message_iter_next (&subsubiter);
+		dbus_message_iter_get_basic (&subsubiter, &value);
+		g_hash_table_insert (result, g_strdup (key), g_strdup (value));
+
+		dbus_message_iter_next (&subiter);
+	}
+
+	return result;
+}
+
 static int
 iterator_buffer_read_int (TrackerResultIterator *iterator)
 {
@@ -825,6 +849,82 @@ iterator_buffer_read_int (TrackerResultIterator *iterator)
 	return GINT32_FROM_BE (v);
 }
 
+static void
+sparql_update_fast_callback (DBusPendingCall *call,
+                             void            *user_data)
+{
+	FastAsyncData *data = user_data;
+	DBusMessage *reply;
+	DBusError dbus_error;
+	GError *error = NULL;
+	DBusMessageIter iter, subiter, subsubiter;
+	GPtrArray *result;
+
+	dbus_error_init (&dbus_error);
+
+	reply = dbus_pending_call_steal_reply (call);
+
+	g_assert (reply);
+
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		dbus_set_error_from_message (&dbus_error, reply);
+		dbus_set_g_error (&error, &dbus_error);
+
+		switch (data->operation) {
+		case FAST_UPDATE:
+		case FAST_UPDATE_BATCH:
+			(* data->void_callback) (error, data->user_data);
+			break;
+		case FAST_UPDATE_BLANK:
+			(* data->gptrarray_callback) (NULL, error, data->user_data);
+			break;
+		default:
+			g_assert_not_reached ();
+			break;
+		}
+
+		dbus_message_unref (reply);
+		dbus_pending_call_unref (call);
+		g_slice_free (FastAsyncData, data);
+		return;
+	}
+
+	switch (data->operation) {
+	case FAST_UPDATE:
+	case FAST_UPDATE_BATCH:
+		(* data->void_callback) (NULL, data->user_data);
+		break;
+	case FAST_UPDATE_BLANK:
+		result = g_ptr_array_new ();
+		dbus_message_iter_init (reply, &iter);
+		dbus_message_iter_recurse (&iter, &subiter);
+
+		while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
+			GPtrArray *inner_array;
+
+			inner_array = g_ptr_array_new ();
+			g_ptr_array_add (result, inner_array);
+			dbus_message_iter_recurse (&subiter, &subsubiter);
+
+			while (dbus_message_iter_get_arg_type (&subsubiter) != DBUS_TYPE_INVALID) {
+				g_ptr_array_add (inner_array, unmarshall_hash_table (&subsubiter));
+				dbus_message_iter_next (&subsubiter);
+			}
+
+			dbus_message_iter_next (&subiter);
+		}
+		(* data->gptrarray_callback) (result, error, data->user_data);
+		break;
+	default:
+		g_assert_not_reached ();
+		break;
+	}
+
+	dbus_message_unref (reply);
+	dbus_pending_call_unref (call);
+	g_slice_free (FastAsyncData, data);
+}
+
 static DBusMessage*
 sparql_update_fast (TrackerClient      *client,
                     const gchar        *query,
@@ -944,30 +1044,106 @@ sparql_update_fast (TrackerClient      *client,
 	return reply;
 }
 
-static GHashTable*
-unmarshall_hash_table (DBusMessageIter *iter) {
-	GHashTable *result;
-	DBusMessageIter subiter, subsubiter;
+static void
+sparql_update_fast_async (TrackerClient      *client,
+                          FastAsyncData      *data,
+                          GError            **error)
+{
+	TrackerClientPrivate *private;
+	DBusConnection *connection;
+	gchar *dbus_method;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusPendingCall *call;
+	DBusError dbus_error;
+	int pipefd[2];
+	GOutputStream *output_stream;
+	GDataOutputStream *data_output_stream;
+	GError *inner_error = NULL;
 
-	result = g_hash_table_new (g_str_hash, g_str_equal);
+	private = TRACKER_CLIENT_GET_PRIVATE (client);
 
-	dbus_message_iter_recurse (iter, &subiter);
+	if (pipe (pipefd) < 0) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Cannot open pipe");
+		return;
+	}
 
-	while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
-		const gchar *key, *value;
+	connection = dbus_g_connection_get_connection (private->connection);
 
-		dbus_message_iter_recurse (&subiter, &subsubiter);
-		dbus_message_iter_get_basic (&subsubiter, &key);
-		dbus_message_iter_next (&subsubiter);
-		dbus_message_iter_get_basic (&subsubiter, &value);
-		g_hash_table_insert (result, g_strdup (key), g_strdup (value));
+	dbus_error_init (&dbus_error);
 
-		dbus_message_iter_next (&subiter);
+	switch (data->operation) {
+		case FAST_UPDATE:
+			dbus_method = "Update";
+			break;
+		case FAST_UPDATE_BLANK:
+			dbus_method = "UpdateBlank";
+			break;
+		case FAST_UPDATE_BATCH:
+			dbus_method = "BatchUpdate";
+			break;
+		default:
+			g_assert_not_reached ();
 	}
 
-	return result;
+	message = dbus_message_new_method_call (TRACKER_STEROIDS_SERVICE,
+	                                        TRACKER_STEROIDS_PATH,
+	                                        TRACKER_STEROIDS_INTERFACE,
+	                                        dbus_method);
+	dbus_message_iter_init_append (message, &iter);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[0]);
+	dbus_connection_send_with_reply (connection,
+	                                 message,
+	                                 &call,
+	                                 -1);
+	dbus_message_unref (message);
+	close (pipefd[0]);
+
+	if (!call) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return;
+	}
+
+	output_stream = g_unix_output_stream_new (pipefd[1], TRUE);
+	data_output_stream = g_data_output_stream_new (output_stream);
+
+	g_data_output_stream_put_int32 (data_output_stream,
+	                                strlen (data->query),
+	                                NULL,
+	                                &inner_error);
+
+	if (inner_error) {
+		g_propagate_error (error, inner_error);
+		g_object_unref (data_output_stream);
+		g_object_unref (output_stream);
+		return;
+	}
+
+	g_data_output_stream_put_string (data_output_stream,
+	                                 data->query,
+	                                 NULL,
+	                                 &inner_error);
+
+	if (inner_error) {
+		g_propagate_error (error, inner_error);
+		g_object_unref (data_output_stream);
+		g_object_unref (output_stream);
+		return;
+	}
+
+	g_object_unref (data_output_stream);
+	g_object_unref (output_stream);
+
+	dbus_pending_call_set_notify (call, sparql_update_fast_callback, data, NULL);
 }
 
+
 #endif
 
 /**
@@ -2170,7 +2346,30 @@ tracker_resources_sparql_update_fast_async (TrackerClient    *client,
                                             TrackerReplyVoid  callback,
                                             gpointer          user_data)
 {
-	return 0;
+	FastAsyncData *data;
+	GError *error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	data = g_slice_new0 (FastAsyncData);
+	data->operation = FAST_UPDATE;
+	data->query = query;
+	data->void_callback = callback;
+	data->user_data = user_data;
+
+	sparql_update_fast_async (client, data, &error);
+
+	if (error) {
+		g_critical ("Could not initiate update: %s", error->message);
+		g_error_free (error);
+		g_slice_free (FastAsyncData, data);
+
+		return 0;
+	}
+
+	return 42;
 }
 
 guint
@@ -2210,7 +2409,30 @@ tracker_resources_sparql_update_blank_fast_async (TrackerClient         *client,
                                                   TrackerReplyGPtrArray  callback,
                                                   gpointer               user_data)
 {
-	return 0;
+	FastAsyncData *data;
+	GError *error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	data = g_slice_new0 (FastAsyncData);
+	data->operation = FAST_UPDATE_BLANK;
+	data->query = query;
+	data->gptrarray_callback = callback;
+	data->user_data = user_data;
+
+	sparql_update_fast_async (client, data, &error);
+
+	if (error) {
+		g_critical ("Could not initiate update: %s", error->message);
+		g_error_free (error);
+		g_slice_free (FastAsyncData, data);
+
+		return 0;
+	}
+
+	return 42;
 }
 
 /**
@@ -2264,7 +2486,30 @@ tracker_resources_batch_sparql_update_fast_async (TrackerClient    *client,
                                                   TrackerReplyVoid  callback,
                                                   gpointer          user_data)
 {
-	return 0;
+	FastAsyncData *data;
+	GError *error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	data = g_slice_new0 (FastAsyncData);
+	data->operation = FAST_UPDATE_BATCH;
+	data->query = query;
+	data->void_callback = callback;
+	data->user_data = user_data;
+
+	sparql_update_fast_async (client, data, &error);
+
+	if (error) {
+		g_critical ("Could not initiate update: %s", error->message);
+		g_error_free (error);
+		g_slice_free (FastAsyncData, data);
+
+		return 0;
+	}
+
+	return 42;
 }
 
 /**



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]