[tracker/dbus-fd-experiment-gio: 7/11] Add gio based update
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/dbus-fd-experiment-gio: 7/11] Add gio based update
- Date: Thu, 10 Jun 2010 22:33:13 +0000 (UTC)
commit e7664994cd1086f87119f1772a16f4ec4e99444c
Author: Adrien Bustany <abustany gnome org>
Date: Thu Jun 10 18:26:10 2010 -0400
Add gio based update
src/libtracker-client/tracker.c | 283 ++++++++++++++++++++++++++++++++++++---
1 files changed, 264 insertions(+), 19 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 18c5299..aa42b5b 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -183,7 +183,7 @@ typedef enum {
typedef struct {
FastOperationType operation;
- gchar *query;
+ const gchar *query;
gpointer user_data;
GInputStream *input_stream;
GOutputStream *output_stream;
@@ -815,6 +815,30 @@ find_conversion (const char *format,
}
#ifdef HAVE_DBUS_FD_PASSING
+/*
+ * unmarshall_hash_table:
+ * @iter: iterator positioned on a D-Bus container whose elements each hold
+ *        two string values (presumably an array of string -> string dict
+ *        entries; confirm against the service's reply signature).
+ *
+ * Copies every key/value pair found under @iter into a new #GHashTable.
+ *
+ * Returns: a newly created hash table. The table owns its strings: keys and
+ * values are released with g_free() when the table is destroyed or an entry
+ * is replaced.
+ */
+static GHashTable*
+unmarshall_hash_table (DBusMessageIter *iter) {
+	GHashTable *result;
+	DBusMessageIter subiter, subsubiter;
+
+	/* Keys and values are g_strdup()'d below and nothing else references
+	 * them, so the table must be the one to free them; a plain
+	 * g_hash_table_new() would leak every string. */
+	result = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
+
+	dbus_message_iter_recurse (iter, &subiter);
+
+	while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
+		const gchar *key, *value;
+
+		/* Each element carries the key first, then the value. The
+		 * strings returned by get_basic belong to the message, hence
+		 * the copies. */
+		dbus_message_iter_recurse (&subiter, &subsubiter);
+		dbus_message_iter_get_basic (&subsubiter, &key);
+		dbus_message_iter_next (&subsubiter);
+		dbus_message_iter_get_basic (&subsubiter, &value);
+		g_hash_table_insert (result, g_strdup (key), g_strdup (value));
+
+		dbus_message_iter_next (&subiter);
+	}
+
+	return result;
+}
+
static int
iterator_buffer_read_int (TrackerResultIterator *iterator)
{
@@ -825,6 +849,82 @@ iterator_buffer_read_int (TrackerResultIterator *iterator)
return GINT32_FROM_BE (v);
}
+/*
+ * sparql_update_fast_callback:
+ * @call: the pending call that has completed.
+ * @user_data: the #FastAsyncData describing the request; freed here.
+ *
+ * Pending-call notification for the FD-passing update requests. Takes the
+ * reply from @call and invokes the user callback stored in @user_data:
+ * the void callback for FAST_UPDATE and FAST_UPDATE_BATCH, the GPtrArray
+ * callback for FAST_UPDATE_BLANK. On an error reply the callback receives a
+ * #GError (and a NULL result for FAST_UPDATE_BLANK). Afterwards the reply,
+ * the pending call reference and @user_data are released.
+ *
+ * NOTE(review): the #GError and the result array are handed to the user
+ * callback and are not freed here -- presumably the callback owns them;
+ * confirm against the public API contract.
+ */
+static void
+sparql_update_fast_callback (DBusPendingCall *call,
+                             void            *user_data)
+{
+	FastAsyncData *data = user_data;
+	DBusMessage *reply;
+	DBusError dbus_error;
+	GError *error = NULL;
+	DBusMessageIter iter, subiter, subsubiter;
+	GPtrArray *result;
+
+	dbus_error_init (&dbus_error);
+
+	reply = dbus_pending_call_steal_reply (call);
+
+	g_assert (reply);
+
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		dbus_set_error_from_message (&dbus_error, reply);
+		dbus_set_g_error (&error, &dbus_error);
+		/* The GError holds its own copy of the message; release the
+		 * DBusError so its name and message strings do not leak. */
+		dbus_error_free (&dbus_error);
+
+		switch (data->operation) {
+		case FAST_UPDATE:
+		case FAST_UPDATE_BATCH:
+			(* data->void_callback) (error, data->user_data);
+			break;
+		case FAST_UPDATE_BLANK:
+			(* data->gptrarray_callback) (NULL, error, data->user_data);
+			break;
+		default:
+			g_assert_not_reached ();
+			break;
+		}
+
+		goto out;
+	}
+
+	switch (data->operation) {
+	case FAST_UPDATE:
+	case FAST_UPDATE_BATCH:
+		(* data->void_callback) (NULL, data->user_data);
+		break;
+	case FAST_UPDATE_BLANK:
+		/* Rebuild the nested reply: an array of arrays of hash tables. */
+		result = g_ptr_array_new ();
+		dbus_message_iter_init (reply, &iter);
+		dbus_message_iter_recurse (&iter, &subiter);
+
+		while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
+			GPtrArray *inner_array;
+
+			inner_array = g_ptr_array_new ();
+			g_ptr_array_add (result, inner_array);
+			dbus_message_iter_recurse (&subiter, &subsubiter);
+
+			while (dbus_message_iter_get_arg_type (&subsubiter) != DBUS_TYPE_INVALID) {
+				g_ptr_array_add (inner_array, unmarshall_hash_table (&subsubiter));
+				dbus_message_iter_next (&subsubiter);
+			}
+
+			dbus_message_iter_next (&subiter);
+		}
+		/* error is still NULL on this path. */
+		(* data->gptrarray_callback) (result, error, data->user_data);
+		break;
+	default:
+		g_assert_not_reached ();
+		break;
+	}
+
+out:
+	dbus_message_unref (reply);
+	dbus_pending_call_unref (call);
+	g_slice_free (FastAsyncData, data);
+}
+
static DBusMessage*
sparql_update_fast (TrackerClient *client,
const gchar *query,
@@ -944,30 +1044,106 @@ sparql_update_fast (TrackerClient *client,
return reply;
}
-static GHashTable*
-unmarshall_hash_table (DBusMessageIter *iter) {
- GHashTable *result;
- DBusMessageIter subiter, subsubiter;
+/*
+ * sparql_update_fast_async:
+ * @client: the client whose D-Bus connection is used.
+ * @data: request description (operation, query, user callback and data).
+ * @error: return location for an error raised while starting the request.
+ *
+ * Starts an asynchronous FD-passing update: creates a pipe, sends its read
+ * end to the steroids service in an "Update", "UpdateBlank" or "BatchUpdate"
+ * method call, writes the query length followed by the query into the write
+ * end, and registers sparql_update_fast_callback() for the reply.
+ *
+ * @data is never freed here. When the function returns without setting
+ * @error, @data is released by sparql_update_fast_callback(); when @error is
+ * set no notification has been registered and the caller still owns @data.
+ * Both pipe ends and the pending call are released on every failure path.
+ */
+static void
+sparql_update_fast_async (TrackerClient  *client,
+                          FastAsyncData  *data,
+                          GError        **error)
+{
+	TrackerClientPrivate *private;
+	DBusConnection *connection;
+	const gchar *dbus_method = NULL;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusPendingCall *call = NULL;
+	int pipefd[2];
+	GOutputStream *output_stream;
+	GDataOutputStream *data_output_stream;
+	GError *inner_error = NULL;
- result = g_hash_table_new (g_str_hash, g_str_equal);
+	private = TRACKER_CLIENT_GET_PRIVATE (client);
- dbus_message_iter_recurse (iter, &subiter);
+	if (pipe (pipefd) < 0) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Cannot open pipe");
+		return;
+	}
- while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
- const gchar *key, *value;
+	connection = dbus_g_connection_get_connection (private->connection);
- dbus_message_iter_recurse (&subiter, &subsubiter);
- dbus_message_iter_get_basic (&subsubiter, &key);
- dbus_message_iter_next (&subsubiter);
- dbus_message_iter_get_basic (&subsubiter, &value);
- g_hash_table_insert (result, g_strdup (key), g_strdup (value));
- dbus_message_iter_next (&subiter);
+	switch (data->operation) {
+	case FAST_UPDATE:
+		dbus_method = "Update";
+		break;
+	case FAST_UPDATE_BLANK:
+		dbus_method = "UpdateBlank";
+		break;
+	case FAST_UPDATE_BATCH:
+		dbus_method = "BatchUpdate";
+		break;
+	default:
+		g_assert_not_reached ();
+		break;
}
- return result;
+	if (dbus_method == NULL) {
+		/* Only reachable when assertions are compiled out. */
+		close (pipefd[0]);
+		close (pipefd[1]);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Unknown fast update operation");
+		return;
+	}
+
+	message = dbus_message_new_method_call (TRACKER_STEROIDS_SERVICE,
+	                                        TRACKER_STEROIDS_PATH,
+	                                        TRACKER_STEROIDS_INTERFACE,
+	                                        dbus_method);
+	if (message == NULL) {
+		close (pipefd[0]);
+		close (pipefd[1]);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Cannot create D-Bus message");
+		return;
+	}
+
+	dbus_message_iter_init_append (message, &iter);
+	if (!dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[0]) ||
+	    !dbus_connection_send_with_reply (connection,
+	                                      message,
+	                                      &call,
+	                                      -1)) {
+		/* Out of memory: do not rely on whatever call was left at. */
+		call = NULL;
+	}
+	dbus_message_unref (message);
+	/* Our copy of the read end is no longer needed once the message has
+	 * been handed to libdbus (or has failed to be). */
+	close (pipefd[0]);
+
+	if (!call) {
+		/* The write end was previously leaked on this path. */
+		close (pipefd[1]);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return;
+	}
+
+	/* The stream takes ownership of the write end (close_fd = TRUE), so
+	 * dropping the stream references below closes it. */
+	output_stream = g_unix_output_stream_new (pipefd[1], TRUE);
+	data_output_stream = g_data_output_stream_new (output_stream);
+
+	/* Wire format: 32-bit query length, then the query bytes. */
+	if (!g_data_output_stream_put_int32 (data_output_stream,
+	                                     strlen (data->query),
+	                                     NULL,
+	                                     &inner_error) ||
+	    !g_data_output_stream_put_string (data_output_stream,
+	                                      data->query,
+	                                      NULL,
+	                                      &inner_error)) {
+		g_propagate_error (error, inner_error);
+		g_object_unref (data_output_stream);
+		g_object_unref (output_stream);
+		/* No notification is registered yet: drop the outstanding call
+		 * so it neither leaks nor delivers a reply nobody handles. */
+		dbus_pending_call_cancel (call);
+		dbus_pending_call_unref (call);
+		return;
+	}
+
+	g_object_unref (data_output_stream);
+	g_object_unref (output_stream);
+
+	if (!dbus_pending_call_set_notify (call, sparql_update_fast_callback, data, NULL)) {
+		dbus_pending_call_cancel (call);
+		dbus_pending_call_unref (call);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Cannot register reply notification");
+	}
}
+
#endif
/**
@@ -2170,7 +2346,30 @@ tracker_resources_sparql_update_fast_async (TrackerClient *client,
TrackerReplyVoid callback,
gpointer user_data)
{
- return 0;
+ FastAsyncData *data;
+ GError *error = NULL;
+
+ g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+ g_return_val_if_fail (query != NULL, 0);
+ g_return_val_if_fail (callback != NULL, 0);
+
+ data = g_slice_new0 (FastAsyncData);
+ data->operation = FAST_UPDATE;
+ data->query = query;
+ data->void_callback = callback;
+ data->user_data = user_data;
+
+ sparql_update_fast_async (client, data, &error);
+
+ if (error) {
+ g_critical ("Could not initiate update: %s", error->message);
+ g_error_free (error);
+ g_slice_free (FastAsyncData, data);
+
+ return 0;
+ }
+
+ return 42;
}
guint
@@ -2210,7 +2409,30 @@ tracker_resources_sparql_update_blank_fast_async (TrackerClient *client,
TrackerReplyGPtrArray callback,
gpointer user_data)
{
- return 0;
+ FastAsyncData *data;
+ GError *error = NULL;
+
+ g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+ g_return_val_if_fail (query != NULL, 0);
+ g_return_val_if_fail (callback != NULL, 0);
+
+ data = g_slice_new0 (FastAsyncData);
+ data->operation = FAST_UPDATE_BLANK;
+ data->query = query;
+ data->gptrarray_callback = callback;
+ data->user_data = user_data;
+
+ sparql_update_fast_async (client, data, &error);
+
+ if (error) {
+ g_critical ("Could not initiate update: %s", error->message);
+ g_error_free (error);
+ g_slice_free (FastAsyncData, data);
+
+ return 0;
+ }
+
+ return 42;
}
/**
@@ -2264,7 +2486,30 @@ tracker_resources_batch_sparql_update_fast_async (TrackerClient *client,
TrackerReplyVoid callback,
gpointer user_data)
{
- return 0;
+ FastAsyncData *data;
+ GError *error = NULL;
+
+ g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+ g_return_val_if_fail (query != NULL, 0);
+ g_return_val_if_fail (callback != NULL, 0);
+
+ data = g_slice_new0 (FastAsyncData);
+ data->operation = FAST_UPDATE_BATCH;
+ data->query = query;
+ data->void_callback = callback;
+ data->user_data = user_data;
+
+ sparql_update_fast_async (client, data, &error);
+
+ if (error) {
+ g_critical ("Could not initiate update: %s", error->message);
+ g_error_free (error);
+ g_slice_free (FastAsyncData, data);
+
+ return 0;
+ }
+
+ return 42;
}
/**
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]