[tracker/tracker-store] Move TTL import from tracker-indexer to tracker-store



commit bd401b5c480c6ce96ec0a1cc6f7be0e7de37bd3b
Author: Jürg Billeter <j bitron ch>
Date:   Wed May 27 10:41:38 2009 +0200

    Move TTL import from tracker-indexer to tracker-store
---
 data/dbus/tracker-indexer.xml         |    5 -
 src/tracker-indexer/tracker-indexer.c |   97 +-------------------
 src/tracker-indexer/tracker-indexer.h |    4 -
 src/tracker-store/tracker-resources.c |   42 +++++----
 src/tracker-store/tracker-store.c     |  166 +++++++++++++++++++++++++-------
 src/tracker-store/tracker-store.h     |    6 +
 6 files changed, 162 insertions(+), 158 deletions(-)

diff --git a/data/dbus/tracker-indexer.xml b/data/dbus/tracker-indexer.xml
index 93df228..e9335b8 100644
--- a/data/dbus/tracker-indexer.xml
+++ b/data/dbus/tracker-indexer.xml
@@ -11,11 +11,6 @@
 
 <node name="/">
   <interface name="org.freedesktop.Tracker.Indexer">  
-    <method name="TurtleAdd">
-      <annotation name="org.freedesktop.DBus.GLib.Async" value="true"/>
-      <arg type="s" name="file" direction="in" />
-    </method>
-
     <method name="VolumeDisableAll">
       <annotation name="org.freedesktop.DBus.GLib.Async"
       value="true"/>
diff --git a/src/tracker-indexer/tracker-indexer.c b/src/tracker-indexer/tracker-indexer.c
index e4cba40..db77add 100644
--- a/src/tracker-indexer/tracker-indexer.c
+++ b/src/tracker-indexer/tracker-indexer.c
@@ -21,12 +21,7 @@
 
 /* The indexer works as a state machine, there are 3 different queues:
  *
- * * The update queue: the highest priority one, turtle files waiting for
- *   import and single statements waiting for insertion or deleation are
- *   taken one by one in order to be processed, when this queue is
- *   empty, a single token from the next queue is processed.
- *
- * * The files queue: second highest priority, individual files are
+ * * The files queue: highest priority, individual files are
  *   stored here, waiting for metadata extraction, etc... files are
  *   taken one by one in order to be processed, when this queue is
  *   empty, a single token from the next queue is processed.
@@ -126,7 +121,6 @@ typedef struct MetadataRequest MetadataRequest;
 typedef enum TrackerIndexerState TrackerIndexerState;
 
 struct TrackerIndexerPrivate {
-	GQueue *import_queue;
 	GQueue *dir_queue;
 	GQueue *file_queue;
 	GQueue *modules_queue;
@@ -165,8 +159,6 @@ struct TrackerIndexerPrivate {
 	guint in_process : 1;
 	guint interrupted : 1;
 
-	gboolean turtle_import_in_progress;
-
 	guint state;
 };
 
@@ -607,9 +599,6 @@ tracker_indexer_finalize (GObject *object)
 	g_queue_foreach (priv->file_queue, (GFunc) path_info_free, NULL);
 	g_queue_free (priv->file_queue);
 
-	g_queue_foreach (priv->import_queue, (GFunc) g_free, NULL);
-	g_queue_free (priv->import_queue);
-
 	if (priv->volume_monitor) {
 		g_signal_handlers_disconnect_by_func (priv->volume_monitor,
 						      mount_pre_unmount_cb,
@@ -923,7 +912,6 @@ tracker_indexer_init (TrackerIndexer *indexer)
 	priv->items_processed = 0;
 	priv->in_transaction = FALSE;
 
-	priv->import_queue = g_queue_new ();
 	priv->dir_queue = g_queue_new ();
 	priv->file_queue = g_queue_new ();
 	priv->modules_queue = g_queue_new ();
@@ -984,17 +972,6 @@ tracker_indexer_init (TrackerIndexer *indexer)
 }
 
 static void
-add_turtle_file (TrackerIndexer *indexer,
-                 const gchar    *file)
-{
-	g_queue_push_tail (indexer->private->import_queue,
-		g_strdup (file));
-
-	/* Make sure we are still running */
-	check_started (indexer);
-}
-
-static void
 add_file (TrackerIndexer *indexer,
 	  PathInfo *info)
 {
@@ -1660,51 +1637,11 @@ process_module (TrackerIndexer *indexer,
 	g_list_free (dirs);
 }
 
-static void
-process_turtle_file_part (TrackerIndexer *indexer)
-{
-	int i;
-
-	/* process 100 statements at once before returning to main loop */
-
-	i = 0;
-
-	while (tracker_turtle_reader_next ()) {
-		/* insert statement */
-		tracker_data_insert_statement (
-			tracker_turtle_reader_get_subject (),
-			tracker_turtle_reader_get_predicate (),
-			tracker_turtle_reader_get_object ());
-
-		indexer->private->items_processed++;
-		indexer->private->items_to_index++;
-		i++;
-		if (i >= 100) {
-			/* return to main loop */
-			return;
-		}
-	}
-
-	indexer->private->turtle_import_in_progress = FALSE;
-}
-
-static void
-process_turtle_file (TrackerIndexer *indexer,
-                     const gchar    *file)
-{
-	indexer->private->turtle_import_in_progress = TRUE;
-
-	tracker_turtle_reader_init (file, NULL);
-
-	process_turtle_file_part (indexer);
-}
-
 static gboolean
 process_func (gpointer data)
 {
 	TrackerIndexer *indexer;
 	PathInfo *path;
-	gchar *file;
 
 	indexer = TRACKER_INDEXER (data);
 
@@ -1714,13 +1651,7 @@ process_func (gpointer data)
 		start_transaction (indexer);
 	}
 
-	if (indexer->private->turtle_import_in_progress) {
-		process_turtle_file_part (indexer);
-	} else if ((file = g_queue_pop_head (indexer->private->import_queue)) != NULL) {
-		/* Import file */
-		process_turtle_file (indexer, file);
-		g_free (file);
-	} else if ((path = g_queue_peek_head (indexer->private->file_queue)) != NULL) {
+	if ((path = g_queue_peek_head (indexer->private->file_queue)) != NULL) {
 		/* Process file */
 		if (process_file (indexer, path)) {
 			indexer->private->subelements_processed = 0;
@@ -2095,30 +2026,6 @@ tracker_indexer_process_modules (TrackerIndexer  *indexer,
 }
 
 void
-tracker_indexer_turtle_add (TrackerIndexer *indexer,
-			    const gchar    *file,
-			    DBusGMethodInvocation *context,
-			    GError **error)
-{
-	guint request_id;
-
-	request_id = tracker_dbus_get_next_request_id ();
-
-	tracker_dbus_async_return_if_fail (TRACKER_IS_INDEXER (indexer), context);
-	tracker_dbus_async_return_if_fail (file != NULL, context);
-
-	tracker_dbus_request_new (request_id,
-				  "DBus request to check TTL file %s",
-				  file);
-
-	add_turtle_file (indexer, file);
-
-	dbus_g_method_return (context);
-	tracker_dbus_request_success (request_id);
-}
-
-
-void
 tracker_indexer_files_check (TrackerIndexer *indexer,
 			     const gchar *module_name,
 			     GStrv files)
diff --git a/src/tracker-indexer/tracker-indexer.h b/src/tracker-indexer/tracker-indexer.h
index ccc9756..abe9c31 100644
--- a/src/tracker-indexer/tracker-indexer.h
+++ b/src/tracker-indexer/tracker-indexer.h
@@ -103,10 +103,6 @@ void            tracker_indexer_pause_for_duration  (TrackerIndexer         *ind
 void            tracker_indexer_continue            (TrackerIndexer         *indexer,
 						     DBusGMethodInvocation  *context,
 						     GError                **error);
-void		tracker_indexer_turtle_add	   (TrackerIndexer *indexer,
-						    const gchar             *file,
-						    DBusGMethodInvocation *context,
-						    GError **error);
 void            tracker_indexer_files_check         (TrackerIndexer         *indexer,
 						     const gchar            *module,
 						     GStrv                   files);
diff --git a/src/tracker-store/tracker-resources.c b/src/tracker-store/tracker-resources.c
index dcb4367..b3059d4 100644
--- a/src/tracker-store/tracker-resources.c
+++ b/src/tracker-store/tracker-resources.c
@@ -177,16 +177,33 @@ tracker_resources_delete (TrackerResources	     *self,
 	tracker_dbus_request_success (request_id);
 }
 
+/* Completion callback for a queued Turtle import: finishes the pending
+ * D-Bus method call described by user_data (a TrackerDBusMethodInfo). */
+static void
+turtle_import_callback (GError *error, gpointer user_data)
+{
+	TrackerDBusMethodInfo *method_info = user_data;
+
+	if (!error) {
+		/* success: send the method return, then mark the request successful */
+		dbus_g_method_return (method_info->context);
+		tracker_dbus_request_success (method_info->request_id);
+	} else {
+		/* failure: report it on the request, then return the error */
+		tracker_dbus_request_failed (method_info->request_id, &error, NULL);
+		dbus_g_method_return_error (method_info->context, error);
+	}
+}
+
 void
 tracker_resources_load (TrackerResources	 *object,
 			const gchar		 *uri,
 			DBusGMethodInvocation	 *context,
 			GError			**error)
 {
+	TrackerDBusMethodInfo   *info;
 	guint		    request_id;
 	GFile  *file;
-	gchar  *path;
-	GError		   *actual_error = NULL;
 
 	request_id = tracker_dbus_get_next_request_id ();
 
@@ -198,25 +215,16 @@ tracker_resources_load (TrackerResources	 *object,
 				  uri);
 
 	file = g_file_new_for_uri (uri);
-	path = g_file_get_path (file);
 
-	org_freedesktop_Tracker_Indexer_turtle_add (tracker_dbus_indexer_get_proxy (),
-						    path,
-						    &actual_error);
-
-	g_free (path);
-	g_object_unref (file);
+	info = g_slice_new (TrackerDBusMethodInfo);
 
-	if (actual_error) {
-		tracker_dbus_request_failed (request_id, &actual_error, NULL);
-		dbus_g_method_return_error (context, actual_error);
-		g_error_free (actual_error);
-		return;
-	}
+	info->request_id = request_id;
+	info->context = context;
 
-	dbus_g_method_return (context);
+	tracker_store_queue_turtle_import (file, turtle_import_callback,
+	                                   info, destroy_method_info);
 
-	tracker_dbus_request_success (request_id);
+	g_object_unref (file);
 }
 
 void
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 8295284..428e943 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -27,6 +27,7 @@
 
 #include <libtracker-data/tracker-data-update.h>
 #include <libtracker-data/tracker-data-query.h>
+#include <libtracker-data/tracker-turtle.h>
 
 #include "tracker-store.h"
 
@@ -41,19 +42,25 @@ typedef struct {
 
 typedef enum {
 	TRACKER_STORE_TASK_TYPE_UPDATE = 0,
-	TRACKER_STORE_TASK_TYPE_COMMIT = 1
+	TRACKER_STORE_TASK_TYPE_COMMIT = 1,
+	TRACKER_STORE_TASK_TYPE_TURTLE = 2
 } TrackerStoreTaskType;
 
 typedef struct {
 	TrackerStoreTaskType  type;
 	union {
 	  gchar                   *query;
+	  struct {
+		gboolean           in_progress;
+		gchar             *path;
+	  } turtle;
 	} data;
 	gpointer                   user_data;
 	GDestroyNotify             destroy;
 	union {
 		TrackerStoreSparqlUpdateCallback update_callback;
 		TrackerStoreCommitCallback       commit_callback;
+		TrackerStoreTurtleCallback       turtle_callback;
 	} callback;
 } TrackerStoreTask;
 
@@ -71,67 +78,124 @@ private_free (gpointer data)
 static void
 tracker_store_task_free (TrackerStoreTask *task)
 {
-	g_free (task->data.query);
+	if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+		g_free (task->data.turtle.path);
+	} else {
+		g_free (task->data.query);
+	}
 	g_slice_free (TrackerStoreTask, task);
 }
 
-
 static gboolean
-queue_idle_handler (gpointer user_data)
+process_turtle_file_part (void)
 {
-	TrackerStorePrivate *private = user_data;
-	TrackerStoreTask    *task;
-	GError              *error = NULL;
+	int i;
+
+	/* process 10 statements at once before returning to main loop */
 
-	task = g_queue_pop_head (private->queue);
+	i = 0;
 
-	if (!task) {
-		return FALSE;
+	while (tracker_turtle_reader_next ()) {
+		/* insert statement */
+		tracker_data_insert_statement (
+			tracker_turtle_reader_get_subject (),
+			tracker_turtle_reader_get_predicate (),
+			tracker_turtle_reader_get_object ());
+
+		i++;
+		if (i >= 10) {
+			/* return to main loop */
+			return TRUE;
+		}
 	}
 
-	/* Implicit transaction start */
+	return FALSE;
+}
 
-	if (!private->batch_mode && task->type != TRACKER_STORE_TASK_TYPE_COMMIT) {
+static void
+begin_batch (TrackerStorePrivate *private)
+{
+	if (!private->batch_mode) {
 		/* switch to batch mode
 		   delays database commits to improve performance */
 		tracker_data_begin_transaction ();
 		private->batch_mode = TRUE;
 		private->batch_count = 0;
 	}
+}
 
-	switch (task->type) {
-		case TRACKER_STORE_TASK_TYPE_COMMIT:
-			if (private->batch_mode) {
-				/* commit pending batch items */
-				tracker_data_commit_transaction ();
-				private->batch_mode = FALSE;
-				private->batch_count = 0;
-			}
+static void
+end_batch (TrackerStorePrivate *private)
+{
+	if (private->batch_mode) {
+		/* commit pending batch items */
+		tracker_data_commit_transaction ();
+		private->batch_mode = FALSE;
+		private->batch_count = 0;
+	}
+}
 
-			if (task->callback.commit_callback) {
-				task->callback.commit_callback (task->user_data);
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+	TrackerStorePrivate *private = user_data;
+	TrackerStoreTask    *task;
+	GError              *error = NULL;
+
+	task = g_queue_peek_head (private->queue);
+	g_return_val_if_fail (task != NULL, FALSE);
+
+	if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+		begin_batch (private);
+
+		tracker_data_update_sparql (task->data.query, &error);
+		if (!error) {
+			private->batch_count++;
+			if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+				end_batch (private);
 			}
-			break;
+		}
+
+		if (task->callback.update_callback) {
+			task->callback.update_callback (error, task->user_data);
+		}
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		end_batch (private);
 
-		case TRACKER_STORE_TASK_TYPE_UPDATE:
-			tracker_data_update_sparql (task->data.query, &error);
-			if (task->callback.update_callback) {
-				task->callback.update_callback (error, task->user_data);
+		if (task->callback.commit_callback) {
+			task->callback.commit_callback (task->user_data);
+		}
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+		begin_batch (private);
+
+		if (!task->data.turtle.in_progress) {
+			tracker_turtle_reader_init (task->data.turtle.path, NULL);
+			task->data.turtle.in_progress = TRUE;
+		}
+
+		if (process_turtle_file_part ()) {
+			/* import still in progress */
+			private->batch_count++;
+			if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+				end_batch (private);
 			}
 
-			if (!error) {
-				private->batch_count++;
+			return TRUE;
+		} else {
+			/* import finished */
+			task->data.turtle.in_progress = FALSE;
+
+			end_batch (private);
+
+			if (task->callback.turtle_callback) {
+				task->callback.turtle_callback (error, task->user_data);
 			}
-			break;
-	}
+		}
 
-	if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
-		/* commit pending batch items */
-		tracker_data_commit_transaction ();
-		private->batch_mode = FALSE;
-		private->batch_count = 0;
 	}
 
+	g_queue_pop_head (private->queue);
+
 	if (task->destroy) {
 		task->destroy (task->user_data);
 	}
@@ -142,7 +206,7 @@ queue_idle_handler (gpointer user_data)
 
 	tracker_store_task_free (task);
 
-	return TRUE;
+	return !g_queue_is_empty (private->queue);
 }
 
 static void
@@ -251,6 +315,45 @@ tracker_store_queue_sparql_update (const gchar *sparql,
 }
 
+/*
+ * tracker_store_queue_turtle_import:
+ * @file: Turtle file to import; only its path is kept by the queued task
+ * @callback: called with @user_data when the import task completes, or NULL
+ * @user_data: opaque pointer handed to @callback and @destroy
+ * @destroy: called on @user_data when the task is released, or NULL
+ *
+ * Queues a Turtle import task at the tail of the store queue and starts
+ * the queue handler if none is currently installed.
+ */
 void
+tracker_store_queue_turtle_import (GFile                      *file,
+				   TrackerStoreTurtleCallback  callback,
+				   gpointer                    user_data,
+				   GDestroyNotify              destroy)
+{
+	TrackerStorePrivate *private;
+	TrackerStoreTask    *task;
+
+	g_return_if_fail (G_IS_FILE (file));
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+
+	task = g_slice_new0 (TrackerStoreTask);
+	task->type = TRACKER_STORE_TASK_TYPE_TURTLE;
+	task->data.turtle.path = g_file_get_path (file);
+	task->user_data = user_data;
+	/* store through the union member that queue_idle_handler reads */
+	task->callback.turtle_callback = callback;
+	task->destroy = destroy;
+
+	g_queue_push_tail (private->queue, task);
+
+	if (!private->have_handler) {
+		start_handler (private);
+	}
+}
+
+void
 tracker_store_sparql_update (const gchar *sparql,
                              GError     **error)
 {
diff --git a/src/tracker-store/tracker-store.h b/src/tracker-store/tracker-store.h
index c0253e4..0415b12 100644
--- a/src/tracker-store/tracker-store.h
+++ b/src/tracker-store/tracker-store.h
@@ -33,6 +33,8 @@ G_BEGIN_DECLS
 typedef void (* TrackerStoreSparqlUpdateCallback)  (GError          *error,
                                                     gpointer         user_data);
 typedef void (* TrackerStoreCommitCallback)        (gpointer         user_data);
+typedef void (* TrackerStoreTurtleCallback)        (GError          *error,
+                                                    gpointer         user_data);
 
 void         tracker_store_init                   (void);
 void         tracker_store_shutdown               (void);
@@ -43,6 +45,10 @@ void         tracker_store_queue_sparql_update    (const gchar   *sparql,
                                                    TrackerStoreSparqlUpdateCallback callback,
                                                    gpointer       user_data,
                                                    GDestroyNotify destroy);
+void         tracker_store_queue_turtle_import    (GFile         *file,
+                                                   TrackerStoreTurtleCallback callback,
+                                                   gpointer       user_data,
+                                                   GDestroyNotify destroy);
 void         tracker_store_sparql_update          (const gchar   *sparql,
                                                    GError       **error);
 void         tracker_store_insert_statement       (const gchar   *subject,



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]