[tracker/dbus-fd-experiment-gio: 3/11] Steroids client: make async query use GIO and not threads
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/dbus-fd-experiment-gio: 3/11] Steroids client: make async query use GIO and not threads
- Date: Thu, 10 Jun 2010 22:32:53 +0000 (UTC)
commit 4fea19d382809615b91fb92eb319ca8b494b1d9c
Author: Adrien Bustany <abustany gnome org>
Date: Thu Jun 10 12:31:55 2010 -0400
Steroids client: make async query use GIO and not threads
src/libtracker-client/tracker.c | 358 +++++++++++++++++----------------------
1 files changed, 155 insertions(+), 203 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 11a3fd7..18c5299 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -117,10 +117,6 @@ typedef struct {
GList *writeback_callbacks;
gboolean is_constructed;
-
-#ifdef HAVE_DBUS_FD_PASSING
- GThreadPool *thread_pool;
-#endif
} TrackerClientPrivate;
typedef struct {
@@ -165,6 +161,7 @@ struct TrackerResultIterator {
int rc;
char *buffer;
int buffer_index;
+ long buffer_size;
guint n_columns;
int *offsets;
@@ -187,8 +184,10 @@ typedef enum {
typedef struct {
FastOperationType operation;
gchar *query;
- GError *error;
gpointer user_data;
+ GInputStream *input_stream;
+ GOutputStream *output_stream;
+ DBusPendingCall *dbus_call;
union {
GPtrArray *result_gptrarray;
TrackerResultIterator *result_iterator;
@@ -214,6 +213,10 @@ static void client_get_property (GObject *object,
GParamSpec *pspec);
static void client_constructed (GObject *object);
+#ifdef HAVE_DBUS_FD_PASSING
+static int iterator_buffer_read_int (TrackerResultIterator *iterator);
+#endif
+
enum {
PROP_0,
PROP_ENABLE_WARNINGS,
@@ -281,74 +284,6 @@ writeback_cb (DBusGProxy *proxy,
}
}
-static gboolean
-thread_callback (gpointer data)
-{
- FastAsyncData *async_data = data;
-
- switch (async_data->operation) {
- case FAST_QUERY:
- (* async_data->iterator_callback) (async_data->result_iterator,
- async_data->error,
- async_data->user_data);
- break;
- case FAST_UPDATE:
- case FAST_UPDATE_BATCH:
- (* async_data->void_callback) (async_data->error,
- async_data->user_data);
- break;
- case FAST_UPDATE_BLANK:
- (* async_data->gptrarray_callback) (async_data->result_gptrarray,
- async_data->error,
- async_data->user_data);
- break;
- default:
- g_assert_not_reached ();
- }
-
- g_free (async_data->query);
-
- if (async_data->error) {
- g_error_free (async_data->error);
- }
-
- return FALSE;
-}
-
-static void
-thread_dispatch (gpointer data, gpointer user_data)
-{
- FastAsyncData *async_data = data;
- TrackerClient *client = user_data;
-
- switch (async_data->operation) {
- case FAST_QUERY:
- async_data->result_iterator = tracker_resources_sparql_query_iterate (client,
- async_data->query,
- &async_data->error);
- break;
- case FAST_UPDATE:
- tracker_resources_sparql_update_fast (client,
- async_data->query,
- &async_data->error);
- break;
- case FAST_UPDATE_BLANK:
- async_data->result_gptrarray = tracker_resources_sparql_update_blank_fast (client,
- async_data->query,
- &async_data->error);
- break;
- case FAST_UPDATE_BATCH:
- tracker_resources_batch_sparql_update_fast (client,
- async_data->query,
- &async_data->error);
- break;
- default:
- g_assert_not_reached ();
- }
-
- g_idle_add (thread_callback, async_data);
-}
-
static void
tracker_client_class_init (TrackerClientClass *klass)
{
@@ -407,12 +342,6 @@ client_finalize (GObject *object)
if (private->pending_calls) {
g_hash_table_unref (private->pending_calls);
}
-
- if (private->thread_pool) {
- g_thread_pool_free (private->thread_pool,
- TRUE,
- TRUE);
- }
}
static void
@@ -518,14 +447,6 @@ client_constructed (GObject *object)
TRACKER_TYPE_STR_STRV_MAP,
G_TYPE_INVALID);
-#ifdef HAVE_DBUS_FD_PASSING
- private->thread_pool = g_thread_pool_new (thread_dispatch,
- object,
- 5,
- FALSE,
- NULL);
-#endif
-
private->is_constructed = TRUE;
}
@@ -572,6 +493,73 @@ callback_with_void (DBusGProxy *proxy,
g_slice_free (CallbackVoid, cb);
}
+static void
+fast_async_callback_iterator (GObject *source_object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ DBusMessage *reply;
+ DBusError dbus_error;
+ GError *inner_error = NULL;
+ GError *error = NULL;
+ FastAsyncData *data = user_data;
+ TrackerResultIterator *iterator = data->result_iterator;
+
+ dbus_error_init (&dbus_error);
+
+ iterator->buffer_size = g_output_stream_splice_finish (data->output_stream,
+ result,
+ &inner_error);
+
+ iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
+
+ g_object_unref (data->input_stream);
+ g_object_unref (data->output_stream);
+
+ if (inner_error) {
+ g_set_error (&error,
+ TRACKER_CLIENT_ERROR,
+ TRACKER_CLIENT_ERROR_BROKEN_PIPE,
+ "Couldn't get results from server");
+ g_error_free (inner_error);
+ tracker_result_iterator_free (iterator);
+ dbus_pending_call_unref (data->dbus_call);
+ (* data->iterator_callback) (NULL, error, data->user_data);
+ return;
+ }
+
+ iterator->buffer_index = 0;
+ iterator->rc = iterator_buffer_read_int (iterator);
+
+ /* Reset the iterator internal state */
+ iterator->buffer_index = 0;
+
+ if (iterator->rc == TRACKER_STEROIDS_RC_ROW ||
+ iterator->rc == TRACKER_STEROIDS_RC_LARGEROW) {
+ iterator->has_next = TRUE;
+ }
+
+ dbus_pending_call_block (data->dbus_call);
+
+ reply = dbus_pending_call_steal_reply (data->dbus_call);
+
+ g_assert (reply);
+
+ if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+ dbus_set_error_from_message (&dbus_error, reply);
+ dbus_set_g_error (&error, &dbus_error);
+ dbus_pending_call_unref (data->dbus_call);
+ (* data->iterator_callback) (NULL, error, data->user_data);
+ return ;
+ }
+
+ dbus_message_unref (reply);
+
+ dbus_pending_call_unref (data->dbus_call);
+
+ (* data->iterator_callback) (iterator, NULL, data->user_data);
+}
+
/* Deprecated and only used for 0.6 API */
static void
callback_with_array (DBusGProxy *proxy,
@@ -828,28 +816,13 @@ find_conversion (const char *format,
#ifdef HAVE_DBUS_FD_PASSING
static int
-buffer_read_int (char *buf)
-{
- int result = 0;
-
- result += (((unsigned char)*(buf++)));
- result += (((unsigned char)*(buf++)) << 8);
- result += (((unsigned char)*(buf++)) << 16);
- result += (((unsigned char)*(buf++)) << 24);
-
- return result;
-}
-
-static int
iterator_buffer_read_int (TrackerResultIterator *iterator)
{
- int result;
+ int v = *((int *)(iterator->buffer + iterator->buffer_index));
- result = buffer_read_int (iterator->buffer + iterator->buffer_index);
+ iterator->buffer_index += 4;
- iterator->buffer_index += sizeof (int);
-
- return result;
+ return GINT32_FROM_BE (v);
}
static DBusMessage*
@@ -1495,11 +1468,11 @@ tracker_resources_sparql_query_iterate (TrackerClient *client,
iterator = g_slice_new0 (TrackerResultIterator);
input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
- g_output_stream_splice (iterator_output_stream,
- input_stream,
- G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
- NULL,
- &inner_error);
+ iterator->buffer_size = g_output_stream_splice (iterator_output_stream,
+ input_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ NULL,
+ &inner_error);
iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (iterator_output_stream));
g_object_unref (input_stream);
@@ -1663,21 +1636,29 @@ tracker_result_iterator_next (TrackerResultIterator *iterator)
{
#ifdef HAVE_DBUS_FD_PASSING
int nextrc;
+ int last_offset;
iterator->rc = iterator_buffer_read_int (iterator);
switch (iterator->rc) {
- case TRACKER_STEROIDS_RC_LARGEROW:
- /* Skip row size int */
- iterator_buffer_read_int (iterator);
case TRACKER_STEROIDS_RC_ROW:
iterator->n_columns = iterator_buffer_read_int (iterator);
iterator->offsets = (int *)(iterator->buffer + iterator->buffer_index);
- iterator->buffer_index += iterator->n_columns * sizeof (int);
+ iterator->buffer_index += sizeof (int) * (iterator->n_columns - 1);
+ last_offset = iterator_buffer_read_int (iterator);
iterator->data = iterator->buffer + iterator->buffer_index;
- iterator->buffer_index += iterator->offsets[iterator->n_columns - 1] + 1;
+ iterator->buffer_index += last_offset + 1;
+
+ nextrc = iterator_buffer_read_int (iterator);
+ iterator->buffer_index -= 4;
- nextrc = buffer_read_int (iterator->buffer + iterator->buffer_index);
- iterator->has_next = (nextrc == TRACKER_STEROIDS_RC_ROW || nextrc == TRACKER_STEROIDS_RC_LARGEROW);
+ if (nextrc == TRACKER_STEROIDS_RC_ROW || nextrc == TRACKER_STEROIDS_RC_LARGEROW) {
+ iterator->has_next = TRUE;
+ } else if (nextrc == TRACKER_STEROIDS_RC_DONE) {
+ iterator->has_next = FALSE;
+ } else {
+ g_critical ("Invalid row code %d", nextrc);
+ iterator->has_next = FALSE;
+ }
break;
case TRACKER_STEROIDS_RC_DONE:
break;
@@ -2069,30 +2050,73 @@ tracker_resources_sparql_query_iterate_async (TrackerClient *client,
gpointer user_data)
{
TrackerClientPrivate *private;
- FastAsyncData *data;
- GError *error = NULL;
+ DBusConnection *connection;
+ DBusMessage *message;
+ DBusMessageIter iter;
+ DBusPendingCall *call;
+ DBusError dbus_error;
+ TrackerResultIterator *iterator;
+ int pipefd[2];
+ GInputStream *input_stream;
+ GOutputStream *iterator_output_stream;
+ FastAsyncData *async_data;
g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
- g_return_val_if_fail (query != NULL, 0);
- g_return_val_if_fail (callback != NULL, 0);
+ g_return_val_if_fail (query, 0);
private = TRACKER_CLIENT_GET_PRIVATE (client);
- data = g_slice_new0 (FastAsyncData);
- data->operation = FAST_QUERY;
- data->query = g_strdup (query);
- data->iterator_callback = callback;
- data->user_data = user_data;
+ if (pipe (pipefd) < 0) {
+ g_critical ("Cannot open pipe");
+ return 0;
+ }
- g_thread_pool_push (private->thread_pool, data, &error);
+ connection = dbus_g_connection_get_connection (private->connection);
- if (error) {
- g_critical ("Could not create thread: %s", error->message);
- g_error_free (error);
+ dbus_error_init (&dbus_error);
+
+ message = dbus_message_new_method_call (TRACKER_STEROIDS_SERVICE,
+ TRACKER_STEROIDS_PATH,
+ TRACKER_STEROIDS_INTERFACE,
+ "Query");
+
+ dbus_message_iter_init_append (message, &iter);
+ dbus_message_iter_append_basic (&iter, DBUS_TYPE_STRING, &query);
+ dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[1]);
+
+ dbus_connection_send_with_reply (connection,
+ message,
+ &call,
+ -1);
+ dbus_message_unref (message);
+ close (pipefd[1]);
+
+ if (!call) {
+ g_critical ("FD passing unsupported or connection disconnected");
return 0;
}
- return 0;
+ async_data = g_slice_new0 (FastAsyncData);
+
+ iterator = g_slice_new0 (TrackerResultIterator);
+ input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
+ iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+ async_data->result_iterator = iterator;
+ async_data->input_stream = input_stream;
+ async_data->output_stream = iterator_output_stream;
+ async_data->dbus_call = call;
+ async_data->iterator_callback = callback;
+ async_data->user_data = user_data;
+
+ g_output_stream_splice_async (iterator_output_stream,
+ input_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ 0,
+ NULL,
+ fast_async_callback_iterator,
+ async_data);
+ return 42;
}
/**
@@ -2146,30 +2170,6 @@ tracker_resources_sparql_update_fast_async (TrackerClient *client,
TrackerReplyVoid callback,
gpointer user_data)
{
- TrackerClientPrivate *private;
- FastAsyncData *data;
- GError *error = NULL;
-
- g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
- g_return_val_if_fail (query != NULL, 0);
- g_return_val_if_fail (callback != NULL, 0);
-
- private = TRACKER_CLIENT_GET_PRIVATE (client);
-
- data = g_slice_new0 (FastAsyncData);
- data->operation = FAST_UPDATE;
- data->query = g_strdup (query);
- data->void_callback = callback;
- data->user_data = user_data;
-
- g_thread_pool_push (private->thread_pool, data, &error);
-
- if (error) {
- g_critical ("Could not create thread: %s", error->message);
- g_error_free (error);
- return 0;
- }
-
return 0;
}
@@ -2210,30 +2210,6 @@ tracker_resources_sparql_update_blank_fast_async (TrackerClient *client,
TrackerReplyGPtrArray callback,
gpointer user_data)
{
- TrackerClientPrivate *private;
- FastAsyncData *data;
- GError *error = NULL;
-
- g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
- g_return_val_if_fail (query != NULL, 0);
- g_return_val_if_fail (callback != NULL, 0);
-
- private = TRACKER_CLIENT_GET_PRIVATE (client);
-
- data = g_slice_new0 (FastAsyncData);
- data->operation = FAST_UPDATE_BLANK;
- data->query = g_strdup (query);
- data->gptrarray_callback = callback;
- data->user_data = user_data;
-
- g_thread_pool_push (private->thread_pool, data, &error);
-
- if (error) {
- g_critical ("Could not create thread: %s", error->message);
- g_error_free (error);
- return 0;
- }
-
return 0;
}
@@ -2288,30 +2264,6 @@ tracker_resources_batch_sparql_update_fast_async (TrackerClient *client,
TrackerReplyVoid callback,
gpointer user_data)
{
- TrackerClientPrivate *private;
- FastAsyncData *data;
- GError *error = NULL;
-
- g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
- g_return_val_if_fail (query != NULL, 0);
- g_return_val_if_fail (callback != NULL, 0);
-
- private = TRACKER_CLIENT_GET_PRIVATE (client);
-
- data = g_slice_new0 (FastAsyncData);
- data->operation = FAST_UPDATE_BLANK;
- data->query = g_strdup (query);
- data->void_callback = callback;
- data->user_data = user_data;
-
- g_thread_pool_push (private->thread_pool, data, &error);
-
- if (error) {
- g_critical ("Could not create thread: %s", error->message);
- g_error_free (error);
- return 0;
- }
-
return 0;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]