[tracker/dbus-fd-experiment-gio: 2/6] Steroids client: port to GIO



commit a106d0e0b7f7f2a1fb9138a13ba025bd81860614
Author: Adrien Bustany <abustany gnome org>
Date:   Wed Jun 9 12:17:24 2010 -0400

    Steroids client: port to GIO

 configure.ac                      |    2 +-
 src/libtracker-client/Makefile.am |    2 +
 src/libtracker-client/tracker.c   |  228 ++++++++-----------------------------
 3 files changed, 51 insertions(+), 181 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index 5809854..6c5cb20 100644
--- a/configure.ac
+++ b/configure.ac
@@ -183,7 +183,7 @@ PKG_CHECK_MODULES(GMODULE, [gmodule-2.0 >= $GLIB_REQUIRED])
 AC_SUBST(GMODULE_CFLAGS)
 AC_SUBST(GMODULE_LIBS)
 
-PKG_CHECK_MODULES(GIO, [gio-2.0 >= $GLIB_REQUIRED])
+PKG_CHECK_MODULES(GIO, [gio-2.0 >= $GLIB_REQUIRED gio-unix-2.0 >= $GLIB_REQUIRED])
 AC_SUBST(GIO_CFLAGS)
 AC_SUBST(GIO_LIBS)
 
diff --git a/src/libtracker-client/Makefile.am b/src/libtracker-client/Makefile.am
index d3b2b83..dee975b 100644
--- a/src/libtracker-client/Makefile.am
+++ b/src/libtracker-client/Makefile.am
@@ -8,6 +8,7 @@ INCLUDES = 						\
 	-I$(top_builddir)/src/libtracker-client		\
 	$(WARN_CFLAGS)					\
 	$(GLIB2_CFLAGS)					\
+	$(GIO_CFLAGS)					\
 	$(GCOV_CFLAGS)					\
 	$(DBUS_CFLAGS)
 
@@ -32,6 +33,7 @@ libtracker_client_@TRACKER_API_VERSION@_la_LDFLAGS = 	\
 
 libtracker_client_@TRACKER_API_VERSION@_la_LIBADD =	\
 	$(GLIB2_LIBS) 					\
+	$(GIO_LIBS)					\
 	$(DBUS_LIBS)					\
 	$(GCOV_LIBS)					\
 	$(GOBJECT_LIBS)
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 0e24620..11a3fd7 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -26,6 +26,10 @@
 #include <dbus/dbus-glib-lowlevel.h>
 #include <dbus/dbus-glib-bindings.h>
 
+#include <gio/gio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+
 #include <libtracker-common/tracker-dbus.h>
 #include <tracker-store/tracker-steroids.h>
 
@@ -158,13 +162,9 @@ typedef struct {
 
 struct TrackerResultIterator {
 #ifdef HAVE_DBUS_FD_PASSING
-	int fd;
 	int rc;
 	char *buffer;
-	int message_size;
-	int buffer_size;
 	int buffer_index;
-	char *large_row_buf;
 
 	guint  n_columns;
 	int   *offsets;
@@ -840,15 +840,6 @@ buffer_read_int (char *buf)
 	return result;
 }
 
-static void
-buffer_write_int (char *buf, int value)
-{
-	memset (buf++, (value      ) & 0xff, sizeof (char));
-	memset (buf++, (value >>  8) & 0xff, sizeof (char));
-	memset (buf++, (value >> 16) & 0xff, sizeof (char));
-	memset (buf++, (value >> 24) & 0xff, sizeof (char));
-}
-
 static int
 iterator_buffer_read_int (TrackerResultIterator *iterator)
 {
@@ -861,134 +852,6 @@ iterator_buffer_read_int (TrackerResultIterator *iterator)
 	return result;
 }
 
-static gboolean
-pipe_read (int fd, char *dst, int size)
-{
-	ssize_t readsofar = 0;
-	ssize_t ret;
-
-	while (readsofar < size) {
-		ret = read (fd,
-		            dst + readsofar,
-		            size - readsofar);
-		if (ret < 0) {
-			switch (errno) {
-			case EAGAIN:
-				break;
-			case EPIPE:
-				g_critical ("SIGPIPE in pipe_read");
-				return FALSE;
-			default:
-				g_critical ("write returned %d in pipe_read", errno);
-				return FALSE;
-			}
-		} else {
-			readsofar += ret;
-		}
-	}
-
-	return TRUE;
-}
-
-static gboolean
-pipe_write (int fd, const char *src, int size)
-{
-	ssize_t sent = 0;
-	ssize_t ret;
-
-	while (sent < size) {
-		ret = write (fd,
-		             src + sent,
-		             size - sent);
-		if (ret < 0) {
-			switch (errno) {
-			case EAGAIN:
-				break;
-			case EPIPE:
-				g_critical ("SIGPIPE in pipe_write");
-				return FALSE;
-			default:
-				g_critical ("write returned %d in pipe_write", errno);
-				return FALSE;
-			}
-		} else {
-			sent += ret;
-		}
-	}
-
-	return TRUE;
-}
-
-static int
-iterator_buffer_fill (TrackerResultIterator *iterator)
-{
-	int rc;
-	int n_columns;
-	int *offsets;
-	int row_size;
-	int rows = 0;
-	gboolean force_page_load = FALSE;
-
-	for (;;) {
-		iterator->message_size += TRACKER_STEROIDS_BUFFER_SIZE;
-
-		while (iterator->message_size > iterator->buffer_size) {
-			iterator->buffer = g_realloc (iterator->buffer, 2 * iterator->buffer_size);
-			iterator->buffer_size *= 2;
-		}
-
-		if (!pipe_read (iterator->fd,
-		                iterator->buffer + iterator->buffer_index,
-		                TRACKER_STEROIDS_BUFFER_SIZE)) {
-			return -1;
-		}
-
-		for (;;) {
-			if ((unsigned char) (*(iterator->buffer + iterator->buffer_index)) == TRACKER_STEROIDS_EOP ||
-			    force_page_load) {
-				force_page_load = FALSE;
-				break;
-			}
-
-			rc = iterator_buffer_read_int (iterator);
-
-			switch (rc) {
-				case TRACKER_STEROIDS_RC_LARGEROW:
-					row_size = iterator_buffer_read_int (iterator);
-
-					while (iterator->buffer_size - iterator->buffer_index < row_size) {
-						iterator->buffer = g_realloc (iterator->buffer, 2 * iterator->buffer_size);
-						iterator->buffer_size *= 2;
-					}
-
-					if (!pipe_read (iterator->fd,
-					                iterator->buffer + iterator->buffer_index + TRACKER_STEROIDS_BUFFER_SIZE - 2 * sizeof (int),
-					                row_size - TRACKER_STEROIDS_BUFFER_SIZE + 2 * sizeof (int))) {
-						return -1;
-					}
-
-					force_page_load = TRUE;
-
-
-					/* Fall through to normal row handling */
-				case TRACKER_STEROIDS_RC_ROW:
-					n_columns = iterator_buffer_read_int (iterator);
-					iterator->n_columns = n_columns;
-					offsets = (int *) (iterator->buffer + iterator->buffer_index);
-					iterator->buffer_index += sizeof (int) * n_columns;
-					iterator->buffer_index += offsets[iterator->n_columns - 1] + 1;
-					rows ++;
-					break;
-				default:
-					goto end_of_results;
-			}
-		}
-	}
-
-end_of_results:
-	return rows;
-}
-
 static DBusMessage*
 sparql_update_fast (TrackerClient      *client,
                     const gchar        *query,
@@ -1004,9 +867,9 @@ sparql_update_fast (TrackerClient      *client,
 	DBusPendingCall *call;
 	DBusError dbus_error;
 	int pipefd[2];
-	char *query_size_buffer;
-	int query_len;
-	int query_index;
+	GOutputStream *output_stream;
+	GDataOutputStream *data_output_stream;
+	GError *inner_error = NULL;
 
 	g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
 	g_return_val_if_fail (query != NULL, NULL);
@@ -1060,34 +923,37 @@ sparql_update_fast (TrackerClient      *client,
 		return NULL;
 	}
 
-	query_size_buffer = g_malloc (sizeof (int));
-
-	/* We don't need to null terminate, the store will do it for us */
-	query_len = strlen (query);
-	query_index = 0;
-	buffer_write_int (query_size_buffer, query_len);
+	output_stream = g_unix_output_stream_new (pipefd[1], TRUE);
+	data_output_stream = g_data_output_stream_new (output_stream);
 
-	pipe_write (pipefd[1], query_size_buffer, sizeof (int));
+	g_data_output_stream_put_int32 (data_output_stream,
+	                                strlen (query),
+	                                NULL,
+	                                &inner_error);
 
-	if ((sizeof (int) + query_len) > TRACKER_STEROIDS_BUFFER_SIZE) {
-		pipe_write (pipefd[1], query, TRACKER_STEROIDS_BUFFER_SIZE - sizeof (int));
-		query_index += TRACKER_STEROIDS_BUFFER_SIZE - sizeof (int);
+	if (inner_error) {
+		g_propagate_error (error, inner_error);
+		g_object_unref (data_output_stream);
+		g_object_unref (output_stream);
+		return NULL;
 	}
 
-	while (query_index < query_len) {
-		int to_send = MIN(TRACKER_STEROIDS_BUFFER_SIZE, query_len - query_index);
-		pipe_write (pipefd[1], query + query_index, to_send);
-		query_index += to_send;
-	}
+	g_data_output_stream_put_string (data_output_stream,
+	                                 query,
+	                                 NULL,
+	                                 &inner_error);
 
-	close (pipefd[1]);
+	if (inner_error) {
+		g_propagate_error (error, inner_error);
+		g_object_unref (data_output_stream);
+		g_object_unref (output_stream);
+		return NULL;
+	}
 
-	g_free (query_size_buffer);
+	g_object_unref (data_output_stream);
+	g_object_unref (output_stream);
 
-	/* dbus_pending_call_block (call); */
-	while (!dbus_pending_call_get_completed (call)) {
-		g_usleep (NOT_TOO_SHORT_DELAY);
-	}
+	dbus_pending_call_block (call);
 
 	reply = dbus_pending_call_steal_reply (call);
 
@@ -1581,6 +1447,9 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 	DBusError dbus_error;
 	TrackerResultIterator *iterator;
 	int pipefd[2];
+	GInputStream *input_stream;
+	GOutputStream *iterator_output_stream;
+	GError *inner_error = NULL;
 
 	g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
 	g_return_val_if_fail (query, NULL);
@@ -1624,15 +1493,24 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 	}
 
 	iterator = g_slice_new0 (TrackerResultIterator);
-	iterator->fd = pipefd[0];
-	iterator->buffer_size = TRACKER_STEROIDS_BUFFER_SIZE;
-	iterator->buffer = g_malloc (TRACKER_STEROIDS_BUFFER_SIZE);
+	input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
+	iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+	g_output_stream_splice (iterator_output_stream,
+	                        input_stream,
+	                        G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+	                        NULL,
+	                        &inner_error);
+	iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (iterator_output_stream));
+
+	g_object_unref (input_stream);
+	g_object_unref (iterator_output_stream);
 
-	if (iterator_buffer_fill (iterator) < 0) {
+	if (inner_error) {
 		g_set_error (error,
 		             TRACKER_CLIENT_ERROR,
 		             TRACKER_CLIENT_ERROR_BROKEN_PIPE,
 		             "Couldn't get results from server");
+		g_error_free (inner_error);
 		tracker_result_iterator_free (iterator);
 		return NULL;
 	}
@@ -1648,17 +1526,7 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 		iterator->has_next = TRUE;
 	}
 
-	/* dbus_pending_call_block (call); */
-	/* This is a ugly workaround for a race condition in libdbus when you're
-	 * calling dbus_pending_call_block from a thread which is not the thread
-	 * where the DBus dispatch call happens. This is the case when the function
-	 * is run from tracker_resources_sparql_query_iterate_async.
-	 * See http://lists.freedesktop.org/archives/dbus/2009-March/011105.html
-	 * for a lengthy explication
-	 */
-	while (!dbus_pending_call_get_completed (call)) {
-		g_usleep (NOT_TOO_SHORT_DELAY);
-	}
+	dbus_pending_call_block (call);
 
 	reply = dbus_pending_call_steal_reply (call);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]