[tracker/dbus-fd-experiment-gio: 3/11] Steroids client: make async query use GIO and not threads



commit 4fea19d382809615b91fb92eb319ca8b494b1d9c
Author: Adrien Bustany <abustany gnome org>
Date:   Thu Jun 10 12:31:55 2010 -0400

    Steroids client: make async query use GIO and not threads

 src/libtracker-client/tracker.c |  358 +++++++++++++++++----------------------
 1 files changed, 155 insertions(+), 203 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 11a3fd7..18c5299 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -117,10 +117,6 @@ typedef struct {
 	GList *writeback_callbacks;
 
 	gboolean is_constructed;
-
-#ifdef HAVE_DBUS_FD_PASSING
-	GThreadPool *thread_pool;
-#endif
 } TrackerClientPrivate;
 
 typedef struct {
@@ -165,6 +161,7 @@ struct TrackerResultIterator {
 	int rc;
 	char *buffer;
 	int buffer_index;
+	long buffer_size;
 
 	guint  n_columns;
 	int   *offsets;
@@ -187,8 +184,10 @@ typedef enum {
 typedef struct {
 	FastOperationType      operation;
 	gchar                 *query;
-	GError                *error;
 	gpointer               user_data;
+	GInputStream          *input_stream;
+	GOutputStream         *output_stream;
+	DBusPendingCall       *dbus_call;
 	union {
 		GPtrArray             *result_gptrarray;
 		TrackerResultIterator *result_iterator;
@@ -214,6 +213,10 @@ static void     client_get_property  (GObject      *object,
                                       GParamSpec   *pspec);
 static void     client_constructed   (GObject      *object);
 
+#ifdef HAVE_DBUS_FD_PASSING
+static int      iterator_buffer_read_int (TrackerResultIterator *iterator);
+#endif
+
 enum {
 	PROP_0,
 	PROP_ENABLE_WARNINGS,
@@ -281,74 +284,6 @@ writeback_cb (DBusGProxy       *proxy,
 	}
 }
 
-static gboolean
-thread_callback (gpointer data)
-{
-	FastAsyncData *async_data = data;
-
-	switch (async_data->operation) {
-		case FAST_QUERY:
-			(* async_data->iterator_callback) (async_data->result_iterator,
-			                                   async_data->error,
-			                                   async_data->user_data);
-			break;
-		case FAST_UPDATE:
-		case FAST_UPDATE_BATCH:
-			(* async_data->void_callback) (async_data->error,
-			                               async_data->user_data);
-			break;
-		case FAST_UPDATE_BLANK:
-			(* async_data->gptrarray_callback) (async_data->result_gptrarray,
-			                                    async_data->error,
-			                                    async_data->user_data);
-			break;
-		default:
-			g_assert_not_reached ();
-	}
-
-	g_free (async_data->query);
-
-	if (async_data->error) {
-		g_error_free (async_data->error);
-	}
-
-	return FALSE;
-}
-
-static void
-thread_dispatch (gpointer data, gpointer user_data)
-{
-	FastAsyncData *async_data = data;
-	TrackerClient *client = user_data;
-
-	switch (async_data->operation) {
-		case FAST_QUERY:
-			async_data->result_iterator = tracker_resources_sparql_query_iterate (client,
-			                                                                      async_data->query,
-			                                                                      &async_data->error);
-			break;
-		case FAST_UPDATE:
-			tracker_resources_sparql_update_fast (client,
-			                                      async_data->query,
-			                                      &async_data->error);
-			break;
-		case FAST_UPDATE_BLANK:
-			async_data->result_gptrarray = tracker_resources_sparql_update_blank_fast (client,
-			                                                                           async_data->query,
-			                                                                           &async_data->error);
-			break;
-		case FAST_UPDATE_BATCH:
-			tracker_resources_batch_sparql_update_fast (client,
-			                                            async_data->query,
-			                                            &async_data->error);
-			break;
-		default:
-			g_assert_not_reached ();
-	}
-
-	g_idle_add (thread_callback, async_data);
-}
-
 static void
 tracker_client_class_init (TrackerClientClass *klass)
 {
@@ -407,12 +342,6 @@ client_finalize (GObject *object)
 	if (private->pending_calls) {
 		g_hash_table_unref (private->pending_calls);
 	}
-
-	if (private->thread_pool) {
-		g_thread_pool_free (private->thread_pool,
-		                    TRUE,
-		                    TRUE);
-	}
 }
 
 static void
@@ -518,14 +447,6 @@ client_constructed (GObject *object)
 	                         TRACKER_TYPE_STR_STRV_MAP,
 	                         G_TYPE_INVALID);
 
-#ifdef HAVE_DBUS_FD_PASSING
-	private->thread_pool = g_thread_pool_new (thread_dispatch,
-	                                          object,
-	                                          5,
-	                                          FALSE,
-	                                          NULL);
-#endif
-
 	private->is_constructed = TRUE;
 }
 
@@ -572,6 +493,73 @@ callback_with_void (DBusGProxy *proxy,
 	g_slice_free (CallbackVoid, cb);
 }
 
+/* GAsyncReadyCallback for the splice started by the async Query call: once
+ * the pipe has been drained into the memory stream, check the D-Bus reply
+ * and pass the iterator (or NULL plus an error) to the user callback.
+ * The FastAsyncData and the pending call are released on every path. */
+static void
+fast_async_callback_iterator (GObject      *source_object,
+                              GAsyncResult *result,
+                              gpointer      user_data)
+{
+	DBusMessage *reply;
+	DBusError dbus_error;
+	GError *inner_error = NULL;
+	GError *error = NULL;
+	FastAsyncData *data = user_data;
+	TrackerResultIterator *iterator = data->result_iterator;
+
+	dbus_error_init (&dbus_error);
+	iterator->buffer_size = g_output_stream_splice_finish (data->output_stream,
+	                                                       result,
+	                                                       &inner_error);
+	iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
+	g_object_unref (data->input_stream);
+	g_object_unref (data->output_stream);
+
+	if (inner_error) {
+		g_set_error (&error, TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_BROKEN_PIPE,
+		             "Couldn't get results from server");
+		g_error_free (inner_error);
+		goto failed;
+	}
+
+	/* Peek at the first row code only when one was actually received, then
+	 * rewind so tracker_result_iterator_next() reads it again. */
+	iterator->buffer_index = 0;
+	if (iterator->buffer_size >= 4) {
+		iterator->rc = iterator_buffer_read_int (iterator);
+		iterator->buffer_index = 0;
+		iterator->has_next = (iterator->rc == TRACKER_STEROIDS_RC_ROW ||
+		                      iterator->rc == TRACKER_STEROIDS_RC_LARGEROW);
+	}
+
+	dbus_pending_call_block (data->dbus_call);
+	reply = dbus_pending_call_steal_reply (data->dbus_call);
+	g_assert (reply);
+
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		dbus_set_error_from_message (&dbus_error, reply);
+		dbus_set_g_error (&error, &dbus_error);
+		dbus_error_free (&dbus_error);
+		dbus_message_unref (reply);
+		goto failed;
+	}
+
+	dbus_message_unref (reply);
+	dbus_pending_call_unref (data->dbus_call);
+	(* data->iterator_callback) (iterator, NULL, data->user_data);
+	g_slice_free (FastAsyncData, data);
+	return;
+
+failed:
+	tracker_result_iterator_free (iterator);
+	dbus_pending_call_unref (data->dbus_call);
+	(* data->iterator_callback) (NULL, error, data->user_data);
+	g_slice_free (FastAsyncData, data);
+}
+
 /* Deprecated and only used for 0.6 API */
 static void
 callback_with_array (DBusGProxy *proxy,
@@ -828,28 +816,13 @@ find_conversion (const char  *format,
 
 #ifdef HAVE_DBUS_FD_PASSING
 static int
-buffer_read_int (char *buf)
-{
-	int result = 0;
-
-	result += (((unsigned char)*(buf++)));
-	result += (((unsigned char)*(buf++)) <<  8);
-	result += (((unsigned char)*(buf++)) << 16);
-	result += (((unsigned char)*(buf++)) << 24);
-
-	return result;
-}
-
-static int
 iterator_buffer_read_int (TrackerResultIterator *iterator)
 {
-	int result;
+	int v = *((int *)(iterator->buffer + iterator->buffer_index));
 
-	result = buffer_read_int (iterator->buffer + iterator->buffer_index);
+	iterator->buffer_index += 4;
 
-	iterator->buffer_index += sizeof (int);
-
-	return result;
+	return GINT32_FROM_BE (v);
 }
 
 static DBusMessage*
@@ -1495,11 +1468,11 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 	iterator = g_slice_new0 (TrackerResultIterator);
 	input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
 	iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
-	g_output_stream_splice (iterator_output_stream,
-	                        input_stream,
-	                        G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-	                        NULL,
-	                        &inner_error);
+	iterator->buffer_size = g_output_stream_splice (iterator_output_stream,
+	                                                input_stream,
+	                                                G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+	                                                NULL,
+	                                                &inner_error);
 	iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (iterator_output_stream));
 
 	g_object_unref (input_stream);
@@ -1663,21 +1636,29 @@ tracker_result_iterator_next (TrackerResultIterator  *iterator)
 {
 #ifdef HAVE_DBUS_FD_PASSING
 	int nextrc;
+	int last_offset;
 
 	iterator->rc = iterator_buffer_read_int (iterator);
 	switch (iterator->rc) {
-	case TRACKER_STEROIDS_RC_LARGEROW:
-		/* Skip row size int */
-		iterator_buffer_read_int (iterator);
 	case TRACKER_STEROIDS_RC_ROW:
 		iterator->n_columns = iterator_buffer_read_int (iterator);
 		iterator->offsets = (int *)(iterator->buffer + iterator->buffer_index);
-		iterator->buffer_index += iterator->n_columns * sizeof (int);
+		iterator->buffer_index += sizeof (int) * (iterator->n_columns - 1);
+		last_offset = iterator_buffer_read_int (iterator);
 		iterator->data = iterator->buffer + iterator->buffer_index;
-		iterator->buffer_index += iterator->offsets[iterator->n_columns - 1] + 1;
+		iterator->buffer_index += last_offset + 1;
+
+		nextrc = iterator_buffer_read_int (iterator);
+		iterator->buffer_index -= 4;
 
-		nextrc = buffer_read_int (iterator->buffer + iterator->buffer_index);
-		iterator->has_next = (nextrc == TRACKER_STEROIDS_RC_ROW || nextrc == TRACKER_STEROIDS_RC_LARGEROW);
+		if (nextrc == TRACKER_STEROIDS_RC_ROW || nextrc == TRACKER_STEROIDS_RC_LARGEROW) {
+			iterator->has_next = TRUE;
+		} else if (nextrc == TRACKER_STEROIDS_RC_DONE) {
+			iterator->has_next = FALSE;
+		} else {
+			g_critical ("Invalid row code %d", nextrc);
+			iterator->has_next = FALSE;
+		}
 		break;
 	case TRACKER_STEROIDS_RC_DONE:
 		break;
@@ -2069,30 +2050,73 @@ tracker_resources_sparql_query_iterate_async (TrackerClient         *client,
                                               gpointer               user_data)
 {
 	TrackerClientPrivate *private;
-	FastAsyncData *data;
-	GError *error = NULL;
+	DBusConnection *connection;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusPendingCall *call;
+	DBusError dbus_error;
+	TrackerResultIterator *iterator;
+	int pipefd[2];
+	GInputStream *input_stream;
+	GOutputStream *iterator_output_stream;
+	FastAsyncData *async_data;
 
 	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
-	g_return_val_if_fail (query != NULL, 0);
-	g_return_val_if_fail (callback != NULL, 0);
+	g_return_val_if_fail (query, 0);
 
 	private = TRACKER_CLIENT_GET_PRIVATE (client);
 
-	data = g_slice_new0 (FastAsyncData);
-	data->operation = FAST_QUERY;
-	data->query = g_strdup (query);
-	data->iterator_callback = callback;
-	data->user_data = user_data;
+	if (pipe (pipefd) < 0) {
+		g_critical ("Cannot open pipe");
+		return 0;
+	}
 
-	g_thread_pool_push (private->thread_pool, data, &error);
+	connection = dbus_g_connection_get_connection (private->connection);
 
-	if (error) {
-		g_critical ("Could not create thread: %s", error->message);
-		g_error_free (error);
+	dbus_error_init (&dbus_error);
+
+	message = dbus_message_new_method_call (TRACKER_STEROIDS_SERVICE,
+	                                        TRACKER_STEROIDS_PATH,
+	                                        TRACKER_STEROIDS_INTERFACE,
+	                                        "Query");
+
+	dbus_message_iter_init_append (message, &iter);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_STRING, &query);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[1]);
+
+	dbus_connection_send_with_reply (connection,
+	                                 message,
+	                                 &call,
+	                                 -1);
+	dbus_message_unref (message);
+	close (pipefd[1]);
+
+	if (!call) {
+		g_critical ("FD passing unsupported or connection disconnected");
 		return 0;
 	}
 
-	return 0;
+	async_data = g_slice_new0 (FastAsyncData);
+
+	iterator = g_slice_new0 (TrackerResultIterator);
+	input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
+	iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+	async_data->result_iterator = iterator;
+	async_data->input_stream = input_stream;
+	async_data->output_stream = iterator_output_stream;
+	async_data->dbus_call = call;
+	async_data->iterator_callback = callback;
+	async_data->user_data = user_data;
+
+	g_output_stream_splice_async (iterator_output_stream,
+	                              input_stream,
+	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+	                              0,
+	                              NULL,
+	                              fast_async_callback_iterator,
+	                              async_data);
+	return 42;
 }
 
 /**
@@ -2146,30 +2170,6 @@ tracker_resources_sparql_update_fast_async (TrackerClient    *client,
                                             TrackerReplyVoid  callback,
                                             gpointer          user_data)
 {
-	TrackerClientPrivate *private;
-	FastAsyncData *data;
-	GError *error = NULL;
-
-	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
-	g_return_val_if_fail (query != NULL, 0);
-	g_return_val_if_fail (callback != NULL, 0);
-
-	private = TRACKER_CLIENT_GET_PRIVATE (client);
-
-	data = g_slice_new0 (FastAsyncData);
-	data->operation = FAST_UPDATE;
-	data->query = g_strdup (query);
-	data->void_callback = callback;
-	data->user_data = user_data;
-
-	g_thread_pool_push (private->thread_pool, data, &error);
-
-	if (error) {
-		g_critical ("Could not create thread: %s", error->message);
-		g_error_free (error);
-		return 0;
-	}
-
 	return 0;
 }
 
@@ -2210,30 +2210,6 @@ tracker_resources_sparql_update_blank_fast_async (TrackerClient         *client,
                                                   TrackerReplyGPtrArray  callback,
                                                   gpointer               user_data)
 {
-	TrackerClientPrivate *private;
-	FastAsyncData *data;
-	GError *error = NULL;
-
-	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
-	g_return_val_if_fail (query != NULL, 0);
-	g_return_val_if_fail (callback != NULL, 0);
-
-	private = TRACKER_CLIENT_GET_PRIVATE (client);
-
-	data = g_slice_new0 (FastAsyncData);
-	data->operation = FAST_UPDATE_BLANK;
-	data->query = g_strdup (query);
-	data->gptrarray_callback = callback;
-	data->user_data = user_data;
-
-	g_thread_pool_push (private->thread_pool, data, &error);
-
-	if (error) {
-		g_critical ("Could not create thread: %s", error->message);
-		g_error_free (error);
-		return 0;
-	}
-
 	return 0;
 }
 
@@ -2288,30 +2264,6 @@ tracker_resources_batch_sparql_update_fast_async (TrackerClient    *client,
                                                   TrackerReplyVoid  callback,
                                                   gpointer          user_data)
 {
-	TrackerClientPrivate *private;
-	FastAsyncData *data;
-	GError *error = NULL;
-
-	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
-	g_return_val_if_fail (query != NULL, 0);
-	g_return_val_if_fail (callback != NULL, 0);
-
-	private = TRACKER_CLIENT_GET_PRIVATE (client);
-
-	data = g_slice_new0 (FastAsyncData);
-	data->operation = FAST_UPDATE_BLANK;
-	data->query = g_strdup (query);
-	data->void_callback = callback;
-	data->user_data = user_data;
-
-	g_thread_pool_push (private->thread_pool, data, &error);
-
-	if (error) {
-		g_critical ("Could not create thread: %s", error->message);
-		g_error_free (error);
-		return 0;
-	}
-
 	return 0;
 }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]