[tracker/dbus-fd-experiment-gio: 2/11] Steroids client: port to GIO
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/dbus-fd-experiment-gio: 2/11] Steroids client: port to GIO
- Date: Thu, 10 Jun 2010 22:32:48 +0000 (UTC)
commit a106d0e0b7f7f2a1fb9138a13ba025bd81860614
Author: Adrien Bustany <abustany gnome org>
Date: Wed Jun 9 12:17:24 2010 -0400
Steroids client: port to GIO
configure.ac | 2 +-
src/libtracker-client/Makefile.am | 2 +
src/libtracker-client/tracker.c | 228 ++++++++-----------------------------
3 files changed, 51 insertions(+), 181 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index 5809854..6c5cb20 100644
--- a/configure.ac
+++ b/configure.ac
@@ -183,7 +183,7 @@ PKG_CHECK_MODULES(GMODULE, [gmodule-2.0 >= $GLIB_REQUIRED])
AC_SUBST(GMODULE_CFLAGS)
AC_SUBST(GMODULE_LIBS)
-PKG_CHECK_MODULES(GIO, [gio-2.0 >= $GLIB_REQUIRED])
+PKG_CHECK_MODULES(GIO, [gio-2.0 >= $GLIB_REQUIRED gio-unix-2.0 >= $GLIB_REQUIRED])
AC_SUBST(GIO_CFLAGS)
AC_SUBST(GIO_LIBS)
diff --git a/src/libtracker-client/Makefile.am b/src/libtracker-client/Makefile.am
index d3b2b83..dee975b 100644
--- a/src/libtracker-client/Makefile.am
+++ b/src/libtracker-client/Makefile.am
@@ -8,6 +8,7 @@ INCLUDES = \
-I$(top_builddir)/src/libtracker-client \
$(WARN_CFLAGS) \
$(GLIB2_CFLAGS) \
+ $(GIO_CFLAGS) \
$(GCOV_CFLAGS) \
$(DBUS_CFLAGS)
@@ -32,6 +33,7 @@ libtracker_client_ TRACKER_API_VERSION@_la_LDFLAGS = \
libtracker_client_ TRACKER_API_VERSION@_la_LIBADD = \
$(GLIB2_LIBS) \
+ $(GIO_LIBS) \
$(DBUS_LIBS) \
$(GCOV_LIBS) \
$(GOBJECT_LIBS)
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 0e24620..11a3fd7 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -26,6 +26,10 @@
#include <dbus/dbus-glib-lowlevel.h>
#include <dbus/dbus-glib-bindings.h>
+#include <gio/gio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+
#include <libtracker-common/tracker-dbus.h>
#include <tracker-store/tracker-steroids.h>
@@ -158,13 +162,9 @@ typedef struct {
struct TrackerResultIterator {
#ifdef HAVE_DBUS_FD_PASSING
- int fd;
int rc;
char *buffer;
- int message_size;
- int buffer_size;
int buffer_index;
- char *large_row_buf;
guint n_columns;
int *offsets;
@@ -840,15 +840,6 @@ buffer_read_int (char *buf)
return result;
}
-static void
-buffer_write_int (char *buf, int value)
-{
- memset (buf++, (value ) & 0xff, sizeof (char));
- memset (buf++, (value >> 8) & 0xff, sizeof (char));
- memset (buf++, (value >> 16) & 0xff, sizeof (char));
- memset (buf++, (value >> 24) & 0xff, sizeof (char));
-}
-
static int
iterator_buffer_read_int (TrackerResultIterator *iterator)
{
@@ -861,134 +852,6 @@ iterator_buffer_read_int (TrackerResultIterator *iterator)
return result;
}
-static gboolean
-pipe_read (int fd, char *dst, int size)
-{
- ssize_t readsofar = 0;
- ssize_t ret;
-
- while (readsofar < size) {
- ret = read (fd,
- dst + readsofar,
- size - readsofar);
- if (ret < 0) {
- switch (errno) {
- case EAGAIN:
- break;
- case EPIPE:
- g_critical ("SIGPIPE in pipe_read");
- return FALSE;
- default:
- g_critical ("write returned %d in pipe_read", errno);
- return FALSE;
- }
- } else {
- readsofar += ret;
- }
- }
-
- return TRUE;
-}
-
-static gboolean
-pipe_write (int fd, const char *src, int size)
-{
- ssize_t sent = 0;
- ssize_t ret;
-
- while (sent < size) {
- ret = write (fd,
- src + sent,
- size - sent);
- if (ret < 0) {
- switch (errno) {
- case EAGAIN:
- break;
- case EPIPE:
- g_critical ("SIGPIPE in pipe_write");
- return FALSE;
- default:
- g_critical ("write returned %d in pipe_write", errno);
- return FALSE;
- }
- } else {
- sent += ret;
- }
- }
-
- return TRUE;
-}
-
-static int
-iterator_buffer_fill (TrackerResultIterator *iterator)
-{
- int rc;
- int n_columns;
- int *offsets;
- int row_size;
- int rows = 0;
- gboolean force_page_load = FALSE;
-
- for (;;) {
- iterator->message_size += TRACKER_STEROIDS_BUFFER_SIZE;
-
- while (iterator->message_size > iterator->buffer_size) {
- iterator->buffer = g_realloc (iterator->buffer, 2 * iterator->buffer_size);
- iterator->buffer_size *= 2;
- }
-
- if (!pipe_read (iterator->fd,
- iterator->buffer + iterator->buffer_index,
- TRACKER_STEROIDS_BUFFER_SIZE)) {
- return -1;
- }
-
- for (;;) {
- if ((unsigned char) (*(iterator->buffer + iterator->buffer_index)) == TRACKER_STEROIDS_EOP ||
- force_page_load) {
- force_page_load = FALSE;
- break;
- }
-
- rc = iterator_buffer_read_int (iterator);
-
- switch (rc) {
- case TRACKER_STEROIDS_RC_LARGEROW:
- row_size = iterator_buffer_read_int (iterator);
-
- while (iterator->buffer_size - iterator->buffer_index < row_size) {
- iterator->buffer = g_realloc (iterator->buffer, 2 * iterator->buffer_size);
- iterator->buffer_size *= 2;
- }
-
- if (!pipe_read (iterator->fd,
- iterator->buffer + iterator->buffer_index + TRACKER_STEROIDS_BUFFER_SIZE - 2 * sizeof (int),
- row_size - TRACKER_STEROIDS_BUFFER_SIZE + 2 * sizeof (int))) {
- return -1;
- }
-
- force_page_load = TRUE;
-
-
- /* Fall through to normal row handling */
- case TRACKER_STEROIDS_RC_ROW:
- n_columns = iterator_buffer_read_int (iterator);
- iterator->n_columns = n_columns;
- offsets = (int *) (iterator->buffer + iterator->buffer_index);
- iterator->buffer_index += sizeof (int) * n_columns;
- iterator->buffer_index += offsets[iterator->n_columns - 1] + 1;
- rows ++;
- break;
- default:
- goto end_of_results;
- }
- }
- }
-
-end_of_results:
- return rows;
-}
-
static DBusMessage*
sparql_update_fast (TrackerClient *client,
const gchar *query,
@@ -1004,9 +867,9 @@ sparql_update_fast (TrackerClient *client,
DBusPendingCall *call;
DBusError dbus_error;
int pipefd[2];
- char *query_size_buffer;
- int query_len;
- int query_index;
+ GOutputStream *output_stream;
+ GDataOutputStream *data_output_stream;
+ GError *inner_error = NULL;
g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
g_return_val_if_fail (query != NULL, NULL);
@@ -1060,34 +923,37 @@ sparql_update_fast (TrackerClient *client,
return NULL;
}
- query_size_buffer = g_malloc (sizeof (int));
-
- /* We don't need to null terminate, the store will do it for us */
- query_len = strlen (query);
- query_index = 0;
- buffer_write_int (query_size_buffer, query_len);
+ output_stream = g_unix_output_stream_new (pipefd[1], TRUE);
+ data_output_stream = g_data_output_stream_new (output_stream);
- pipe_write (pipefd[1], query_size_buffer, sizeof (int));
+ g_data_output_stream_put_int32 (data_output_stream,
+ strlen (query),
+ NULL,
+ &inner_error);
- if ((sizeof (int) + query_len) > TRACKER_STEROIDS_BUFFER_SIZE) {
- pipe_write (pipefd[1], query, TRACKER_STEROIDS_BUFFER_SIZE - sizeof (int));
- query_index += TRACKER_STEROIDS_BUFFER_SIZE - sizeof (int);
+ if (inner_error) {
+ g_propagate_error (error, inner_error);
+ g_object_unref (data_output_stream);
+ g_object_unref (output_stream);
+ return NULL;
}
- while (query_index < query_len) {
- int to_send = MIN(TRACKER_STEROIDS_BUFFER_SIZE, query_len - query_index);
- pipe_write (pipefd[1], query + query_index, to_send);
- query_index += to_send;
- }
+ g_data_output_stream_put_string (data_output_stream,
+ query,
+ NULL,
+ &inner_error);
- close (pipefd[1]);
+ if (inner_error) {
+ g_propagate_error (error, inner_error);
+ g_object_unref (data_output_stream);
+ g_object_unref (output_stream);
+ return NULL;
+ }
- g_free (query_size_buffer);
+ g_object_unref (data_output_stream);
+ g_object_unref (output_stream);
- /* dbus_pending_call_block (call); */
- while (!dbus_pending_call_get_completed (call)) {
- g_usleep (NOT_TOO_SHORT_DELAY);
- }
+ dbus_pending_call_block (call);
reply = dbus_pending_call_steal_reply (call);
@@ -1581,6 +1447,9 @@ tracker_resources_sparql_query_iterate (TrackerClient *client,
DBusError dbus_error;
TrackerResultIterator *iterator;
int pipefd[2];
+ GInputStream *input_stream;
+ GOutputStream *iterator_output_stream;
+ GError *inner_error = NULL;
g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
g_return_val_if_fail (query, NULL);
@@ -1624,15 +1493,24 @@ tracker_resources_sparql_query_iterate (TrackerClient *client,
}
iterator = g_slice_new0 (TrackerResultIterator);
- iterator->fd = pipefd[0];
- iterator->buffer_size = TRACKER_STEROIDS_BUFFER_SIZE;
- iterator->buffer = g_malloc (TRACKER_STEROIDS_BUFFER_SIZE);
+ input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
+ iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+ g_output_stream_splice (iterator_output_stream,
+ input_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ NULL,
+ &inner_error);
+ iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (iterator_output_stream));
+
+ g_object_unref (input_stream);
+ g_object_unref (iterator_output_stream);
- if (iterator_buffer_fill (iterator) < 0) {
+ if (inner_error) {
g_set_error (error,
TRACKER_CLIENT_ERROR,
TRACKER_CLIENT_ERROR_BROKEN_PIPE,
"Couldn't get results from server");
+ g_error_free (inner_error);
tracker_result_iterator_free (iterator);
return NULL;
}
@@ -1648,17 +1526,7 @@ tracker_resources_sparql_query_iterate (TrackerClient *client,
iterator->has_next = TRUE;
}
- /* dbus_pending_call_block (call); */
- /* This is a ugly workaround for a race condition in libdbus when you're
- * calling dbus_pending_call_block from a thread which is not the thread
- * where the DBus dispatch call happens. This is the case when the function
- * is run from tracker_resources_sparql_query_iterate_async.
- * See http://lists.freedesktop.org/archives/dbus/2009-March/011105.html
- * for a lengthy explication
- */
- while (!dbus_pending_call_get_completed (call)) {
- g_usleep (NOT_TOO_SHORT_DELAY);
- }
+ dbus_pending_call_block (call);
reply = dbus_pending_call_steal_reply (call);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]