[tracker/dbus-fd-experiment] Properly handle call cancelling with FD passing



commit 186c67eacd6f5ed93787f3c1cf7e5dd95a2d4c30
Author: Adrien Bustany <abustany gnome org>
Date:   Tue Jun 15 12:38:52 2010 -0400

    Properly handle call cancelling with FD passing

 src/libtracker-client/tracker.c |  124 +++++++++++++++++++++++++++++++++++++--
 1 files changed, 119 insertions(+), 5 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 9d7a141..60a5d85 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -122,6 +122,13 @@ typedef struct {
 } TrackerClientPrivate;
 
 typedef struct {
+#ifdef HAVE_DBUS_FD_PASSING
+	/* Set to 1 for calls using D-Bus FD passing, and to 0 otherwise.
+	 * It is the first member of both PendingCallData and
+	 * FastPendingCallData, so it tells the two structs apart and lets
+	 * the code cast to the appropriate one. */
+	char fast;
+#endif
 	DBusGProxy *proxy;
 	DBusGProxyCall *pending_call;
 } PendingCallData;
@@ -182,6 +189,8 @@ typedef enum {
 } FastOperationType;
 
 typedef struct {
+	TrackerClient         *client;
+	guint                  request_id;
 	FastOperationType      operation;
 	const gchar           *query;
 	gpointer               user_data;
@@ -199,6 +208,12 @@ typedef struct {
 		TrackerReplyIterator   iterator_callback;
 	};
 } FastAsyncData;
+
+typedef struct {
+	char           fast;        /* Discriminator shared with PendingCallData; pending_call_new_fast() sets it to 1 */
+	GCancellable  *cancellable; /* Cancels the GIO splice of an iterate query; NULL for the update calls */
+	FastAsyncData *data;        /* Per-request state, released by tracker_cancel_call() on the D-Bus path */
+} FastPendingCallData;
 #else
 typedef struct {
 	TrackerReplyIterator callback;
@@ -234,6 +249,13 @@ G_DEFINE_TYPE(TrackerClient, tracker_client, G_TYPE_OBJECT)
 static void
 pending_call_free (PendingCallData *data)
 {
+#ifdef HAVE_DBUS_FD_PASSING
+	if (data->fast) { /* Really a FastPendingCallData: release it with its own slice size */
+		FastPendingCallData *fast_data = (FastPendingCallData *) data;
+		g_slice_free (FastPendingCallData, fast_data); /* NOTE(review): cancellable and data are not released here -- confirm who owns them */
+		return;
+	}
+#endif
 	g_slice_free (PendingCallData, data);
 }
 
@@ -251,6 +273,9 @@ pending_call_new (TrackerClient  *client,
 	id = ++pending_call_id;
 
 	data = g_slice_new (PendingCallData);
+#ifdef HAVE_DBUS_FD_PASSING
+	data->fast = 0;
+#endif
 	data->proxy = proxy;
 	data->pending_call = pending_call;
 
@@ -263,6 +288,35 @@ pending_call_new (TrackerClient  *client,
 	return id;
 }
 
+#ifdef HAVE_DBUS_FD_PASSING
+static guint
+pending_call_new_fast (TrackerClient   *client,
+                      GCancellable    *cancellable,
+                      FastAsyncData   *data)
+{
+	TrackerClientPrivate *private;
+	FastPendingCallData *call_data;
+	guint id;
+
+	private = TRACKER_CLIENT_GET_PRIVATE (client);
+
+	id = ++pending_call_id; /* Same counter as pending_call_new(), so ids stay unique across both kinds */
+
+	call_data = g_slice_new (FastPendingCallData);
+	call_data->fast = 1; /* Marks the entry as a FastPendingCallData */
+	call_data->cancellable = cancellable; /* NULL for the update callers */
+	call_data->data = data;
+	/* Store the entry under its id; the reply callbacks and tracker_cancel_call() remove it by that id */
+	g_hash_table_insert (private->pending_calls,
+	                     GUINT_TO_POINTER (id),
+	                     call_data);
+
+	private->last_call = id;
+
+	return id;
+}
+#endif /* HAVE_DBUS_FD_PASSING */
+
 static void
 writeback_cb (DBusGProxy       *proxy,
               const GHashTable *resources,
@@ -510,6 +564,7 @@ fast_async_callback_iterator (GObject      *source_object,
                               GAsyncResult *result,
                               gpointer      user_data)
 {
+	TrackerClientPrivate *private;
 	DBusMessage *reply;
 	GError *inner_error = NULL;
 	GError *error = NULL;
@@ -521,6 +576,10 @@ fast_async_callback_iterator (GObject      *source_object,
 	                                                       result,
 	                                                       &inner_error);
 
+	private = TRACKER_CLIENT_GET_PRIVATE (data->client);
+	g_hash_table_remove (private->pending_calls,
+	                     GUINT_TO_POINTER (data->request_id));
+
 	iterator->buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
 
 	base_input_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (data->input_stream));
@@ -882,12 +941,16 @@ static void
 sparql_update_fast_callback (DBusPendingCall *call,
                              void            *user_data)
 {
+	TrackerClientPrivate *private;
 	FastAsyncData *data = user_data;
 	DBusMessage *reply;
 	GError *error = NULL;
 	DBusMessageIter iter, subiter, subsubiter;
 	GPtrArray *result;
 
+	private = TRACKER_CLIENT_GET_PRIVATE (data->client);
+	g_hash_table_remove (private->pending_calls,
+	                     GUINT_TO_POINTER (data->request_id));
 
 	reply = dbus_pending_call_steal_reply (call);
 
@@ -1313,6 +1376,47 @@ tracker_cancel_call (TrackerClient *client,
 		return FALSE;
 	}
 
+#ifdef HAVE_DBUS_FD_PASSING
+	if (data->fast) {
+		FastPendingCallData *fast_data = (FastPendingCallData *) data;
+		FastAsyncData *async_data = fast_data->data;
+		GInputStream *base_input_stream;
+
+		if (fast_data->cancellable) {
+			g_cancellable_cancel (fast_data->cancellable);
+			/* When cancelling a GIO call, the callback is called with an
+			 * error, so we do the cleanup there */
+			return TRUE;
+		}
+
+		if (async_data->dbus_call) {
+			dbus_pending_call_cancel (async_data->dbus_call);
+		}
+
+		switch (async_data->operation) {
+		case FAST_QUERY:
+			base_input_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (async_data->input_stream));
+			g_object_unref (async_data->input_stream);
+			g_object_unref (base_input_stream);
+			g_object_unref (async_data->output_stream);
+			/* Fall through */
+		case FAST_UPDATE:
+		case FAST_UPDATE_BLANK:
+		case FAST_UPDATE_BATCH:
+			dbus_pending_call_unref (async_data->dbus_call);
+			break;
+		default:
+			g_assert_not_reached ();
+		}
+
+		g_slice_free (FastAsyncData, async_data);
+
+		g_hash_table_remove (private->pending_calls,
+		                     GUINT_TO_POINTER (call_id));
+		return TRUE;
+	}
+#endif
+
 	dbus_g_proxy_cancel_call (data->proxy, data->pending_call);
 	g_hash_table_remove (private->pending_calls,
 	                     GUINT_TO_POINTER (call_id));
@@ -2182,6 +2286,7 @@ tracker_resources_sparql_query_iterate_async (TrackerClient         *client,
 	GInputStream *input_stream;
 	GInputStream *buffered_input_stream;
 	GOutputStream *iterator_output_stream;
+	GCancellable *cancellable;
 	FastAsyncData *async_data;
 
 	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
@@ -2224,22 +2329,25 @@ tracker_resources_sparql_query_iterate_async (TrackerClient         *client,
 	buffered_input_stream = g_buffered_input_stream_new_sized (input_stream,
 	                                                           TRACKER_STEROIDS_BUFFER_SIZE);
 	iterator_output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+	cancellable = g_cancellable_new ();
 
+	async_data->client = client;
 	async_data->result_iterator = iterator;
 	async_data->input_stream = buffered_input_stream;
 	async_data->output_stream = iterator_output_stream;
 	async_data->dbus_call = call;
 	async_data->iterator_callback = callback;
 	async_data->user_data = user_data;
+	async_data->request_id = pending_call_new_fast (client, cancellable, async_data);
 
 	g_output_stream_splice_async (iterator_output_stream,
 	                              buffered_input_stream,
 	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
 	                              0,
-	                              NULL,
+	                              cancellable,
 	                              fast_async_callback_iterator,
 	                              async_data);
-	return 42;
+	return async_data->request_id;
 #else
 	FastQueryAsyncCompatData *data;
 
@@ -2311,10 +2419,12 @@ tracker_resources_sparql_update_async (TrackerClient    *client,
 	g_return_val_if_fail (callback != NULL, 0);
 
 	data = g_slice_new0 (FastAsyncData);
+	data->client = client;
 	data->operation = FAST_UPDATE;
 	data->query = query;
 	data->void_callback = callback;
 	data->user_data = user_data;
+	data->request_id = pending_call_new_fast (client, NULL, data);
 
 	sparql_update_fast_async (client, data, &error);
 
@@ -2326,7 +2436,7 @@ tracker_resources_sparql_update_async (TrackerClient    *client,
 		return 0;
 	}
 
-	return 42;
+	return data->request_id;
 #else
 	return tracker_resources_sparql_update_compat_async (client, query, callback, user_data);
 #endif
@@ -2378,10 +2488,12 @@ tracker_resources_sparql_update_blank_async (TrackerClient         *client,
 	g_return_val_if_fail (callback != NULL, 0);
 
 	data = g_slice_new0 (FastAsyncData);
+	data->client = client;
 	data->operation = FAST_UPDATE_BLANK;
 	data->query = query;
 	data->gptrarray_callback = callback;
 	data->user_data = user_data;
+	data->request_id = pending_call_new_fast (client, NULL, data);
 
 	sparql_update_fast_async (client, data, &error);
 
@@ -2393,7 +2505,7 @@ tracker_resources_sparql_update_blank_async (TrackerClient         *client,
 		return 0;
 	}
 
-	return 42;
+	return data->request_id;
 #else
 	return tracker_resources_sparql_update_blank_compat_async (client, query, callback, user_data);
 #endif
@@ -2459,10 +2571,12 @@ tracker_resources_batch_sparql_update_async (TrackerClient    *client,
 	g_return_val_if_fail (callback != NULL, 0);
 
 	data = g_slice_new0 (FastAsyncData);
+	data->client = client;
 	data->operation = FAST_UPDATE_BATCH;
 	data->query = query;
 	data->void_callback = callback;
 	data->user_data = user_data;
+	data->request_id = pending_call_new_fast (client, NULL, data);
 
 	sparql_update_fast_async (client, data, &error);
 
@@ -2474,7 +2588,7 @@ tracker_resources_batch_sparql_update_async (TrackerClient    *client,
 		return 0;
 	}
 
-	return 42;
+	return data->request_id;
 #else
 	return tracker_resources_batch_sparql_update_compat_async (client, query, callback, user_data);
 #endif



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]