[tracker/dbus-fd-experiment] Properly handle call cancelling with FD passing
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/dbus-fd-experiment] Properly handle call cancelling with FD passing
- Date: Tue, 15 Jun 2010 16:39:19 +0000 (UTC)
commit 186c67eacd6f5ed93787f3c1cf7e5dd95a2d4c30
Author: Adrien Bustany <abustany gnome org>
Date: Tue Jun 15 12:38:52 2010 -0400
Properly handle call cancelling with FD passing
src/libtracker-client/tracker.c | 124 +++++++++++++++++++++++++++++++++++++--
1 files changed, 119 insertions(+), 5 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 9d7a141..60a5d85 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -122,6 +122,13 @@ typedef struct {
} TrackerClientPrivate;
typedef struct {
+#ifdef HAVE_DBUS_FD_PASSING
+ /* Discriminator: set to 1 for calls using D-Bus FD passing, 0 otherwise.
+ * Both PendingCallData and FastPendingCallData begin with this field,
+ * so code holding either struct can read it first and then cast to
+ * the appropriate type. */
+ char fast;
+#endif
DBusGProxy *proxy;
DBusGProxyCall *pending_call;
} PendingCallData;
@@ -182,6 +189,8 @@ typedef enum {
} FastOperationType;
typedef struct {
+ TrackerClient *client;
+ guint request_id;
FastOperationType operation;
const gchar *query;
gpointer user_data;
@@ -199,6 +208,12 @@ typedef struct {
TrackerReplyIterator iterator_callback;
};
} FastAsyncData;
+
+typedef struct { /* pending-call record used for calls made with D-Bus FD passing */
+ char fast; /* set to 1 by pending_call_new_fast(); first member, mirroring PendingCallData.fast */
+ GCancellable *cancellable; /* cancelled by tracker_cancel_call() when non-NULL; the update callers pass NULL */
+ FastAsyncData *data; /* per-call state; pending_call_free() does not free it */
+} FastPendingCallData;
#else
typedef struct {
TrackerReplyIterator callback;
@@ -234,6 +249,13 @@ G_DEFINE_TYPE(TrackerClient, tracker_client, G_TYPE_OBJECT)
static void
pending_call_free (PendingCallData *data)
{
+#ifdef HAVE_DBUS_FD_PASSING
+ if (data->fast) { /* the record is really a FastPendingCallData: free it with that type's size */
+ FastPendingCallData *fast_data = (FastPendingCallData *) data;
+ g_slice_free (FastPendingCallData, fast_data); /* NOTE(review): only the record itself is freed;
+ * fast_data->cancellable is not unreffed and fast_data->data is not
+ * released here -- confirm that their owners drop them elsewhere. */
+ return;
+ }
+#endif
g_slice_free (PendingCallData, data);
}
@@ -251,6 +273,9 @@ pending_call_new (TrackerClient *client,
id = ++pending_call_id;
data = g_slice_new (PendingCallData);
+#ifdef HAVE_DBUS_FD_PASSING
+ data->fast = 0;
+#endif
data->proxy = proxy;
data->pending_call = pending_call;
@@ -263,6 +288,35 @@ pending_call_new (TrackerClient *client,
return id;
}
+#ifdef HAVE_DBUS_FD_PASSING
+static guint
+pending_call_new_fast (TrackerClient *client,
+ GCancellable *cancellable,
+ FastAsyncData *data)
+{ /* Registers an FD-passing call in the client's pending_calls table and
+ * returns the id it was stored under. @cancellable may be NULL (the
+ * update callers pass NULL). Both @cancellable and @data are stored
+ * as plain pointers; no reference is taken on @cancellable here. */
+ TrackerClientPrivate *private;
+ FastPendingCallData *call_data;
+ guint id;
+
+ private = TRACKER_CLIENT_GET_PRIVATE (client);
+
+ id = ++pending_call_id; /* same counter pending_call_new() increments, so both kinds share one id space */
+
+ call_data = g_slice_new (FastPendingCallData); /* g_slice_new does not zero; every field is assigned below */
+ call_data->fast = 1; /* lets pending_call_free() and tracker_cancel_call() recognise this record type */
+ call_data->cancellable = cancellable;
+ call_data->data = data;
+
+ g_hash_table_insert (private->pending_calls,
+ GUINT_TO_POINTER (id),
+ call_data);
+
+ private->last_call = id; /* record this id as the client's last_call */
+
+ return id;
+}
+#endif /* HAVE_DBUS_FD_PASSING */
+
static void
writeback_cb (DBusGProxy *proxy,
const GHashTable *resources,
@@ -510,6 +564,7 @@ fast_async_callback_iterator (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
+ TrackerClientPrivate *private;
DBusMessage *reply;
GError *inner_error = NULL;
GError *error = NULL;
@@ -521,6 +576,10 @@ fast_async_callback_iterator (GObject *source_object,
result,
&inner_error);
+ private = TRACKER_CLIENT_GET_PRIVATE (data->client);
+ g_hash_table_remove (private->pending_calls,
+ GUINT_TO_POINTER (data->request_id));
+
iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
base_input_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (data->input_stream));
@@ -882,12 +941,16 @@ static void
sparql_update_fast_callback (DBusPendingCall *call,
void *user_data)
{
+ TrackerClientPrivate *private;
FastAsyncData *data = user_data;
DBusMessage *reply;
GError *error = NULL;
DBusMessageIter iter, subiter, subsubiter;
GPtrArray *result;
+ private = TRACKER_CLIENT_GET_PRIVATE (data->client);
+ g_hash_table_remove (private->pending_calls,
+ GUINT_TO_POINTER (data->request_id));
reply = dbus_pending_call_steal_reply (call);
@@ -1313,6 +1376,47 @@ tracker_cancel_call (TrackerClient *client,
return FALSE;
}
+#ifdef HAVE_DBUS_FD_PASSING
+ if (data->fast) {
+ FastPendingCallData *fast_data = (FastPendingCallData *) data;
+ FastAsyncData *async_data = fast_data->data;
+ GInputStream *base_input_stream;
+
+ if (fast_data->cancellable) {
+ g_cancellable_cancel (fast_data->cancellable);
+ /* When cancelling a GIO call, the callback is called with an
+ * error, so we do the cleanup there */
+ return TRUE;
+ }
+
+ if (async_data->dbus_call) {
+ dbus_pending_call_cancel (async_data->dbus_call);
+ }
+
+ switch (async_data->operation) {
+ case FAST_QUERY:
+ base_input_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (async_data->input_stream));
+ g_object_unref (async_data->input_stream);
+ g_object_unref (base_input_stream);
+ g_object_unref (async_data->output_stream);
+ /* Fall through */
+ case FAST_UPDATE:
+ case FAST_UPDATE_BLANK:
+ case FAST_UPDATE_BATCH:
+ dbus_pending_call_unref (async_data->dbus_call);
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+
+ g_slice_free (FastAsyncData, async_data);
+
+ g_hash_table_remove (private->pending_calls,
+ GUINT_TO_POINTER (call_id));
+ return TRUE;
+ }
+#endif
+
dbus_g_proxy_cancel_call (data->proxy, data->pending_call);
g_hash_table_remove (private->pending_calls,
GUINT_TO_POINTER (call_id));
@@ -2182,6 +2286,7 @@ tracker_resources_sparql_query_iterate_async (TrackerClient *client,
GInputStream *input_stream;
GInputStream *buffered_input_stream;
GOutputStream *iterator_output_stream;
+ GCancellable *cancellable;
FastAsyncData *async_data;
g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
@@ -2224,22 +2329,25 @@ tracker_resources_sparql_query_iterate_async (TrackerClient *client,
buffered_input_stream = g_buffered_input_stream_new_sized (input_stream,
TRACKER_STEROIDS_BUFFER_SIZE);
iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+ cancellable = g_cancellable_new ();
+ async_data->client = client;
async_data->result_iterator = iterator;
async_data->input_stream = buffered_input_stream;
async_data->output_stream = iterator_output_stream;
async_data->dbus_call = call;
async_data->iterator_callback = callback;
async_data->user_data = user_data;
+ async_data->request_id = pending_call_new_fast (client, cancellable, async_data);
g_output_stream_splice_async (iterator_output_stream,
buffered_input_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
0,
- NULL,
+ cancellable,
fast_async_callback_iterator,
async_data);
- return 42;
+ return async_data->request_id;
#else
FastQueryAsyncCompatData *data;
@@ -2311,10 +2419,12 @@ tracker_resources_sparql_update_async (TrackerClient *client,
g_return_val_if_fail (callback != NULL, 0);
data = g_slice_new0 (FastAsyncData);
+ data->client = client;
data->operation = FAST_UPDATE;
data->query = query;
data->void_callback = callback;
data->user_data = user_data;
+ data->request_id = pending_call_new_fast (client, NULL, data);
sparql_update_fast_async (client, data, &error);
@@ -2326,7 +2436,7 @@ tracker_resources_sparql_update_async (TrackerClient *client,
return 0;
}
- return 42;
+ return data->request_id;
#else
return tracker_resources_sparql_update_compat_async (client, query, callback, user_data);
#endif
@@ -2378,10 +2488,12 @@ tracker_resources_sparql_update_blank_async (TrackerClient *client,
g_return_val_if_fail (callback != NULL, 0);
data = g_slice_new0 (FastAsyncData);
+ data->client = client;
data->operation = FAST_UPDATE_BLANK;
data->query = query;
data->gptrarray_callback = callback;
data->user_data = user_data;
+ data->request_id = pending_call_new_fast (client, NULL, data);
sparql_update_fast_async (client, data, &error);
@@ -2393,7 +2505,7 @@ tracker_resources_sparql_update_blank_async (TrackerClient *client,
return 0;
}
- return 42;
+ return data->request_id;
#else
return tracker_resources_sparql_update_blank_compat_async (client, query, callback, user_data);
#endif
@@ -2459,10 +2571,12 @@ tracker_resources_batch_sparql_update_async (TrackerClient *client,
g_return_val_if_fail (callback != NULL, 0);
data = g_slice_new0 (FastAsyncData);
+ data->client = client;
data->operation = FAST_UPDATE_BATCH;
data->query = query;
data->void_callback = callback;
data->user_data = user_data;
+ data->request_id = pending_call_new_fast (client, NULL, data);
sparql_update_fast_async (client, data, &error);
@@ -2474,7 +2588,7 @@ tracker_resources_batch_sparql_update_async (TrackerClient *client,
return 0;
}
- return 42;
+ return data->request_id;
#else
return tracker_resources_batch_sparql_update_compat_async (client, query, callback, user_data);
#endif
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]