[tracker/extractor-dbus-fd: 2/2] FS miner: use the GetMetadataFast of tracker-extract



commit 5e793d69627f43030631efe77e0f76270f712b29
Author: Adrien Bustany <abustany gnome org>
Date:   Wed Jun 30 16:13:49 2010 +0200

    FS miner: use the GetMetadataFast of tracker-extract

 src/miners/fs/Makefile.am           |    1 +
 src/miners/fs/tracker-miner-files.c |  208 ++++++++++++++++++++++++++++++++--
 2 files changed, 196 insertions(+), 13 deletions(-)
---
diff --git a/src/miners/fs/Makefile.am b/src/miners/fs/Makefile.am
index a786e6f..562b911 100644
--- a/src/miners/fs/Makefile.am
+++ b/src/miners/fs/Makefile.am
@@ -13,6 +13,7 @@ INCLUDES =								\
 	-I$(top_builddir)/src/libtracker-client				\
 	$(WARN_CFLAGS)							\
 	$(GMODULE_CFLAGS)						\
+	$(GIO_CFLAGS)							\
 	$(DBUS_CFLAGS)							\
 	$(GCOV_CFLAGS)
 
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index e5ed6ae..815aa21 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -30,6 +30,11 @@
 #include <glib/gi18n.h>
 #include <glib/gstdio.h>
 
+#include <gio/gunixinputstream.h>
+
+#include <dbus/dbus-glib-lowlevel.h>
+#include <dbus/dbus.h>
+
 #include <libtracker-common/tracker-date-time.h>
 #include <libtracker-common/tracker-ontologies.h>
 #include <libtracker-common/tracker-power.h>
@@ -52,6 +57,10 @@
  */
 #define N_DAYS_THRESHOLD 3
 
+#define TRACKER_DBUS_SERVICE_EXTRACT   "org.freedesktop.Tracker1.Extract"
+#define TRACKER_DBUS_PATH_EXTRACT      "/org/freedesktop/Tracker1/Extract"
+#define TRACKER_DBUS_INTERFACE_EXTRACT "org.freedesktop.Tracker1.Extract"
+
 #define TRACKER_MINER_FILES_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_MINER_FILES, TrackerMinerFilesPrivate))
 
 static GQuark miner_files_error_quark = 0;
@@ -63,9 +72,21 @@ struct ProcessFileData {
 	TrackerSparqlBuilder *sparql;
 	GCancellable *cancellable;
 	GFile *file;
+#ifdef HAVE_DBUS_FD_PASSING
+	DBusPendingCall *call;
+#else /* HAVE_DBUS_FD_PASSING */
 	DBusGProxyCall *call;
+#endif /* HAVE_DBUS_FD_PASSING */
 };
 
+typedef struct {
+	ProcessFileData *process_file_data;
+	GInputStream *buffered_input_stream;
+	GInputStream *unix_input_stream;
+	GOutputStream *output_stream;
+	org_freedesktop_Tracker1_Extract_get_metadata_reply callback;
+} FastAsyncData;
+
 struct TrackerMinerFilesPrivate {
 	TrackerConfig *config;
 	TrackerStorage *storage;
@@ -85,6 +106,7 @@ struct TrackerMinerFilesPrivate {
 #endif /* defined(HAVE_UPOWER) || defined(HAVE_HAL) */
 	gulong finished_handler;
 
+	DBusGConnection *connection;
 	DBusGProxy *extractor_proxy;
 
 	GQuark quark_mount_point_uuid;
@@ -160,7 +182,7 @@ static void        trigger_recheck_cb                   (GObject              *g
 static void        index_volumes_changed_cb             (GObject              *gobject,
 							 GParamSpec           *arg1,
 							 gpointer              user_data);
-static DBusGProxy *extractor_create_proxy               (void);
+static DBusGProxy *extractor_create_proxy               (DBusGConnection      *connection);
 static gboolean    miner_files_check_file               (TrackerMinerFS       *fs,
                                                          GFile                *file);
 static gboolean    miner_files_check_directory          (TrackerMinerFS       *fs,
@@ -238,6 +260,7 @@ static void
 tracker_miner_files_init (TrackerMinerFiles *mf)
 {
 	TrackerMinerFilesPrivate *priv;
+	GError *error = NULL;
 
 	priv = mf->private = TRACKER_MINER_FILES_GET_PRIVATE (mf);
 
@@ -272,7 +295,15 @@ tracker_miner_files_init (TrackerMinerFiles *mf)
 	                  mf);
 
 	/* Set up extractor and signals */
-	priv->extractor_proxy = extractor_create_proxy ();
+	priv->connection = dbus_g_bus_get (DBUS_BUS_SESSION, &error);
+
+	if (!priv->connection) {
+		g_critical ("Could not connect to the D-Bus session bus, %s",
+		            error ? error->message : "no error given.");
+		g_error_free (error);
+	}
+
+	priv->extractor_proxy = extractor_create_proxy (priv->connection);
 
 	priv->quark_mount_point_uuid = g_quark_from_static_string ("tracker-mount-point-uuid");
 	priv->quark_directory_config_root = g_quark_from_static_string ("tracker-directory-config-root");
@@ -1772,20 +1803,11 @@ process_file_data_free (ProcessFileData *data)
 }
 
 static DBusGProxy *
-extractor_create_proxy (void)
+extractor_create_proxy (DBusGConnection *connection)
 {
 	DBusGProxy *proxy;
-	DBusGConnection *connection;
-	GError *error = NULL;
 
-	connection = dbus_g_bus_get (DBUS_BUS_SESSION, &error);
-
-	if (!connection) {
-		g_critical ("Could not connect to the D-Bus session bus, %s",
-		            error ? error->message : "no error given.");
-		g_clear_error (&error);
-		return FALSE;
-	}
+	g_return_val_if_fail (connection, NULL);
 
 	/* Get proxy for the extractor */
 	proxy = dbus_g_proxy_new_for_name (connection,
@@ -1885,8 +1907,11 @@ extractor_get_embedded_metadata_cb (DBusGProxy *proxy,
 	tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, NULL);
 
 	process_file_data_free (data);
+#ifndef HAVE_DBUS_FD_PASSING
+	/* When using DBus FD passing, we let the caller free */
 	g_free (preupdate);
 	g_free (sparql);
+#endif /* HAVE_DBUS_FD_PASSING */
 }
 
 static void
@@ -1896,8 +1921,16 @@ extractor_get_embedded_metadata_cancel (GCancellable    *cancellable,
 	GError *error;
 
 	/* Cancel extractor call */
+#ifdef HAVE_DBUS_FD_PASSING
+	/* The cancellable is also used for the splice_async. If it was cancelled,
+	 * get_metadata_fast_cb will be called anyway, so the FastAsyncData struct
+	 * will be freed properly.
+	 */
+	dbus_pending_call_cancel (data->call);
+#else /* HAVE_DBUS_FD_PASSING */
 	dbus_g_proxy_cancel_call (data->miner->private->extractor_proxy,
 	                          data->call);
+#endif /* HAVE_DBUS_FD_PASSING */
 
 	error = g_error_new_literal (miner_files_error_quark, 0, "Embedded metadata extraction was cancelled");
 	tracker_miner_fs_file_notify (TRACKER_MINER_FS (data->miner), data->file, error);
@@ -1906,16 +1939,165 @@ extractor_get_embedded_metadata_cancel (GCancellable    *cancellable,
 	g_error_free (error);
 }
 
+#ifdef HAVE_DBUS_FD_PASSING
+static void
+get_metadata_fast_cb (GObject      *source_object,
+                      GAsyncResult *result,
+                      gpointer      user_data)
+{
+	FastAsyncData *data;
+	DBusMessage *reply;
+	GError *error = NULL;
+	gsize buffer_size;
+	gchar *preupdate;
+	gchar *sparql = NULL;
+
+	data = user_data;
+
+	buffer_size = g_output_stream_splice_finish (data->output_stream,
+	                                             result,
+	                                             &error);
+
+	g_object_unref (data->buffered_input_stream);
+	g_object_unref (data->unix_input_stream);
+
+	preupdate = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream));
+
+	g_object_unref (data->output_stream);
+
+	if (G_UNLIKELY (error)) {
+		if (error->code != G_IO_ERROR_CANCELLED) {
+			/* ProcessFileData and error are freed in the callback */
+			(* data->callback) (NULL, NULL, NULL, error, data->process_file_data);
+		} else {
+			/* ProcessFileData was freed in
+			 * extractor_get_embedded_metadata_cancel
+			 */
+			g_error_free (error);
+		}
+	} else {
+		DBusPendingCall *call = data->process_file_data->call;
+
+		dbus_pending_call_block (call);
+		reply = dbus_pending_call_steal_reply (call);
+
+		if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+			DBusError dbus_error;
+
+			dbus_error_init (&dbus_error);
+			dbus_set_error_from_message (&dbus_error, reply);
+			dbus_set_g_error (&error, &dbus_error);
+
+			/* callback frees the error */
+			(* data->callback) (NULL, NULL, NULL, error, data->process_file_data);
+
+			dbus_error_free (&dbus_error);
+		} else {
+			if (buffer_size) {
+			/* sparql is stored just after preupdate in the original buffer */
+				sparql = preupdate + strlen (preupdate) + 1;
+			}
+
+			(* data->callback) (NULL, preupdate, sparql, NULL, data->process_file_data);
+			g_free (preupdate);
+		}
+
+		dbus_message_unref (reply);
+	}
+
+	g_slice_free (FastAsyncData, data);
+}
+
+static DBusPendingCall*
+get_metadata_fast_async (DBusConnection  *connection,
+                         const gchar     *uri,
+                         const gchar     *mime_type,
+						 org_freedesktop_Tracker1_Extract_get_metadata_reply callback,
+						 ProcessFileData *user_data)
+{
+	int pipefd[2];
+	DBusMessage *message;
+	DBusPendingCall *call;
+	GInputStream *unix_input_stream;
+	GInputStream *buffered_input_stream;
+	GOutputStream *output_stream;
+	FastAsyncData *data;
+
+	g_return_val_if_fail (connection, NULL);
+	g_return_val_if_fail (uri, NULL);
+	g_return_val_if_fail (mime_type, NULL);
+	g_return_val_if_fail (callback, NULL);
+
+	if (pipe (pipefd) < 0) {
+		g_critical ("Coudln't open pipe");
+		return NULL;
+	}
+
+	message = dbus_message_new_method_call (TRACKER_DBUS_SERVICE_EXTRACT,
+	                                        TRACKER_DBUS_PATH_EXTRACT,
+	                                        TRACKER_DBUS_INTERFACE_EXTRACT,
+	                                        "GetMetadataFast");
+	dbus_message_append_args (message,
+	                          DBUS_TYPE_STRING, &uri,
+	                          DBUS_TYPE_STRING, &mime_type,
+	                          DBUS_TYPE_UNIX_FD, &pipefd[1],
+	                          DBUS_TYPE_INVALID);
+	dbus_connection_send_with_reply (connection,
+	                                 message,
+	                                 &call,
+	                                 -1);
+	dbus_message_unref (message);
+	close (pipefd[1]);
+
+	if (!call) {
+		close (pipefd[0]);
+		g_critical ("FD passing unsupported or connection disconnected");
+		return NULL;
+	}
+
+	unix_input_stream = g_unix_input_stream_new (pipefd[0], TRUE);
+	buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
+	                                                           64*1024);
+	output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+	data = g_slice_new0 (FastAsyncData);
+	data->process_file_data = user_data;
+	data->buffered_input_stream = buffered_input_stream;
+	data->unix_input_stream = unix_input_stream;
+	data->output_stream = output_stream;
+	data->callback = callback;
+
+	g_output_stream_splice_async (output_stream,
+	                              buffered_input_stream,
+	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+	                              G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+	                              0,
+	                              /*user_data->cancellable*/ NULL,
+	                              get_metadata_fast_cb,
+	                              data);
+
+	return call;
+}
+#endif /* HAVE_DBUS_FD_PASSING */
+
 static void
 extractor_get_embedded_metadata (ProcessFileData *data,
                                  const gchar     *uri,
                                  const gchar     *mime_type)
 {
+#ifdef HAVE_DBUS_FD_PASSING
+	data->call = get_metadata_fast_async (dbus_g_connection_get_connection (data->miner->private->connection),
+	                                      uri,
+	                                      mime_type,
+	                                      extractor_get_embedded_metadata_cb,
+	                                      data);
+#else /* HAVE_DBUS_FD_PASSING */
 	data->call = org_freedesktop_Tracker1_Extract_get_metadata_async (data->miner->private->extractor_proxy,
 	                                                                  uri,
 	                                                                  mime_type,
 	                                                                  extractor_get_embedded_metadata_cb,
 	                                                                  data);
+#endif
 	g_signal_connect (data->cancellable, "cancelled",
 	                  G_CALLBACK (extractor_get_embedded_metadata_cancel), data);
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]