[tracker/wip/miner-priority-queues: 14/19] libtracker-miner: Use TrackerSparqlBuffer for metadata insertions



commit 92a93bdd774993053a82554264007aae8831e4e3
Author: Carlos Garnacho <carlos lanedo com>
Date:   Tue Jul 5 18:20:55 2011 +0200

    libtracker-miner: Use TrackerSparqlBuffer for metadata insertions

 src/libtracker-miner/tracker-miner-fs.c |  251 +++++++++++++++++--------------
 1 files changed, 139 insertions(+), 112 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index dff0f5e..03ef6f3 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -31,9 +31,9 @@
 #include "tracker-monitor.h"
 #include "tracker-utils.h"
 #include "tracker-thumbnailer.h"
-#include "tracker-miner-fs-processing-pool.h"
 #include "tracker-priority-queue.h"
 #include "tracker-task-pool.h"
+#include "tracker-sparql-buffer.h"
 
 /* If defined will print the tree from GNode while running */
 #ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -205,7 +205,8 @@ struct _TrackerMinerFSPrivate {
 	TrackerTaskPool *task_pool;
 
 	/* Sparql insertion tasks */
-	TrackerProcessingPool *processing_pool;
+	TrackerSparqlBuffer *sparql_buffer;
+	guint sparql_buffer_limit;
 
 	/* URI mtime cache */
 	GFile          *current_mtime_cache_parent;
@@ -282,6 +283,8 @@ enum {
 	PROP_INITIAL_CRAWLING
 };
 
+static void           miner_fs_initable_iface_init        (GInitableIface       *iface);
+
 static void           fs_finalize                         (GObject              *object);
 static void           fs_set_property                     (GObject              *object,
                                                            guint                 prop_id,
@@ -373,9 +376,12 @@ static void           task_pool_cancel_foreach                (gpointer        d
                                                                gpointer        user_data);
 
 
+static GInitableIface *miner_fs_initable_parent_iface; /* parent type's GInitable vtable, used to chain up init() */
 static guint signals[LAST_SIGNAL] = { 0, };
 
-G_DEFINE_ABSTRACT_TYPE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER)
+G_DEFINE_ABSTRACT_TYPE_WITH_CODE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER,
+                                  G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
+                                                         miner_fs_initable_iface_init));
 
 static void
 tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
@@ -696,10 +702,6 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 
 	/* Create processing pools */
 	priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
-	priv->processing_pool = tracker_processing_pool_new (object,
-	                                                     DEFAULT_WAIT_POOL_LIMIT,
-	                                                     DEFAULT_READY_POOL_LIMIT,
-	                                                     DEFAULT_N_REQUESTS_POOL_LIMIT);
 
 	/* Set up the crawlers now we have config and hal */
 	priv->crawler = tracker_crawler_new ();
@@ -758,6 +760,33 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	priv->dirs_without_parent = NULL;
 }
 
+/* GInitable::init(): chain up to the parent, then create the sparql buffer */
+static gboolean
+miner_fs_initable_init (GInitable     *initable,
+                        GCancellable  *cancellable,
+                        GError       **error)
+{
+	TrackerMinerFSPrivate *priv = TRACKER_MINER_FS_GET_PRIVATE (initable);
+
+	if (!miner_fs_initable_parent_iface->init (initable, cancellable, error)) {
+		return FALSE;
+	}
+
+	/* init() must be idempotent: never replace (and leak) an existing buffer */
+	if (!priv->sparql_buffer) {
+		priv->sparql_buffer = tracker_sparql_buffer_new (tracker_miner_get_connection (TRACKER_MINER (initable)),
+		                                                 priv->sparql_buffer_limit);
+	}
+	return TRUE;
+}
+
+static void
+miner_fs_initable_iface_init (GInitableIface *iface)
+{
+	iface->init = miner_fs_initable_init; /* our init() chains up through the saved parent vtable */
+	miner_fs_initable_parent_iface = g_type_interface_peek_parent (iface); /* GInitable vtable of the parent type */
+}
+
 static void
 fs_finalize (GObject *object)
 {
@@ -806,7 +835,9 @@ fs_finalize (GObject *object)
 	                           NULL);
 	g_object_unref (priv->task_pool);
 
-	tracker_processing_pool_free (priv->processing_pool);
+	if (priv->sparql_buffer) {
+		g_object_unref (priv->sparql_buffer);
+	}
 
 	tracker_priority_queue_foreach (priv->items_moved,
 	                                (GFunc) item_moved_data_free,
@@ -871,12 +902,18 @@ fs_set_property (GObject      *object,
 		                             g_value_get_uint (value));
 		break;
 	case PROP_READY_POOL_LIMIT:
-		tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
-		                                         g_value_get_uint (value));
+		fs->priv->sparql_buffer_limit = g_value_get_uint (value);
+
+		if (fs->priv->sparql_buffer) {
+			tracker_task_pool_set_limit (TRACKER_TASK_POOL (fs->priv->sparql_buffer),
+			                             fs->priv->sparql_buffer_limit);
+		}
 		break;
 	case PROP_N_REQUESTS_POOL_LIMIT:
+#if 0
 		tracker_processing_pool_set_n_requests_limit (fs->priv->processing_pool,
 		                                              g_value_get_uint (value));
+#endif
 		break;
 	case PROP_MTIME_CHECKING:
 		fs->priv->mtime_checking = g_value_get_boolean (value);
@@ -909,12 +946,13 @@ fs_get_property (GObject    *object,
 		                  tracker_task_pool_get_limit (fs->priv->task_pool));
 		break;
 	case PROP_READY_POOL_LIMIT:
-		g_value_set_uint (value,
-		                  tracker_processing_pool_get_ready_limit (fs->priv->processing_pool));
+		g_value_set_uint (value, fs->priv->sparql_buffer_limit);
 		break;
 	case PROP_N_REQUESTS_POOL_LIMIT:
+#if 0
 		g_value_set_uint (value,
 		                  tracker_processing_pool_get_n_requests_limit (fs->priv->processing_pool));
+#endif
 		break;
 	case PROP_MTIME_CHECKING:
 		g_value_set_boolean (value, fs->priv->mtime_checking);
@@ -1150,48 +1188,54 @@ item_moved_data_free (ItemMovedData *data)
 }
 
 static void
-processing_pool_task_finished_cb (TrackerProcessingTask *task,
-                                  gpointer               user_data,
-                                  const GError          *error)
+sparql_buffer_task_finished_cb (GObject      *object,
+                                GAsyncResult *result,
+                                gpointer      user_data)
 {
 	TrackerMinerFS *fs;
 	TrackerMinerFSPrivate *priv;
+	GError *error = NULL;
 
 	fs = user_data;
 	priv = fs->priv;
 
-	if (error) {
+	/* SPARQL failures are reported through the async result */
+	if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (result),
+	                                           &error)) {
 		g_critical ("Could not execute sparql: %s", error->message);
 		priv->total_files_notified_error++;
-	} else {
-		if (fs->priv->current_iri_cache_parent) {
-			GFile *parent;
-			GFile *task_file;
-
-			task_file = tracker_processing_task_get_file (task);
-
-			/* Note: parent may be NULL if the file represents
-			 * the root directory of the file system (applies to
-			 * .gvfs mounts also!) */
-			parent = g_file_get_parent (task_file);
-
-			if (parent) {
-				if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
-				    g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
-					/* Item is processed, add an empty element for the processed GFile,
-					 * in case it is again processed before the cache expires
-					 */
-					g_hash_table_insert (fs->priv->iri_cache,
-					                     g_object_ref (task_file),
-					                     NULL);
-				}
+		g_error_free (error);
+	} else if (priv->current_iri_cache_parent) {
+		TrackerTask *sparql_task;
+		GFile *parent;
+		GFile *task_file;
+
+		/* Presumably the TrackerTask handed to tracker_sparql_buffer_push() */
+		sparql_task = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
+		task_file = tracker_task_get_file (sparql_task);
 
-				g_object_unref (parent);
+		/* Note: parent may be NULL if the file represents
+		 * the root directory of the file system (applies to
+		 * .gvfs mounts also!) */
+		parent = g_file_get_parent (task_file);
+
+		if (parent) {
+			if (g_file_equal (parent, priv->current_iri_cache_parent) &&
+			    g_hash_table_lookup (priv->iri_cache, task_file) == NULL) {
+				/* Item is processed, add an empty element for the processed GFile,
+				 * in case it is again processed before the cache expires
+				 */
+				g_hash_table_insert (priv->iri_cache,
+				                     g_object_ref (task_file),
+				                     NULL);
 			}
+
+			g_object_unref (parent);
 		}
 	}
 
-	tracker_processing_task_unref (task);
+	/* Re-arm the queue handlers; item_queue_handlers_set_up() itself
+	 * returns early while the sparql buffer is at its limit */
 	item_queue_handlers_set_up (fs);
 }
 
@@ -1697,7 +1737,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
                        const GError   *error)
 {
 	UpdateProcessingTaskContext *ctxt;
-	TrackerProcessingTask *task;
+	TrackerTask *sparql_task;
 	GFile *task_file;
 	gchar *uri;
 
@@ -1711,8 +1751,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		g_message ("Could not process '%s': %s", uri, error->message);
 
 		fs->priv->total_files_notified_error++;
-
-		item_queue_handlers_set_up (fs);
+		sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
 	} else {
 		if (ctxt->urn) {
 			gboolean attribute_update_only;
@@ -1756,27 +1795,26 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 				                               ctxt->urn, ctxt->urn,
 				                               tracker_sparql_builder_get_result (ctxt->builder));
 
-				/* Note that set_sparql_string() takes ownership of the passed string */
-				tracker_processing_task_set_sparql_string (task, full_sparql);
+				sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
 			} else {
 				/* Do not drop graph if only updating attributes, the SPARQL builder
 				 * will already contain the necessary DELETE statements for the properties
 				 * being updated */
-				tracker_processing_task_set_sparql (task, ctxt->builder);
+				sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
 			}
 		} else {
 			g_debug ("Creating new item '%s'", uri);
-			tracker_processing_task_set_sparql (task, ctxt->builder);
+			sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
 		}
+	}
 
-		/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
-		 * and in this case we need to setup queue handlers again */
-		if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
-		                                              task,
-		                                              processing_pool_task_finished_cb,
-		                                              fs)) {
-			item_queue_handlers_set_up (fs);
-		}
+	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+	                            sparql_task,
+	                            sparql_buffer_task_finished_cb,
+	                            fs);
+
+	if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
+		item_queue_handlers_set_up (fs);
 	}
 
 	tracker_task_unref (extraction_task);
@@ -1875,7 +1913,7 @@ item_remove (TrackerMinerFS *fs,
 {
 	gchar *uri;
 	gchar *mime = NULL;
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	iri_cache_invalidate (fs, file);
 	uri = g_file_get_uri (file);
@@ -1903,22 +1941,17 @@ item_remove (TrackerMinerFS *fs,
 	 * device). */
 
 	/* Add new task to processing pool */
-	task = tracker_processing_task_new (file);
-	tracker_processing_task_set_bulk_operation (task,
-	                                            "DELETE { "
-	                                            "  ?f tracker:available true "
-	                                            "}",
-	                                            TRACKER_BULK_MATCH_EQUALS |
-	                                            TRACKER_BULK_MATCH_CHILDREN);
-
-	/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
-	 * and in this case we need to setup queue handlers again */
-	if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
-	                                              task,
-	                                              processing_pool_task_finished_cb,
-	                                              fs)) {
-		item_queue_handlers_set_up (fs);
-	}
+	task = tracker_sparql_task_new_bulk (file,
+	                                     "DELETE { "
+	                                     "  ?f tracker:available true "
+	                                     "}",
+	                                     TRACKER_BULK_MATCH_EQUALS |
+	                                     TRACKER_BULK_MATCH_CHILDREN);
+
+	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+	                            task,
+	                            sparql_buffer_task_finished_cb,
+	                            fs);
 
 	/* SECOND:
 	 * Actually remove all resources. This operation is the one which may take
@@ -1926,20 +1959,19 @@ item_remove (TrackerMinerFS *fs,
 	 */
 
 	/* Add new task to processing pool */
-	task = tracker_processing_task_new (file);
-	tracker_processing_task_set_bulk_operation (task,
-	                                            "DELETE { "
-	                                            "  ?f a rdfs:Resource "
-	                                            "}",
-	                                            TRACKER_BULK_MATCH_EQUALS |
-	                                            TRACKER_BULK_MATCH_CHILDREN);
-
-	/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
-	 * and in this case we need to setup queue handlers again */
-	if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
-	                                              task,
-	                                              processing_pool_task_finished_cb,
-	                                              fs)) {
+	task = tracker_sparql_task_new_bulk (file,
+	                                     "DELETE { "
+	                                     "  ?f a rdfs:Resource "
+	                                     "}",
+	                                     TRACKER_BULK_MATCH_EQUALS |
+	                                     TRACKER_BULK_MATCH_CHILDREN);
+
+	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+	                            task,
+	                            sparql_buffer_task_finished_cb,
+	                            fs);
+
+	if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
 		item_queue_handlers_set_up (fs);
 	}
 
@@ -2132,7 +2164,7 @@ item_move (TrackerMinerFS *fs,
 	GFileInfo *file_info;
 	GString   *sparql;
 	RecursiveMoveData move_data;
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 	gchar *source_iri;
 	gchar *display_name;
 	gboolean source_exists;
@@ -2278,16 +2310,15 @@ item_move (TrackerMinerFS *fs,
 	g_main_loop_unref (move_data.main_loop);
 
 	/* Add new task to processing pool */
-	task = tracker_processing_task_new (file);
-	/* Note that set_sparql_string() takes ownership of the passed string */
-	tracker_processing_task_set_sparql_string (task,
-	                                           g_string_free (sparql, FALSE));
-	/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
-	 * and in this case we need to setup queue handlers again */
-	if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
-	                                              task,
-	                                              processing_pool_task_finished_cb,
-	                                              fs)) {
+	task = tracker_sparql_task_new_take_sparql_str (file,
+	                                                g_string_free (sparql,
+	                                                               FALSE));
+	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+	                            task,
+	                            sparql_buffer_task_finished_cb,
+	                            fs);
+
+	if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
 		item_queue_handlers_set_up (fs);
 	}
 
@@ -2407,7 +2438,8 @@ should_wait (TrackerMinerFS *fs,
 	GFile *parent;
 
 	/* Is the item already being processed? */
-	if (tracker_task_pool_find (fs->priv->task_pool, file)) {
+	if (tracker_task_pool_find (fs->priv->task_pool, file) ||
+	    tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
 		/* Yes, a previous event on same item currently
 		 * being processed */
 		return TRUE;
@@ -2416,7 +2448,8 @@ should_wait (TrackerMinerFS *fs,
 	/* Is the item's parent being processed right now? */
 	parent = g_file_get_parent (file);
 	if (parent) {
-		if (tracker_task_pool_find (fs->priv->task_pool, parent)) {
+		if (tracker_task_pool_find (fs->priv->task_pool, parent) ||
+		    tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
 			/* Yes, a previous event on the parent of this item
 			 * currently being processed */
 			g_object_unref (parent);
@@ -2708,8 +2741,8 @@ item_queue_handlers_cb (gpointer user_data)
 		 * if there was a previous task on the same file we want to
 		 * process now, we want it to get finished before we can go
 		 * on with the queues... */
-		tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
-		                                      "Queue handlers WAIT");
+		tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+		                             "Queue handlers WAIT");
 
 		return FALSE;
 	}
@@ -2819,13 +2852,13 @@ item_queue_handlers_cb (gpointer user_data)
 		/* Print stats and signal finished */
 		if (!fs->priv->is_crawling &&
 		    tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
-		    tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
+		    tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
 			process_stop (fs);
 		}
 
 		/* Flush any possible pending update here */
-		tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
-		                                      "Queue handlers NONE");
+		tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+		                             "Queue handlers NONE");
 
 		tracker_thumbnailer_send ();
 		tracker_albumart_check_cleanup (tracker_miner_get_connection (TRACKER_MINER (fs)));
@@ -2917,13 +2950,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
 		return;
 	}
 
-	/* Already sent max number of requests to tracker-store?
-	 * In this case, we also slow down the processing of items, as we don't
-	 * want to keep on extracting if the communication with tracker-store is
-	 * very busy. Note that this is not very likely to happen, as the bottleneck
-	 * during extraction is not the communication with tracker-store.
-	 */
-	if (tracker_processing_pool_n_requests_limit_reached (fs->priv->processing_pool)) {
+	if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
 		return;
 	}
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]