[tracker/miner-extractor-ipc] tracker-extract, miner-fs: Allow multiple uris to be passed to extractor
- From: Philip Van Hoof <pvanhoof src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-extractor-ipc] tracker-extract, miner-fs: Allow multiple uris to be passed to extractor
- Date: Thu, 24 Feb 2011 15:25:53 +0000 (UTC)
commit 08559fda927b5b6f7893dafbbf1920430d729a79
Author: Philip Van Hoof <philip codeminded be>
Date: Thu Feb 24 16:25:23 2011 +0100
tracker-extract, miner-fs: Allow multiple uris to be passed to extractor
src/miners/fs/tracker-miner-files.c | 232 ++++++++++++++++-----------------
src/tracker-extract/tracker-extract.c | 116 ++++++++---------
2 files changed, 163 insertions(+), 185 deletions(-)
---
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index a48a234..fcfc6f5 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -82,12 +82,10 @@ typedef struct {
GInputStream *unix_input_stream;
GInputStream *buffered_input_stream;
GOutputStream *output_stream;
- GCancellable *cancellable;
- gpointer user_data;
gboolean splice_finished;
gboolean dbus_finished;
GError *error;
- fast_async_cb callback;
+ GPtrArray *items;
} SendAndSpliceData;
typedef struct {
@@ -2050,84 +2048,89 @@ extractor_skip_embedded_metadata_idle (gpointer user_data)
return FALSE;
}
-static guint32
-read_uint32 (const guint8 *data)
-{
- return data[0] << 24 |
- data[1] << 16 |
- data[2] << 8 |
- data[3];
-}
-
static void
-get_metadata_fast_async_callback (SendAndSpliceData *data)
+flush_extract_queue_async_callback (SendAndSpliceData *data)
{
if (!data->error) {
const gchar *preupdate = NULL;
const gchar *sparql = NULL;
- const gchar *buffer;
+ const gchar *buffer, *buffer_p;
gssize buffer_size;
- gsize preupdate_length;
GError *error = NULL;
- gint32 code;
+ guint i;
buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream));
if (buffer_size == 0) {
- error = g_error_new_literal (miner_files_error_quark,
- 0,
+ error = g_error_new_literal (miner_files_error_quark, 0,
"Invalid data received from GetMetadataFast");
+ for (i = 0; i < data->items->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+ (* item->callback) (preupdate, sparql, error, item->user_data);
+ }
} else {
- code = read_uint32 (buffer);
-
- if (code >= 0) {
-
- /* This still works atm because we still work one by one at
- * flush_extract_queue_shared. We need to read only one result,
- * no need for a loop yet. */
-
- preupdate = buffer + 4;
- preupdate_length = strnlen (preupdate, buffer_size);
-
- if (preupdate_length < buffer_size && preupdate[buffer_size - 1] == '\0') {
- /* sparql is stored just after preupdate in the original buffer */
- sparql = preupdate + preupdate_length + 1;
+ buffer_p = buffer;
+
+ for (i = 0; i < data->items->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+ gint32 code;
+ gsize preupdate_length;
+
+ code = buffer_p[0];
+ buffer_p += 1;
+
+ if (code == 'r') {
+ preupdate = buffer_p;
+ preupdate_length = strnlen (preupdate, buffer_size);
+ buffer_p += preupdate_length + 1;
+
+ if (preupdate_length < buffer_size && preupdate[buffer_size - 1] == '\0') {
+ gint len;
+ /* sparql is stored just after preupdate in the original buffer */
+ sparql = preupdate + preupdate_length + 1;
+ len = strnlen (sparql, buffer_size);
+ buffer_p += len + 1;
+ } else {
+ guint y;
+ preupdate = NULL;
+ error = g_error_new_literal (miner_files_error_quark, 0,
+ "Invalid data received from GetMetadataFast");
+ for (y = i; y < data->items->len; y++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, y);
+ (* item->callback) (preupdate, sparql, error, item->user_data);
+ }
+ break;
+ }
} else {
- preupdate = NULL;
- error = g_error_new_literal (miner_files_error_quark,
- 0,
- "Invalid data received from GetMetadataFast");
+ const gchar *error_message = buffer_p;
+ error = g_error_new_literal (miner_files_error_quark, 0,
+ error_message);
+ buffer_p += strnlen (error_message, buffer_size) + 1;
}
- } else {
- const gchar *error_message;
- gint32 error_code;
- error_code = read_uint32 (buffer + 4);
- error_message = buffer + 4 + 4;
+ (* item->callback) (preupdate, sparql, error, item->user_data);
- error = g_error_new_literal (miner_files_error_quark,
- error_code,
- error_message);
+ g_clear_error (&error);
}
}
- (* data->callback) (preupdate, sparql, error, data->user_data);
-
- g_clear_error (&error);
-
} else {
+ guint i;
+
/* D-Bus or request-wide error */
- (* data->callback) (NULL, NULL, data->error, data->user_data);
+
+ for (i = 0; i < data->items->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+ (* item->callback) (NULL, NULL, data->error, item->user_data);
+ }
}
g_object_unref (data->unix_input_stream);
g_object_unref (data->buffered_input_stream);
g_object_unref (data->output_stream);
- if (data->cancellable) {
- g_object_unref (data->cancellable);
- }
+ g_ptr_array_unref (data->items);
if (data->error) {
g_error_free (data->error);
@@ -2137,9 +2140,9 @@ get_metadata_fast_async_callback (SendAndSpliceData *data)
}
static void
-get_metadata_fast_splice_callback (GObject *source,
- GAsyncResult *result,
- gpointer user_data)
+flush_extract_queue_splice_callback (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
{
SendAndSpliceData *data = user_data;
GError *error = NULL;
@@ -2157,14 +2160,14 @@ get_metadata_fast_splice_callback (GObject *source,
data->splice_finished = TRUE;
if (data->dbus_finished) {
- get_metadata_fast_async_callback (data);
+ flush_extract_queue_async_callback (data);
}
}
static void
-get_metadata_fast_dbus_callback (GObject *source,
- GAsyncResult *result,
- gpointer user_data)
+flush_extract_queue_dbus_callback (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
{
SendAndSpliceData *data = user_data;
GDBusMessage *reply;
@@ -2192,29 +2195,39 @@ get_metadata_fast_dbus_callback (GObject *source,
data->dbus_finished = TRUE;
if (data->splice_finished) {
- get_metadata_fast_async_callback (data);
+ flush_extract_queue_async_callback (data);
}
}
+
static void
-get_metadata_fast_async (GDBusConnection *connection,
- const gchar *uri,
- const gchar *mime_type,
- GCancellable *cancellable,
- fast_async_cb callback,
- ProcessFileData *user_data)
+free_queue_item (ExtractQueueItem *item)
+{
+ g_free (item->uri);
+ g_free (item->mime_type);
+ g_object_unref (item->cancellable);
+ g_slice_free (ExtractQueueItem, item);
+}
+
+static void
+flush_extract_queue_shared (TrackerMinerFiles *miner)
{
SendAndSpliceData *data;
GDBusMessage *message;
GUnixFDList *fd_list;
int pipefd[2];
- const gchar *uris[2];
- const gchar *mime_types[2];
+ gchar **uris;
+ gchar **mime_types;
+ GPtrArray *queue;
+ guint i;
+
+ queue = miner->private->extract_queue;
- g_return_if_fail (connection);
- g_return_if_fail (uri);
- g_return_if_fail (mime_type);
- g_return_if_fail (callback);
+ if (!queue || queue->len == 0) {
+ return;
+ }
+
+ miner->private->extract_queue = NULL;
if (pipe (pipefd) < 0) {
g_critical ("Coudln't open pipe");
@@ -2228,11 +2241,14 @@ get_metadata_fast_async (GDBusConnection *connection,
fd_list = g_unix_fd_list_new ();
- uris[0] = uri;
- uris[1] = NULL;
+ uris = g_new0 (gchar *, queue->len + 1);
+ mime_types = g_new0 (gchar *, queue->len + 1);
- mime_types[0] = mime_type;
- mime_types[1] = NULL;
+ for (i = 0; i < queue->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (queue, i);
+ uris[i] = item->uri;
+ mime_types[i] = item->mime_type;
+ }
g_dbus_message_set_body (message, g_variant_new ("(^as^ash)",
uris,
@@ -2242,6 +2258,9 @@ get_metadata_fast_async (GDBusConnection *connection,
NULL)));
g_dbus_message_set_unix_fd_list (message, fd_list);
+ g_free (uris);
+ g_free (mime_types);
+
/* We need to close the fd as g_unix_fd_list_append duplicates the fd */
close (pipefd[1]);
@@ -2255,20 +2274,25 @@ get_metadata_fast_async (GDBusConnection *connection,
DBUS_PIPE_BUFFER_SIZE);
data->output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
- if (cancellable) {
- data->cancellable = g_object_ref (cancellable);
- }
+ data->items = queue;
- data->callback = callback;
- data->user_data = user_data;
+ /* Note about timeouts that tracker-extract's alarm() would exit the service's
+ * process resulting in a timeout DBus error message too (if alarm is triggered) */
- g_dbus_connection_send_message_with_reply (connection,
+ g_dbus_connection_send_message_with_reply (miner->private->connection,
message,
G_DBUS_SEND_MESSAGE_FLAGS_NONE,
- -1,
+ G_MAXINT, /* We are pushing a queue
+ of items here, so don't
+ use a arbitrary timeout.
+ We could use queue->len
+ times the default timeout,
+ but apparently doesn't
+ GDBus have a way to get
+ the default timeout :( */
NULL,
- cancellable,
- get_metadata_fast_dbus_callback,
+ NULL,
+ flush_extract_queue_dbus_callback,
data);
g_output_stream_splice_async (data->output_stream,
@@ -2276,47 +2300,13 @@ get_metadata_fast_async (GDBusConnection *connection,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
0,
- data->cancellable,
- get_metadata_fast_splice_callback,
+ NULL,
+ flush_extract_queue_splice_callback,
data);
g_object_unref (message);
}
-static void
-free_queue_item (ExtractQueueItem *item)
-{
- g_free (item->uri);
- g_free (item->mime_type);
- g_object_unref (item->cancellable);
- g_slice_free (ExtractQueueItem, item);
-}
-
-static void
-flush_extract_queue_shared (TrackerMinerFiles *miner)
-{
- guint i;
- GPtrArray *queue = miner->private->extract_queue;
- GDBusConnection *connection = miner->private->connection;
-
- if (!queue || queue->len == 0) {
- return;
- }
-
- for (i = 0; i < queue->len; i++) {
- ExtractQueueItem *item = g_ptr_array_index (queue, i);
-
- get_metadata_fast_async (connection,
- item->uri,
- item->mime_type,
- item->cancellable,
- item->callback,
- item->user_data);
- }
-
- g_ptr_array_remove_range (queue, 0, queue->len);
-}
-
static gboolean
flush_extract_queue_idle (gpointer user_data)
{
diff --git a/src/tracker-extract/tracker-extract.c b/src/tracker-extract/tracker-extract.c
index 75cee36..4c2d502 100644
--- a/src/tracker-extract/tracker-extract.c
+++ b/src/tracker-extract/tracker-extract.c
@@ -806,9 +806,6 @@ handle_method_call_get_metadata_fast (TrackerExtract *object,
priv = TRACKER_EXTRACT_GET_PRIVATE (object);
tracker_main_quit_timeout_reset ();
- if (!priv->disable_shutdown) {
- alarm (MAX_EXTRACT_TIME);
- }
unix_output_stream = g_unix_output_stream_new (fd, TRUE);
buffered_output_stream = g_buffered_output_stream_new_sized (unix_output_stream,
@@ -822,66 +819,73 @@ handle_method_call_get_metadata_fast (TrackerExtract *object,
const gchar *uri = (const gchar *) uris[i];
const gchar *mime = (const gchar *) mimes[i];
TrackerSparqlBuilder *sparql, *preupdate;
+ gint len;
+
+ if (!priv->disable_shutdown) {
+ alarm (MAX_EXTRACT_TIME);
+ }
extracted = get_file_metadata (object, request, NULL, uri, mime, &preupdate, &sparql);
+ if (!priv->disable_shutdown) {
+ /* Unset alarm so the extractor doesn't die when it's idle */
+ alarm (0);
+ }
+
if (extracted) {
- gint32 len = tracker_sparql_builder_get_length (sparql);
+ len = tracker_sparql_builder_get_length (sparql);
+ }
- /* Anything >= 0 is error-free reply, 0 just means no metadata
- * (which isn't an error) */
+ if (extracted && len > 0) {
+ const gchar *preupdate_str = NULL;
- g_data_output_stream_put_int32 (data_output_stream,
- len,
- NULL,
- &error);
+ g_data_output_stream_put_byte (data_output_stream,
+ 'r',
+ NULL,
+ &error);
if (error) {
break;
}
- if (len > 0) {
- const gchar *preupdate_str = NULL;
-
- if (tracker_sparql_builder_get_length (preupdate) > 0) {
- preupdate_str = tracker_sparql_builder_get_result (preupdate);
- }
+ if (tracker_sparql_builder_get_length (preupdate) > 0) {
+ preupdate_str = tracker_sparql_builder_get_result (preupdate);
+ }
- g_data_output_stream_put_string (data_output_stream,
- preupdate_str ? preupdate_str : "",
- NULL,
- &error);
+ g_data_output_stream_put_string (data_output_stream,
+ preupdate_str ? preupdate_str : "",
+ NULL,
+ &error);
- if (error) {
- break;
- }
+ if (error) {
+ break;
+ }
- g_data_output_stream_put_byte (data_output_stream,
- 0,
- NULL,
- &error);
+ g_data_output_stream_put_byte (data_output_stream,
+ 0,
+ NULL,
+ &error);
- if (error) {
- break;
- }
+ if (error) {
+ break;
+ }
- g_data_output_stream_put_string (data_output_stream,
- tracker_sparql_builder_get_result (sparql),
- NULL,
- &error);
+ g_data_output_stream_put_string (data_output_stream,
+ tracker_sparql_builder_get_result (sparql),
+ NULL,
+ &error);
- if (error) {
- break;
- }
+ if (error) {
+ break;
+ }
- g_data_output_stream_put_byte (data_output_stream,
- 0,
- NULL,
- &error);
+ g_data_output_stream_put_byte (data_output_stream,
+ 0,
+ NULL,
+ &error);
- if (error) {
- break;
- }
+ if (error) {
+ break;
}
g_object_unref (sparql);
@@ -890,11 +894,10 @@ handle_method_call_get_metadata_fast (TrackerExtract *object,
} else {
GError *internal_error;
- /* Anything < 0 is error reply, error's message follows */
- g_data_output_stream_put_int32 (data_output_stream,
- -1,
- NULL,
- &error);
+ g_data_output_stream_put_byte (data_output_stream,
+ 'e',
+ NULL,
+ &error);
if (error) {
break;
@@ -904,16 +907,6 @@ handle_method_call_get_metadata_fast (TrackerExtract *object,
"Could not get any metadata for uri:'%s' and mime:'%s'",
uri, mime);
- g_data_output_stream_put_int32 (data_output_stream,
- internal_error->code,
- NULL,
- &error);
-
- if (error) {
- g_error_free (internal_error);
- break;
- }
-
g_data_output_stream_put_string (data_output_stream,
internal_error->message,
NULL,
@@ -965,11 +958,6 @@ bail_out:
g_object_unref (fd_list);
g_object_unref (reply);
-
- if (!priv->disable_shutdown) {
- /* Unset alarm so the extractor doesn't die when it's idle */
- alarm (0);
- }
}
static void
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]