[tracker/miner-extractor-ipc: 4/4] miner-fs: Pass multiple uris to the extractor to improve D-Bus performance
- From: Philip Van Hoof <pvanhoof src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-extractor-ipc: 4/4] miner-fs: Pass multiple uris to the extractor to improve D-Bus performance
- Date: Thu, 24 Feb 2011 16:50:42 +0000 (UTC)
commit dd067259c99042e139f6774142ae0569f4f72054
Author: Philip Van Hoof <philip codeminded be>
Date: Thu Feb 24 17:37:37 2011 +0100
miner-fs: Pass multiple uris to the extractor to improve D-Bus performance
src/miners/fs/tracker-miner-files.c | 221 ++++++++++++++++++-----------------
1 files changed, 111 insertions(+), 110 deletions(-)
---
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index 03835b1..799ea49 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -82,12 +82,10 @@ typedef struct {
GInputStream *unix_input_stream;
GInputStream *buffered_input_stream;
GOutputStream *output_stream;
- GCancellable *cancellable;
- gpointer user_data;
gboolean splice_finished;
gboolean dbus_finished;
GError *error;
- fast_async_cb callback;
+ GPtrArray *items;
} SendAndSpliceData;
typedef struct {
@@ -2051,72 +2049,88 @@ extractor_skip_embedded_metadata_idle (gpointer user_data)
}
static void
-get_metadata_fast_async_callback (SendAndSpliceData *data)
+flush_extract_queue_async_callback (SendAndSpliceData *data)
{
if (!data->error) {
const gchar *preupdate = NULL;
const gchar *sparql = NULL;
- const gchar *buffer;
+ const gchar *buffer, *buffer_p;
gssize buffer_size;
- gsize preupdate_length;
GError *error = NULL;
- gint32 code;
+ guint i;
buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream));
if (buffer_size == 0) {
- error = g_error_new_literal (miner_files_error_quark,
- 0,
+ error = g_error_new_literal (miner_files_error_quark, 0,
"Invalid data received from GetMetadataFast");
+ for (i = 0; i < data->items->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+ (* item->callback) (preupdate, sparql, error, item->user_data);
+ }
} else {
- code = buffer[0];
-
- if (code == 'r') {
-
- /* This still works atm because we still work one by one at
- * flush_extract_queue_shared. We need to read only one result,
- * no need for a loop yet. */
-
- preupdate = buffer + 1;
- preupdate_length = strnlen (preupdate, buffer_size);
-
- if (preupdate_length < buffer_size && preupdate[buffer_size - 1] == '\0') {
- /* sparql is stored just after preupdate in the original buffer */
- sparql = preupdate + preupdate_length + 1;
+ buffer_p = buffer;
+
+ for (i = 0; i < data->items->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+ gint32 code;
+ gsize preupdate_length;
+
+ code = buffer_p[0];
+ buffer_p += 1;
+
+ if (code == 'r') {
+ preupdate = buffer_p;
+ preupdate_length = strnlen (preupdate, buffer_size);
+ buffer_p += preupdate_length + 1;
+
+ if (preupdate_length < buffer_size) {
+ gint len;
+ /* sparql is stored just after preupdate in the original buffer */
+ sparql = preupdate + preupdate_length + 1;
+ len = strnlen (sparql, buffer_size);
+ buffer_p += len + 1;
+ } else {
+ guint y;
+ preupdate = NULL;
+ error = g_error_new_literal (miner_files_error_quark, 0,
+ "Invalid data received from GetMetadataFast");
+ for (y = i; y < data->items->len; y++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, y);
+ (* item->callback) (preupdate, sparql, error, item->user_data);
+ }
+ break;
+ }
} else {
- preupdate = NULL;
- error = g_error_new_literal (miner_files_error_quark,
- 0,
- "Invalid data received from GetMetadataFast");
+ const gchar *error_message = buffer_p;
+ error = g_error_new_literal (miner_files_error_quark, 0,
+ error_message);
+ buffer_p += strnlen (error_message, buffer_size) + 1;
}
- } else {
- const gchar *error_message;
- error_message = buffer + 1;
+ (* item->callback) (preupdate, sparql, error, item->user_data);
- error = g_error_new_literal (miner_files_error_quark,
- 0,
- error_message);
+ g_clear_error (&error);
}
}
- (* data->callback) (preupdate, sparql, error, data->user_data);
-
- g_clear_error (&error);
-
} else {
+ guint i;
+
/* D-Bus or request-wide error */
- (* data->callback) (NULL, NULL, data->error, data->user_data);
+
+ for (i = 0; i < data->items->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (data->items, i);
+ (* item->callback) (NULL, NULL, data->error, item->user_data);
+ }
}
g_object_unref (data->unix_input_stream);
g_object_unref (data->buffered_input_stream);
g_object_unref (data->output_stream);
- if (data->cancellable) {
- g_object_unref (data->cancellable);
- }
+ g_ptr_array_unref (data->items);
if (data->error) {
g_error_free (data->error);
@@ -2126,9 +2140,9 @@ get_metadata_fast_async_callback (SendAndSpliceData *data)
}
static void
-get_metadata_fast_splice_callback (GObject *source,
- GAsyncResult *result,
- gpointer user_data)
+flush_extract_queue_splice_callback (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
{
SendAndSpliceData *data = user_data;
GError *error = NULL;
@@ -2146,14 +2160,14 @@ get_metadata_fast_splice_callback (GObject *source,
data->splice_finished = TRUE;
if (data->dbus_finished) {
- get_metadata_fast_async_callback (data);
+ flush_extract_queue_async_callback (data);
}
}
static void
-get_metadata_fast_dbus_callback (GObject *source,
- GAsyncResult *result,
- gpointer user_data)
+flush_extract_queue_dbus_callback (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
{
SendAndSpliceData *data = user_data;
GDBusMessage *reply;
@@ -2181,29 +2195,39 @@ get_metadata_fast_dbus_callback (GObject *source,
data->dbus_finished = TRUE;
if (data->splice_finished) {
- get_metadata_fast_async_callback (data);
+ flush_extract_queue_async_callback (data);
}
}
+
static void
-get_metadata_fast_async (GDBusConnection *connection,
- const gchar *uri,
- const gchar *mime_type,
- GCancellable *cancellable,
- fast_async_cb callback,
- ProcessFileData *user_data)
+free_queue_item (ExtractQueueItem *item)
+{
+ g_free (item->uri);
+ g_free (item->mime_type);
+ g_object_unref (item->cancellable);
+ g_slice_free (ExtractQueueItem, item);
+}
+
+static void
+flush_extract_queue_shared (TrackerMinerFiles *miner)
{
SendAndSpliceData *data;
GDBusMessage *message;
GUnixFDList *fd_list;
int pipefd[2];
- const gchar *uris[2];
- const gchar *mime_types[2];
+ gchar **uris;
+ gchar **mime_types;
+ GPtrArray *queue;
+ guint i;
- g_return_if_fail (connection);
- g_return_if_fail (uri);
- g_return_if_fail (mime_type);
- g_return_if_fail (callback);
+ queue = miner->private->extract_queue;
+
+ if (!queue || queue->len == 0) {
+ return;
+ }
+
+ miner->private->extract_queue = NULL;
if (pipe (pipefd) < 0) {
g_critical ("Coudln't open pipe");
@@ -2217,11 +2241,14 @@ get_metadata_fast_async (GDBusConnection *connection,
fd_list = g_unix_fd_list_new ();
- uris[0] = uri;
- uris[1] = NULL;
+ uris = g_new0 (gchar *, queue->len + 1);
+ mime_types = g_new0 (gchar *, queue->len + 1);
- mime_types[0] = mime_type;
- mime_types[1] = NULL;
+ for (i = 0; i < queue->len; i++) {
+ ExtractQueueItem *item = g_ptr_array_index (queue, i);
+ uris[i] = item->uri;
+ mime_types[i] = item->mime_type;
+ }
g_dbus_message_set_body (message, g_variant_new ("(^as^ash)",
uris,
@@ -2231,6 +2258,9 @@ get_metadata_fast_async (GDBusConnection *connection,
NULL)));
g_dbus_message_set_unix_fd_list (message, fd_list);
+ g_free (uris);
+ g_free (mime_types);
+
/* We need to close the fd as g_unix_fd_list_append duplicates the fd */
close (pipefd[1]);
@@ -2244,20 +2274,25 @@ get_metadata_fast_async (GDBusConnection *connection,
DBUS_PIPE_BUFFER_SIZE);
data->output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
- if (cancellable) {
- data->cancellable = g_object_ref (cancellable);
- }
+ data->items = queue;
- data->callback = callback;
- data->user_data = user_data;
+ /* Note about timeouts that tracker-extract's alarm() would exit the service's
+ * process resulting in a timeout DBus error message too (if alarm is triggered) */
- g_dbus_connection_send_message_with_reply (connection,
+ g_dbus_connection_send_message_with_reply (miner->private->connection,
message,
G_DBUS_SEND_MESSAGE_FLAGS_NONE,
- -1,
+ G_MAXINT, /* We are pushing a queue
+ of items here, so don't
+ use a arbitrary timeout.
+ We could use queue->len
+ times the default timeout,
+ but apparently doesn't
+ GDBus have a way to get
+ the default timeout :( */
NULL,
- cancellable,
- get_metadata_fast_dbus_callback,
+ NULL,
+ flush_extract_queue_dbus_callback,
data);
g_output_stream_splice_async (data->output_stream,
@@ -2265,47 +2300,13 @@ get_metadata_fast_async (GDBusConnection *connection,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
0,
- data->cancellable,
- get_metadata_fast_splice_callback,
+ NULL,
+ flush_extract_queue_splice_callback,
data);
g_object_unref (message);
}
-static void
-free_queue_item (ExtractQueueItem *item)
-{
- g_free (item->uri);
- g_free (item->mime_type);
- g_object_unref (item->cancellable);
- g_slice_free (ExtractQueueItem, item);
-}
-
-static void
-flush_extract_queue_shared (TrackerMinerFiles *miner)
-{
- guint i;
- GPtrArray *queue = miner->private->extract_queue;
- GDBusConnection *connection = miner->private->connection;
-
- if (!queue || queue->len == 0) {
- return;
- }
-
- for (i = 0; i < queue->len; i++) {
- ExtractQueueItem *item = g_ptr_array_index (queue, i);
-
- get_metadata_fast_async (connection,
- item->uri,
- item->mime_type,
- item->cancellable,
- item->callback,
- item->user_data);
- }
-
- g_ptr_array_remove_range (queue, 0, queue->len);
-}
-
static gboolean
flush_extract_queue_idle (gpointer user_data)
{
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]