[tracker/miner-extractor-ipc] tracker-extract, miner-fs: Allow multiple URIs to be passed to the extractor



commit 08559fda927b5b6f7893dafbbf1920430d729a79
Author: Philip Van Hoof <philip codeminded be>
Date:   Thu Feb 24 16:25:23 2011 +0100

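    tracker-extract, miner-fs: Allow multiple URIs to be passed to the extractor

    miner-fs now sends its whole extract queue in a single GetMetadataFast
    call. The reply carries one record per URI, in request order: 'r'
    followed by the preupdate and SPARQL strings, or 'e' followed by an
    error message. tracker-extract now arms its MAX_EXTRACT_TIME alarm
    per URI instead of once per call.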

 src/miners/fs/tracker-miner-files.c   |  232 ++++++++++++++++-----------------
 src/tracker-extract/tracker-extract.c |  116 ++++++++---------
 2 files changed, 163 insertions(+), 185 deletions(-)
---
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index a48a234..fcfc6f5 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -82,12 +82,10 @@ typedef struct {
 	GInputStream *unix_input_stream;
 	GInputStream *buffered_input_stream;
 	GOutputStream *output_stream;
-	GCancellable *cancellable;
-	gpointer user_data;
 	gboolean splice_finished;
 	gboolean dbus_finished;
 	GError *error;
-	fast_async_cb callback;
+	GPtrArray *items;
 } SendAndSpliceData;
 
 typedef struct {
@@ -2050,84 +2048,107 @@ extractor_skip_embedded_metadata_idle (gpointer user_data)
 	return FALSE;
 }
 
-static guint32
-read_uint32 (const guint8 *data)
-{
-	return data[0] << 24 |
-	       data[1] << 16 |
-	       data[2] << 8 |
-	       data[3];
-}
-
+/* Returns the NUL-terminated string starting at *p and advances *p past
+ * its terminator. Returns NULL and leaves *p untouched when no terminator
+ * occurs before @end, i.e. when the reply is truncated or malformed. */
+static const gchar *
+read_reply_string (const gchar **p,
+                   const gchar  *end)
+{
+	const gchar *str = *p;
+	const gchar *nul;
+
+	if (str >= end) {
+		return NULL;
+	}
+
+	nul = memchr (str, '\0', end - str);
+
+	if (!nul) {
+		return NULL;
+	}
+
+	*p = nul + 1;
+
+	return str;
+}
+
 static void
-get_metadata_fast_async_callback (SendAndSpliceData *data)
+flush_extract_queue_async_callback (SendAndSpliceData *data)
 {
 	if (!data->error) {
-		const gchar *preupdate = NULL;
-		const gchar *sparql = NULL;
-		const gchar *buffer;
+		const gchar *buffer, *buffer_p, *buffer_end;
 		gssize buffer_size;
-		gsize preupdate_length;
-		GError *error = NULL;
-		gint32 code;
+		gboolean invalid = FALSE;
+		guint i;
 
 		buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
 		buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream));
 
-		if (buffer_size == 0) {
-			error = g_error_new_literal (miner_files_error_quark,
-			                             0,
-			                             "Invalid data received from GetMetadataFast");
-		} else {
-			code = read_uint32 (buffer);
-
-			if (code >= 0) {
-
-				/* This still works atm because we still work one by one at
-				 * flush_extract_queue_shared. We need to read only one result,
-				 * no need for a loop yet. */
-
-				preupdate = buffer + 4;
-				preupdate_length = strnlen (preupdate, buffer_size);
-
-				if (preupdate_length < buffer_size && preupdate[buffer_size - 1] == '\0') {
-					/* sparql is stored just after preupdate in the original buffer */
-					sparql = preupdate + preupdate_length + 1;
-				} else {
-					preupdate = NULL;
-					error = g_error_new_literal (miner_files_error_quark,
-					                             0,
-					                             "Invalid data received from GetMetadataFast");
-				}
-			} else {
-				const gchar *error_message;
-				gint32 error_code;
-
-				error_code = read_uint32 (buffer + 4);
-				error_message = buffer + 4 + 4;
-
-				error = g_error_new_literal (miner_files_error_quark,
-				                             error_code,
-				                             error_message);
-			}
-		}
-
-		(* data->callback) (preupdate, sparql, error, data->user_data);
-
-		g_clear_error (&error);
-
+		buffer_p = buffer;
+		buffer_end = buffer + buffer_size;
+
+		/* The reply carries one record per queued item, in queue order:
+		 *   'r' <preupdate> '\0' <sparql> '\0'   extraction succeeded
+		 *   'e' <error message> '\0'             extraction failed
+		 * Once a record is missing or malformed the offset of the next one
+		 * is unknown, so every remaining item is failed as invalid. */
+		for (i = 0; i < data->items->len; i++) {
+			ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+			const gchar *preupdate = NULL;
+			const gchar *sparql = NULL;
+			const gchar *error_message = NULL;
+			GError *error = NULL;
+
+			if (!invalid && buffer_p < buffer_end) {
+				gchar code = *buffer_p++;
+
+				if (code == 'r') {
+					preupdate = read_reply_string (&buffer_p, buffer_end);
+					sparql = preupdate ? read_reply_string (&buffer_p, buffer_end) : NULL;
+					invalid = (sparql == NULL);
+				} else if (code == 'e') {
+					error_message = read_reply_string (&buffer_p, buffer_end);
+					invalid = (error_message == NULL);
+				} else {
+					invalid = TRUE;
+				}
+			} else {
+				invalid = TRUE;
+			}
+
+			if (invalid) {
+				/* Never pass half-parsed strings along with an error */
+				preupdate = NULL;
+				sparql = NULL;
+				error = g_error_new_literal (miner_files_error_quark, 0,
+				                             "Invalid data received from GetMetadataFast");
+			} else if (error_message) {
+				error = g_error_new_literal (miner_files_error_quark, 0,
+				                             error_message);
+			}
+
+			(* item->callback) (preupdate, sparql, error, item->user_data);
+
+			g_clear_error (&error);
+		}
+
 	} else {
+		guint i;
+
 		/* D-Bus or request-wide error */
-		(* data->callback) (NULL, NULL, data->error, data->user_data);
+
+		for (i = 0; i < data->items->len; i++) {
+			ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+			(* item->callback) (NULL, NULL, data->error, item->user_data);
+		}
 	}
 
 	g_object_unref (data->unix_input_stream);
 	g_object_unref (data->buffered_input_stream);
 	g_object_unref (data->output_stream);
 
-	if (data->cancellable) {
-		g_object_unref (data->cancellable);
-	}
+	g_ptr_array_unref (data->items);
 
 	if (data->error) {
 		g_error_free (data->error);
@@ -2137,9 +2158,9 @@ get_metadata_fast_async_callback (SendAndSpliceData *data)
 }
 
 static void
-get_metadata_fast_splice_callback (GObject      *source,
-                                   GAsyncResult *result,
-                                   gpointer      user_data)
+flush_extract_queue_splice_callback (GObject      *source,
+                                     GAsyncResult *result,
+                                     gpointer      user_data)
 {
 	SendAndSpliceData *data = user_data;
 	GError *error = NULL;
@@ -2157,14 +2178,14 @@ get_metadata_fast_splice_callback (GObject      *source,
 	data->splice_finished = TRUE;
 
 	if (data->dbus_finished) {
-		get_metadata_fast_async_callback (data);
+		flush_extract_queue_async_callback (data);
 	}
 }
 
 static void
-get_metadata_fast_dbus_callback (GObject      *source,
-                                 GAsyncResult *result,
-                                 gpointer      user_data)
+flush_extract_queue_dbus_callback (GObject      *source,
+                                   GAsyncResult *result,
+                                   gpointer      user_data)
 {
 	SendAndSpliceData *data = user_data;
 	GDBusMessage *reply;
@@ -2192,29 +2213,36 @@ get_metadata_fast_dbus_callback (GObject      *source,
 	data->dbus_finished = TRUE;
 
 	if (data->splice_finished) {
-		get_metadata_fast_async_callback (data);
+		flush_extract_queue_async_callback (data);
 	}
 }
 
 static void
-get_metadata_fast_async (GDBusConnection *connection,
-                         const gchar     *uri,
-                         const gchar     *mime_type,
-                         GCancellable    *cancellable,
-                         fast_async_cb    callback,
-                         ProcessFileData *user_data)
+free_queue_item (ExtractQueueItem *item)
+{
+	g_free (item->uri);
+	g_free (item->mime_type);
+	g_object_unref (item->cancellable);
+	g_slice_free (ExtractQueueItem, item);
+}
+
+static void
+flush_extract_queue_shared (TrackerMinerFiles *miner)
 {
 	SendAndSpliceData *data;
 	GDBusMessage *message;
 	GUnixFDList *fd_list;
 	int pipefd[2];
-	const gchar *uris[2];
-	const gchar *mime_types[2];
+	gchar **uris;
+	gchar **mime_types;
+	GPtrArray *queue;
+	guint i;
+
+	queue = miner->private->extract_queue;
 
-	g_return_if_fail (connection);
-	g_return_if_fail (uri);
-	g_return_if_fail (mime_type);
-	g_return_if_fail (callback);
+	if (!queue || queue->len == 0) {
+		return;
+	}
 
 	if (pipe (pipefd) < 0) {
 		g_critical ("Coudln't open pipe");
@@ -2228,11 +2256,19 @@ get_metadata_fast_async (GDBusConnection *connection,
 
+	/* Detach the queue only now that the pipe exists, so a pipe()
+	 * failure above leaves the items queued instead of discarding
+	 * them. From here on the request owns the queue. */
+	miner->private->extract_queue = NULL;
+
 	fd_list = g_unix_fd_list_new ();
 
-	uris[0] = uri;
-	uris[1] = NULL;
+	uris = g_new0 (gchar *, queue->len + 1);
+	mime_types = g_new0 (gchar *, queue->len + 1);
 
-	mime_types[0] = mime_type;
-	mime_types[1] = NULL;
+	for (i = 0; i < queue->len; i++) {
+		ExtractQueueItem *item = g_ptr_array_index (queue, i);
+		uris[i] = item->uri;
+		mime_types[i] = item->mime_type;
+	}
 
 	g_dbus_message_set_body (message, g_variant_new ("(^as^ash)",
 	                                                 uris,
@@ -2242,6 +2278,9 @@ get_metadata_fast_async (GDBusConnection *connection,
 	                                                                        NULL)));
 	g_dbus_message_set_unix_fd_list (message, fd_list);
 
+	g_free (uris);
+	g_free (mime_types);
+
 	/* We need to close the fd as g_unix_fd_list_append duplicates the fd */
 
 	close (pipefd[1]);
@@ -2255,20 +2294,25 @@ get_metadata_fast_async (GDBusConnection *connection,
 	                                                                 DBUS_PIPE_BUFFER_SIZE);
 	data->output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
 
-	if (cancellable) {
-		data->cancellable = g_object_ref (cancellable);
-	}
+	data->items = queue;
 
-	data->callback = callback;
-	data->user_data = user_data;
+	/* Timeouts: if tracker-extract's alarm() fires, the service process
+	 * exits, and this call then also ends with a D-Bus error. */
 
-	g_dbus_connection_send_message_with_reply (connection,
+	g_dbus_connection_send_message_with_reply (miner->private->connection,
 	                                           message,
 	                                           G_DBUS_SEND_MESSAGE_FLAGS_NONE,
-	                                           -1,
+	                                           G_MAXINT, /* No timeout: a whole queue
+	                                                        of items is pushed here,
+	                                                        so a per-call default
+	                                                        would be arbitrary. The
+	                                                        default times queue->len
+	                                                        would be nicer, but GDBus
+	                                                        offers no way to query
+	                                                        its default timeout. */
 	                                           NULL,
-	                                           cancellable,
-	                                           get_metadata_fast_dbus_callback,
+	                                           NULL,
+	                                           flush_extract_queue_dbus_callback,
 	                                           data);
 
 	g_output_stream_splice_async (data->output_stream,
@@ -2276,47 +2320,13 @@ get_metadata_fast_async (GDBusConnection *connection,
 	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
 	                              G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
 	                              0,
-	                              data->cancellable,
-	                              get_metadata_fast_splice_callback,
+	                              NULL,
+	                              flush_extract_queue_splice_callback,
 	                              data);
 
 	g_object_unref (message);
 }
 
-static void
-free_queue_item (ExtractQueueItem *item)
-{
-	g_free (item->uri);
-	g_free (item->mime_type);
-	g_object_unref (item->cancellable);
-	g_slice_free (ExtractQueueItem, item);
-}
-
-static void
-flush_extract_queue_shared (TrackerMinerFiles *miner)
-{
-	guint i;
-	GPtrArray *queue = miner->private->extract_queue;
-	GDBusConnection *connection = miner->private->connection;
-
-	if (!queue || queue->len == 0) {
-		return;
-	}
-
-	for (i = 0; i < queue->len; i++) {
-		ExtractQueueItem *item = g_ptr_array_index (queue, i);
-
-		get_metadata_fast_async (connection,
-		                         item->uri,
-		                         item->mime_type,
-		                         item->cancellable,
-		                         item->callback,
-		                         item->user_data);
-	}
-
-	g_ptr_array_remove_range (queue, 0, queue->len);
-}
-
 static gboolean
 flush_extract_queue_idle (gpointer user_data)
 {
diff --git a/src/tracker-extract/tracker-extract.c b/src/tracker-extract/tracker-extract.c
index 75cee36..4c2d502 100644
--- a/src/tracker-extract/tracker-extract.c
+++ b/src/tracker-extract/tracker-extract.c
@@ -806,9 +806,6 @@ handle_method_call_get_metadata_fast (TrackerExtract        *object,
 	priv = TRACKER_EXTRACT_GET_PRIVATE (object);
 
 	tracker_main_quit_timeout_reset ();
-	if (!priv->disable_shutdown) {
-		alarm (MAX_EXTRACT_TIME);
-	}
 
 	unix_output_stream = g_unix_output_stream_new (fd, TRUE);
 	buffered_output_stream = g_buffered_output_stream_new_sized (unix_output_stream,
@@ -822,66 +819,80 @@ handle_method_call_get_metadata_fast (TrackerExtract        *object,
 		const gchar *uri = (const gchar *) uris[i];
 		const gchar *mime = (const gchar *) mimes[i];
 		TrackerSparqlBuilder *sparql, *preupdate;
+		gint len = 0;
+
+		if (!priv->disable_shutdown) {
+			alarm (MAX_EXTRACT_TIME);
+		}
 
 		extracted = get_file_metadata (object, request, NULL, uri, mime, &preupdate, &sparql);
 
+		if (!priv->disable_shutdown) {
+			/* Unset alarm so the extractor doesn't die when it's idle */
+			alarm (0);
+		}
+
 		if (extracted) {
-			gint32 len = tracker_sparql_builder_get_length (sparql);
+			len = tracker_sparql_builder_get_length (sparql);
+
+			if (len == 0) {
+				/* Nothing to send: only the 'r' branch below releases
+				 * the builders, so drop them here before the 'e' record */
+				g_object_unref (sparql);
+				g_object_unref (preupdate);
+			}
+		}
 
-			/* Anything >= 0 is error-free reply, 0 just means no metadata
-			 * (which isn't an error) */
+		if (extracted && len > 0) {
+			const gchar *preupdate_str = NULL;
 
-			g_data_output_stream_put_int32  (data_output_stream,
-			                                 len,
-			                                 NULL,
-			                                 &error);
+			g_data_output_stream_put_byte (data_output_stream,
+			                               'r',
+			                               NULL,
+			                               &error);
 
 			if (error) {
 				break;
 			}
 
-			if (len > 0) {
-				const gchar *preupdate_str = NULL;
-
-				if (tracker_sparql_builder_get_length (preupdate) > 0) {
-					preupdate_str = tracker_sparql_builder_get_result (preupdate);
-				}
+			if (tracker_sparql_builder_get_length (preupdate) > 0) {
+				preupdate_str = tracker_sparql_builder_get_result (preupdate);
+			}
 
-				g_data_output_stream_put_string (data_output_stream,
-				                                 preupdate_str ? preupdate_str : "",
-				                                 NULL,
-				                                 &error);
+			g_data_output_stream_put_string (data_output_stream,
+			                                 preupdate_str ? preupdate_str : "",
+			                                 NULL,
+			                                 &error);
 
-				if (error) {
-					break;
-				}
+			if (error) {
+				break;
+			}
 
-				g_data_output_stream_put_byte (data_output_stream,
-				                               0,
-				                               NULL,
-				                               &error);
+			g_data_output_stream_put_byte (data_output_stream,
+			                               0,
+			                               NULL,
+			                               &error);
 
-				if (error) {
-					break;
-				}
+			if (error) {
+				break;
+			}
 
-				g_data_output_stream_put_string (data_output_stream,
-				                                 tracker_sparql_builder_get_result (sparql),
-				                                 NULL,
-				                                 &error);
+			g_data_output_stream_put_string (data_output_stream,
+			                                 tracker_sparql_builder_get_result (sparql),
+			                                 NULL,
+			                                 &error);
 
-				if (error) {
-					break;
-				}
+			if (error) {
+				break;
+			}
 
-				g_data_output_stream_put_byte (data_output_stream,
-				                               0,
-				                               NULL,
-				                               &error);
+			g_data_output_stream_put_byte (data_output_stream,
+			                               0,
+			                               NULL,
+			                               &error);
 
-				if (error) {
-					break;
-				}
+			if (error) {
+				break;
 			}
 
 			g_object_unref (sparql);
@@ -890,11 +901,10 @@ handle_method_call_get_metadata_fast (TrackerExtract        *object,
 		} else {
 			GError *internal_error;
 
-			/* Anything < 0 is error reply, error's message follows */
-			g_data_output_stream_put_int32  (data_output_stream,
-			                                 -1,
-			                                 NULL,
-			                                 &error);
+			g_data_output_stream_put_byte (data_output_stream,
+			                               'e',
+			                               NULL,
+			                               &error);
 
 			if (error) {
 				break;
@@ -904,16 +914,6 @@ handle_method_call_get_metadata_fast (TrackerExtract        *object,
 			                              "Could not get any metadata for uri:'%s' and mime:'%s'",
 			                              uri, mime);
 
-			g_data_output_stream_put_int32  (data_output_stream,
-			                                 internal_error->code,
-			                                 NULL,
-			                                 &error);
-
-			if (error) {
-				g_error_free (internal_error);
-				break;
-			}
-
 			g_data_output_stream_put_string (data_output_stream,
 			                                 internal_error->message,
 			                                 NULL,
@@ -965,11 +965,6 @@ bail_out:
 
 	g_object_unref (fd_list);
 	g_object_unref (reply);
-
-	if (!priv->disable_shutdown) {
-		/* Unset alarm so the extractor doesn't die when it's idle */
-		alarm (0);
-	}
 }
 
 static void



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]