[tracker/extractor-dbus-fd: 4/7] libtracker-client: use tracker_dbus_send_and_splice



commit f8ebc796078d2e689dbb304ceb892a6fc9610be6
Author: Adrien Bustany <abustany gnome org>
Date:   Fri Jul 2 13:33:03 2010 +0200

    libtracker-client: use tracker_dbus_send_and_splice

 src/libtracker-client/Makefile.am |    1 +
 src/libtracker-client/tracker.c   |  192 +++++--------------------------------
 2 files changed, 25 insertions(+), 168 deletions(-)
---
diff --git a/src/libtracker-client/Makefile.am b/src/libtracker-client/Makefile.am
index dee975b..2ff5c9f 100644
--- a/src/libtracker-client/Makefile.am
+++ b/src/libtracker-client/Makefile.am
@@ -32,6 +32,7 @@ libtracker_client_ TRACKER_API_VERSION@_la_LDFLAGS = 	\
 	-export-symbols-regex '^tracker_.*'
 
 libtracker_client_ TRACKER_API_VERSION@_la_LIBADD =	\
+	$(top_srcdir)/src/libtracker-common/libtracker-common.la \
 	$(GLIB2_LIBS) 					\
 	$(GIO_LIBS)					\
 	$(DBUS_LIBS)					\
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 0b291d7..0a422b5 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -190,8 +190,6 @@ typedef struct {
 	guint request_id;
 	FastOperationType operation_type;
 
-	GInputStream *input_stream;
-	GOutputStream *output_stream;
 	GCancellable *cancellable;
 
 	DBusPendingCall *dbus_call;
@@ -350,11 +348,6 @@ fast_async_data_free (gpointer data)
 			g_object_unref (fad->cancellable);
 		}
 
-		if (fad->dbus_call) {
-			dbus_pending_call_cancel (fad->dbus_call);
-			dbus_pending_call_unref (fad->dbus_call);
-		}
-
 		if (fad->client) {
 			g_object_unref (fad->client);
 		}
@@ -366,10 +359,7 @@ fast_async_data_free (gpointer data)
 static FastAsyncData *
 fast_async_data_new (TrackerClient     *client,
                      FastOperationType  operation_type,
-                     GInputStream      *input_stream,
-                     GOutputStream     *output_stream,
                      GCancellable      *cancellable,
-                     DBusPendingCall   *dbus_call,
                      gpointer           user_data)
 {
 	FastAsyncData *data;
@@ -380,10 +370,7 @@ fast_async_data_new (TrackerClient     *client,
 	data->request_id = fast_pending_call_new (client, cancellable, data);
 	data->operation_type = operation_type;
 	data->result_iterator = g_slice_new0 (TrackerResultIterator);
-	data->input_stream = input_stream;
-	data->output_stream = output_stream;
 	data->cancellable = cancellable;
-	data->dbus_call = dbus_call;
 	data->user_data = user_data;
 
 	return data;
@@ -650,18 +637,15 @@ iterator_buffer_read_int (TrackerResultIterator *iterator)
 }
 
 static void
-callback_iterator (GObject      *source_object,
-                   GAsyncResult *result,
-                   gpointer      user_data)
+callback_iterator (void     *buffer,
+                   gssize    buffer_size,
+                   GError   *error,
+                   gpointer  user_data)
 {
 	TrackerClientPrivate *private;
-	DBusMessage *reply = NULL;
-	GError *error = NULL;
 	FastAsyncData *fad;
 	TrackerResultIterator *iterator;
-	GInputStream *base_input_stream;
 
-	/* Clean up pending calls */
 	fad = user_data;
 
 	private = TRACKER_CLIENT_GET_PRIVATE (fad->client);
@@ -671,42 +655,13 @@ callback_iterator (GObject      *source_object,
 	/* Reset the iterator internal state */
 	iterator = fad->result_iterator;
 
-	iterator->buffer_size = g_output_stream_splice_finish (fad->output_stream,
-	                                                       result,
-	                                                       &error);
-	iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (fad->output_stream));
+	iterator->buffer = buffer;
+	iterator->buffer_size = buffer_size;
 	iterator->buffer_index = 0;
 
-	/* Clean up streams */
-	base_input_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (fad->input_stream));
-	g_object_unref (fad->input_stream);
-	g_object_unref (fad->output_stream);
-	g_object_unref (base_input_stream);
-
 	/* Check for errors */
 	if (G_LIKELY (!error)) {
-		/* Wait for any current d-bus call to finish */
-		dbus_pending_call_block (fad->dbus_call);
-
-		/* Check we didn't get an error */
-		reply = dbus_pending_call_steal_reply (fad->dbus_call);
-
-		if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
-			DBusError dbus_error;
-
-			dbus_error_init (&dbus_error);
-			dbus_set_error_from_message (&dbus_error, reply);
-			dbus_set_g_error (&error, &dbus_error);
-
-			(* fad->iterator_callback) (NULL, error, fad->user_data);
-
-			dbus_error_free (&dbus_error);
-		} else {
-			/* Call iterator callback */
-			fad->iterator_returned = TRUE;
-
-			(* fad->iterator_callback) (iterator, NULL, fad->user_data);
-		}
+		(* fad->iterator_callback) (iterator, NULL, fad->user_data);
 	} else {
 		if (error->code != G_IO_ERROR_CANCELLED) {
 			g_clear_error (&error);
@@ -717,13 +672,6 @@ callback_iterator (GObject      *source_object,
 
 			(* fad->iterator_callback) (NULL, error, fad->user_data);
 		}
-
-		g_error_free (error);
-	}
-
-	/* Clean up */
-	if (reply) {
-		dbus_message_unref (reply);
 	}
 
 	fast_async_data_free (fad);
@@ -1782,12 +1730,7 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 	DBusConnection *connection;
 	DBusMessage *message;
 	DBusMessageIter iter;
-	DBusMessage *reply = NULL;
-	DBusPendingCall *call;
 	int pipefd[2];
-	GInputStream *input_stream;
-	GInputStream *buffered_input_stream;
-	GOutputStream *iterator_output_stream;
 	GError *inner_error = NULL;
 
 	g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
@@ -1813,79 +1756,26 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 	dbus_message_iter_init_append (message, &iter);
 	dbus_message_iter_append_basic (&iter, DBUS_TYPE_STRING, &query);
 	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[1]);
-
-	dbus_connection_send_with_reply (connection,
-	                                 message,
-	                                 &call,
-	                                 -1);
-	dbus_message_unref (message);
 	close (pipefd[1]);
 
-	if (!call) {
-		g_set_error (error,
-		             TRACKER_CLIENT_ERROR,
-		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
-		             "FD passing unsupported or connection disconnected");
-		return NULL;
-	}
-
-	input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
-	buffered_input_stream = g_buffered_input_stream_new_sized (input_stream,
-	                                                           TRACKER_STEROIDS_BUFFER_SIZE);
-	iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
 
-	/* Reset the iterator internal state */
 	iterator = g_slice_new0 (TrackerResultIterator);
-	iterator->buffer_size = g_output_stream_splice (iterator_output_stream,
-	                                                buffered_input_stream,
-	                                                G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
-	                                                G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-	                                                NULL,
-	                                                &inner_error);
-	iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (iterator_output_stream));
-	iterator->buffer_index = 0;
-
-	/* Clean up streams */
-	g_object_unref (buffered_input_stream);
-	g_object_unref (iterator_output_stream);
-	g_object_unref (input_stream);
-
-	if (G_LIKELY (!inner_error)) {
-		/* Wait for any current d-bus call to finish */
-		dbus_pending_call_block (call);
-
-		/* Check we didn't get an error */
-		reply = dbus_pending_call_steal_reply (call);
 
-		if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
-			DBusError dbus_error;
-
-			dbus_error_init (&dbus_error);
-			dbus_set_error_from_message (&dbus_error, reply);
-			dbus_set_g_error (error, &dbus_error);
-			dbus_error_free (&dbus_error);
-
-			tracker_result_iterator_free (iterator);
-			iterator = NULL;
-		}
-	} else {
-		g_set_error (error,
-		             TRACKER_CLIENT_ERROR,
-		             TRACKER_CLIENT_ERROR_BROKEN_PIPE,
-		             "Couldn't get results from server");
-		g_error_free (inner_error);
+	tracker_dbus_send_and_splice (connection,
+	                              message,
+	                              pipefd[0],
+	                              NULL,
+	                              (void **) &iterator->buffer,
+	                              &iterator->buffer_size,
+	                              &inner_error);
+	/* message is destroyed by tracker_dbus_send_and_splice */
 
-		tracker_result_iterator_free (iterator);
+	if (G_UNLIKELY (inner_error)) {
+		g_propagate_error (error, inner_error);
+		g_slice_free (TrackerResultIterator, iterator);
 		iterator = NULL;
 	}
 
-	/* Clean up */
-	if (reply) {
-		dbus_message_unref (reply);
-	}
-
-	dbus_pending_call_unref (call);
-
 	return iterator;
 #else  /* HAVE_DBUS_FD_PASSING */
 	TrackerResultIterator *iterator;
@@ -2369,11 +2259,7 @@ tracker_resources_sparql_query_iterate_async (TrackerClient         *client,
 	DBusConnection *connection;
 	DBusMessage *message;
 	DBusMessageIter iter;
-	DBusPendingCall *call;
 	int pipefd[2];
-	GInputStream *input_stream;
-	GInputStream *buffered_input_stream;
-	GOutputStream *iterator_output_stream;
 	GCancellable *cancellable;
 	FastAsyncData *fad;
 
@@ -2412,43 +2298,22 @@ tracker_resources_sparql_query_iterate_async (TrackerClient         *client,
 	dbus_message_iter_append_basic (&iter, DBUS_TYPE_STRING, &query);
 	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[1]);
 
-	dbus_connection_send_with_reply (connection,
-	                                 message,
-	                                 &call,
-	                                 -1);
-	dbus_message_unref (message);
-
-	/* FIXME: Why do we close this? */
 	close (pipefd[1]);
 
-	if (!call) {
-		g_critical ("FD passing unsupported or connection disconnected");
-		return 0;
-	}
-
-	input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
-	buffered_input_stream = g_buffered_input_stream_new_sized (input_stream,
-	                                                           TRACKER_STEROIDS_BUFFER_SIZE);
-	iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
 	cancellable = g_cancellable_new ();
 
 	fad = fast_async_data_new (client,
 	                           FAST_QUERY,
-	                           buffered_input_stream,
-	                           iterator_output_stream,
 	                           cancellable,
-	                           call,
 	                           user_data);
 	fad->iterator_callback = callback;
 
-	g_output_stream_splice_async (iterator_output_stream,
-	                              buffered_input_stream,
-	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
-	                              G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-	                              0,
-	                              cancellable,
-	                              callback_iterator,
-	                              fad);
+	tracker_dbus_send_and_splice_async (connection,
+	                                    message,
+	                                    pipefd[0],
+	                                    cancellable,
+	                                    callback_iterator,
+	                                    fad);
 
 	return fad->request_id;
 #else  /* HAVE_DBUS_FD_PASSING */
@@ -2482,9 +2347,6 @@ tracker_resources_sparql_update_async (TrackerClient    *client,
 	fad = fast_async_data_new (client,
 	                           FAST_UPDATE,
 	                           NULL,
-	                           NULL,
-	                           NULL,
-	                           NULL,
 	                           user_data);
 	fad->void_callback = callback;
 
@@ -2544,9 +2406,6 @@ tracker_resources_sparql_update_blank_async (TrackerClient         *client,
 	fad = fast_async_data_new (client,
 	                           FAST_UPDATE_BLANK,
 	                           NULL,
-	                           NULL,
-	                           NULL,
-	                           NULL,
 	                           user_data);
 	fad->gptrarray_callback = callback;
 
@@ -2620,9 +2479,6 @@ tracker_resources_batch_sparql_update_async (TrackerClient    *client,
 	fad = fast_async_data_new (client,
 	                           FAST_UPDATE_BATCH,
 	                           NULL,
-	                           NULL,
-	                           NULL,
-	                           NULL,
 	                           user_data);
 	fad->void_callback = callback;
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]