[tracker/miner-extractor-ipc: 4/4] miner-fs: Pass multiple uris to the extractor to improve D-Bus performance



commit dd067259c99042e139f6774142ae0569f4f72054
Author: Philip Van Hoof <philip codeminded be>
Date:   Thu Feb 24 17:37:37 2011 +0100

    miner-fs: Pass multiple uris to the extractor to improve D-Bus performance

 src/miners/fs/tracker-miner-files.c |  221 ++++++++++++++++++-----------------
 1 files changed, 111 insertions(+), 110 deletions(-)
---
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index 03835b1..799ea49 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -82,12 +82,10 @@ typedef struct {
 	GInputStream *unix_input_stream;
 	GInputStream *buffered_input_stream;
 	GOutputStream *output_stream;
-	GCancellable *cancellable;
-	gpointer user_data;
 	gboolean splice_finished;
 	gboolean dbus_finished;
 	GError *error;
-	fast_async_cb callback;
+	GPtrArray *items;
 } SendAndSpliceData;
 
 typedef struct {
@@ -2051,72 +2049,88 @@ extractor_skip_embedded_metadata_idle (gpointer user_data)
 }
 
 static void
-get_metadata_fast_async_callback (SendAndSpliceData *data)
+flush_extract_queue_async_callback (SendAndSpliceData *data)
 {
 	if (!data->error) {
 		const gchar *preupdate = NULL;
 		const gchar *sparql = NULL;
-		const gchar *buffer;
+		const gchar *buffer, *buffer_p;
 		gssize buffer_size;
-		gsize preupdate_length;
 		GError *error = NULL;
-		gint32 code;
+		guint i;
 
 		buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
 		buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream));
 
 		if (buffer_size == 0) {
-			error = g_error_new_literal (miner_files_error_quark,
-			                             0,
+			error = g_error_new_literal (miner_files_error_quark, 0,
 			                             "Invalid data received from GetMetadataFast");
+			for (i = 0; i < data->items->len; i++) {
+				ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+				(* item->callback) (preupdate, sparql, error, item->user_data);
+			}
 		} else {
-			code = buffer[0];
-
-			if (code == 'r') {
-
-				/* This still works atm because we still work one by one at
-				 * flush_extract_queue_shared. We need to read only one result,
-				 * no need for a loop yet. */
-
-				preupdate = buffer + 1;
-				preupdate_length = strnlen (preupdate, buffer_size);
-
-				if (preupdate_length < buffer_size && preupdate[buffer_size - 1] == '\0') {
-					/* sparql is stored just after preupdate in the original buffer */
-					sparql = preupdate + preupdate_length + 1;
+			buffer_p = buffer;
+
+			for (i = 0; i < data->items->len; i++) {
+				ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+				gint32 code;
+				gsize preupdate_length;
+
+				code = buffer_p[0];
+				buffer_p += 1;
+
+				if (code == 'r') {
+					preupdate = buffer_p;
+					preupdate_length = strnlen (preupdate, buffer_size);
+					buffer_p += preupdate_length + 1;
+
+					if (preupdate_length < buffer_size) {
+						gint len;
+						/* sparql is stored just after preupdate in the original buffer */
+						sparql = preupdate + preupdate_length + 1;
+						len = strnlen (sparql, buffer_size);
+						buffer_p += len + 1;
+					} else {
+						guint y;
+						preupdate = NULL;
+						error = g_error_new_literal (miner_files_error_quark, 0,
+						                             "Invalid data received from GetMetadataFast");
+						for (y = i; y < data->items->len; y++) {
+							ExtractQueueItem *item = g_ptr_array_index (data->items, y);
+							(* item->callback) (preupdate, sparql, error, item->user_data);
+						}
+						break;
+					}
 				} else {
-					preupdate = NULL;
-					error = g_error_new_literal (miner_files_error_quark,
-					                             0,
-					                             "Invalid data received from GetMetadataFast");
+					const gchar *error_message = buffer_p;
+					error = g_error_new_literal (miner_files_error_quark, 0,
+					                             error_message);
+					buffer_p += strnlen (error_message, buffer_size) + 1;
 				}
-			} else {
-				const gchar *error_message;
 
-				error_message = buffer + 1;
+				(* item->callback) (preupdate, sparql, error, item->user_data);
 
-				error = g_error_new_literal (miner_files_error_quark,
-				                             0,
-				                             error_message);
+				g_clear_error (&error);
 			}
 		}
 
-		(* data->callback) (preupdate, sparql, error, data->user_data);
-
-		g_clear_error (&error);
-
 	} else {
+		guint i;
+
 		/* D-Bus or request-wide error */
-		(* data->callback) (NULL, NULL, data->error, data->user_data);
+
+		for (i = 0; i < data->items->len; i++) {
+			ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+			(* item->callback) (NULL, NULL, data->error, item->user_data);
+		}
 	}
 
 	g_object_unref (data->unix_input_stream);
 	g_object_unref (data->buffered_input_stream);
 	g_object_unref (data->output_stream);
 
-	if (data->cancellable) {
-		g_object_unref (data->cancellable);
-	}
+	g_ptr_array_unref (data->items);
 
 	if (data->error) {
 		g_error_free (data->error);
@@ -2126,9 +2140,9 @@ get_metadata_fast_async_callback (SendAndSpliceData *data)
 }
 
 static void
-get_metadata_fast_splice_callback (GObject      *source,
-                                   GAsyncResult *result,
-                                   gpointer      user_data)
+flush_extract_queue_splice_callback (GObject      *source,
+                                     GAsyncResult *result,
+                                     gpointer      user_data)
 {
 	SendAndSpliceData *data = user_data;
 	GError *error = NULL;
@@ -2146,14 +2160,14 @@ get_metadata_fast_splice_callback (GObject      *source,
 	data->splice_finished = TRUE;
 
 	if (data->dbus_finished) {
-		get_metadata_fast_async_callback (data);
+		flush_extract_queue_async_callback (data);
 	}
 }
 
 static void
-get_metadata_fast_dbus_callback (GObject      *source,
-                                 GAsyncResult *result,
-                                 gpointer      user_data)
+flush_extract_queue_dbus_callback (GObject      *source,
+                                   GAsyncResult *result,
+                                   gpointer      user_data)
 {
 	SendAndSpliceData *data = user_data;
 	GDBusMessage *reply;
@@ -2181,29 +2195,39 @@ get_metadata_fast_dbus_callback (GObject      *source,
 	data->dbus_finished = TRUE;
 
 	if (data->splice_finished) {
-		get_metadata_fast_async_callback (data);
+		flush_extract_queue_async_callback (data);
 	}
 }
 
+
 static void
-get_metadata_fast_async (GDBusConnection *connection,
-                         const gchar     *uri,
-                         const gchar     *mime_type,
-                         GCancellable    *cancellable,
-                         fast_async_cb    callback,
-                         ProcessFileData *user_data)
+free_queue_item (ExtractQueueItem *item)
+{
+	g_free (item->uri);
+	g_free (item->mime_type);
+	g_object_unref (item->cancellable);
+	g_slice_free (ExtractQueueItem, item);
+}
+
+static void
+flush_extract_queue_shared (TrackerMinerFiles *miner)
 {
 	SendAndSpliceData *data;
 	GDBusMessage *message;
 	GUnixFDList *fd_list;
 	int pipefd[2];
-	const gchar *uris[2];
-	const gchar *mime_types[2];
+	gchar **uris;
+	gchar **mime_types;
+	GPtrArray *queue;
+	guint i;
 
-	g_return_if_fail (connection);
-	g_return_if_fail (uri);
-	g_return_if_fail (mime_type);
-	g_return_if_fail (callback);
+	queue = miner->private->extract_queue;
+
+	if (!queue || queue->len == 0) {
+		return;
+	}
+
+	miner->private->extract_queue = NULL;
 
 	if (pipe (pipefd) < 0) {
 		g_critical ("Coudln't open pipe");
@@ -2217,11 +2241,14 @@ get_metadata_fast_async (GDBusConnection *connection,
 
 	fd_list = g_unix_fd_list_new ();
 
-	uris[0] = uri;
-	uris[1] = NULL;
+	uris = g_new0 (gchar *, queue->len + 1);
+	mime_types = g_new0 (gchar *, queue->len + 1);
 
-	mime_types[0] = mime_type;
-	mime_types[1] = NULL;
+	for (i = 0; i < queue->len; i++) {
+		ExtractQueueItem *item = g_ptr_array_index (queue, i);
+		uris[i] = item->uri;
+		mime_types[i] = item->mime_type;
+	}
 
 	g_dbus_message_set_body (message, g_variant_new ("(^as^ash)",
 	                                                 uris,
@@ -2231,6 +2258,9 @@ get_metadata_fast_async (GDBusConnection *connection,
 	                                                                        NULL)));
 	g_dbus_message_set_unix_fd_list (message, fd_list);
 
+	g_free (uris);
+	g_free (mime_types);
+
 	/* We need to close the fd as g_unix_fd_list_append duplicates the fd */
 
 	close (pipefd[1]);
@@ -2244,20 +2274,25 @@ get_metadata_fast_async (GDBusConnection *connection,
 	                                                                 DBUS_PIPE_BUFFER_SIZE);
 	data->output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
 
-	if (cancellable) {
-		data->cancellable = g_object_ref (cancellable);
-	}
+	data->items = queue;
 
-	data->callback = callback;
-	data->user_data = user_data;
+	/* Note about timeouts that tracker-extract's alarm() would exit the service's
+	 * process resulting in a timeout DBus error message too (if alarm is triggered) */
 
-	g_dbus_connection_send_message_with_reply (connection,
+	g_dbus_connection_send_message_with_reply (miner->private->connection,
 	                                           message,
 	                                           G_DBUS_SEND_MESSAGE_FLAGS_NONE,
-	                                           -1,
+	                                           G_MAXINT, /* We are pushing a queue
+	                                                        of items here, so don't
+	                                                        use a arbitrary timeout.
+	                                                        We could use queue->len
+	                                                        times the default timeout,
+	                                                        but apparently doesn't
+	                                                        GDBus have a way to get
+	                                                        the default timeout :( */
 	                                           NULL,
-	                                           cancellable,
-	                                           get_metadata_fast_dbus_callback,
+	                                           NULL,
+	                                           flush_extract_queue_dbus_callback,
 	                                           data);
 
 	g_output_stream_splice_async (data->output_stream,
@@ -2265,47 +2300,13 @@ get_metadata_fast_async (GDBusConnection *connection,
 	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
 	                              G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
 	                              0,
-	                              data->cancellable,
-	                              get_metadata_fast_splice_callback,
+	                              NULL,
+	                              flush_extract_queue_splice_callback,
 	                              data);
 
 	g_object_unref (message);
 }
 
-static void
-free_queue_item (ExtractQueueItem *item)
-{
-	g_free (item->uri);
-	g_free (item->mime_type);
-	g_object_unref (item->cancellable);
-	g_slice_free (ExtractQueueItem, item);
-}
-
-static void
-flush_extract_queue_shared (TrackerMinerFiles *miner)
-{
-	guint i;
-	GPtrArray *queue = miner->private->extract_queue;
-	GDBusConnection *connection = miner->private->connection;
-
-	if (!queue || queue->len == 0) {
-		return;
-	}
-
-	for (i = 0; i < queue->len; i++) {
-		ExtractQueueItem *item = g_ptr_array_index (queue, i);
-
-		get_metadata_fast_async (connection,
-		                         item->uri,
-		                         item->mime_type,
-		                         item->cancellable,
-		                         item->callback,
-		                         item->user_data);
-	}
-
-	g_ptr_array_remove_range (queue, 0, queue->len);
-}
-
 static gboolean
 flush_extract_queue_idle (gpointer user_data)
 {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]