[tracker/miner-fs-refactor-multi-insert: 2/16] libtracker-miner: Integrate the new processing pool



commit 7a6226bb34e3b499d2474b29c40adcd49a636783
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Thu Oct 14 16:17:23 2010 +0200

    libtracker-miner: Integrate the new processing pool

 src/libtracker-miner/tracker-miner-fs.c |  471 +++++++++++++++++--------------
 1 files changed, 266 insertions(+), 205 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 47965b3..7aa0081 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -30,6 +30,7 @@
 #include "tracker-monitor.h"
 #include "tracker-utils.h"
 #include "tracker-thumbnailer.h"
+#include "tracker-miner-fs-processing-pool.h"
 
 /* If defined will print the tree from GNode while running */
 #undef ENABLE_TREE_DEBUGGING
@@ -38,6 +39,10 @@
 /* If defined will print contents of populated mtime cache while running */
 #undef PRINT_MTIME_CACHE_CONTENTS
 
+/* Default processing pool limits to be set */
+#define DEFAULT_WAIT_POOL_LIMIT 1
+#define DEFAULT_PROCESS_POOL_LIMIT 1
+
 /**
  * SECTION:tracker-miner-fs
  * @short_description: Abstract base class for filesystem miners
@@ -63,13 +68,12 @@ typedef struct {
 } DirectoryData;
 
 typedef struct {
-	GFile *file;
 	gchar *urn;
 	gchar *parent_urn;
 	GCancellable *cancellable;
 	TrackerSparqlBuilder *builder;
 	TrackerMiner *miner;
-} ProcessData;
+} UpdateProcessingTaskContext;
 
 typedef struct {
 	GMainLoop *main_loop;
@@ -126,8 +130,7 @@ struct _TrackerMinerFSPrivate {
 
 	gdouble         throttle;
 
-	GList          *processing_pool;
-	guint           pool_limit;
+	ProcessingPool *processing_pool;
 
 	/* URI mtime cache */
 	GFile          *current_mtime_cache_parent;
@@ -193,7 +196,8 @@ enum {
 enum {
 	PROP_0,
 	PROP_THROTTLE,
-	PROP_POOL_LIMIT,
+	PROP_WAIT_POOL_LIMIT,
+	PROP_PROCESS_POOL_LIMIT,
 	PROP_MTIME_CHECKING,
 	PROP_INITIAL_CRAWLING
 };
@@ -318,11 +322,20 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
 	                                                      0, 1, 0,
 	                                                      G_PARAM_READWRITE));
 	g_object_class_install_property (object_class,
-	                                 PROP_POOL_LIMIT,
+	                                 PROP_WAIT_POOL_LIMIT,
+	                                 g_param_spec_uint ("wait-pool-limit",
+	                                                    "Processing pool limit for WAIT tasks",
+	                                                    "Maximum number of files that can be concurrently "
+	                                                    "processed by the upper layer",
+	                                                    1, G_MAXUINT, DEFAULT_WAIT_POOL_LIMIT,
+	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+	g_object_class_install_property (object_class,
+	                                 PROP_PROCESS_POOL_LIMIT,
 	                                 g_param_spec_uint ("process-pool-limit",
-	                                                    "Processing pool limit",
-	                                                    "Number of files that can be concurrently processed",
-	                                                    1, G_MAXUINT, 1,
+	                                                    "Processing pool limit for PROCESS tasks",
+	                                                    "Maximum number of SPARQL updates that can be merged "
+	                                                    "in a single connection to the store",
+	                                                    1, G_MAXUINT, DEFAULT_PROCESS_POOL_LIMIT,
 	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	g_object_class_install_property (object_class,
 	                                 PROP_MTIME_CHECKING,
@@ -563,6 +576,11 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	                                                        (GDestroyNotify) g_free,
 	                                                        (GDestroyNotify) NULL);
 
+	/* Create processing pool */
+	priv->processing_pool = processing_pool_new (tracker_miner_get_connection (TRACKER_MINER (object)),
+	                                             DEFAULT_WAIT_POOL_LIMIT,
+	                                             DEFAULT_PROCESS_POOL_LIMIT);
+
 	/* Set up the crawlers now we have config and hal */
 	priv->crawler = tracker_crawler_new ();
 
@@ -619,84 +637,6 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	priv->dirs_without_parent = NULL;
 }
 
-static ProcessData *
-process_data_new (TrackerMiner         *miner,
-                  GFile                *file,
-                  const gchar          *urn,
-                  const gchar          *parent_urn,
-                  GCancellable         *cancellable,
-                  TrackerSparqlBuilder *builder)
-{
-	ProcessData *data;
-
-	data = g_slice_new0 (ProcessData);
-	data->miner = miner;
-	data->file = g_object_ref (file);
-	data->urn = g_strdup (urn);
-	data->parent_urn = g_strdup (parent_urn);
-
-	if (cancellable) {
-		data->cancellable = g_object_ref (cancellable);
-	}
-
-	if (builder) {
-		data->builder = g_object_ref (builder);
-	}
-
-	return data;
-}
-
-static void
-process_data_free (ProcessData *data)
-{
-	g_object_unref (data->file);
-	g_free (data->urn);
-	g_free (data->parent_urn);
-
-	if (data->cancellable) {
-		g_object_unref (data->cancellable);
-	}
-
-	if (data->builder) {
-		g_object_unref (data->builder);
-	}
-
-	g_slice_free (ProcessData, data);
-}
-
-static ProcessData *
-process_data_find (TrackerMinerFS *fs,
-                   GFile          *file,
-                   gboolean        path_search)
-{
-	GList *l;
-
-	for (l = fs->private->processing_pool; l; l = l->next) {
-		ProcessData *data = l->data;
-
-		if (!path_search) {
-			/* Different operations for the same file URI could be
-			 * piled up here, each being a different GFile object.
-			 * Miner implementations should really notify on the
-			 * same GFile object that's being passed, so we check for
-			 * pointer equality here, rather than doing path comparisons
-			 */
-			if(data->file == file)
-				return data;
-		} else {
-			/* Note that if there are different GFiles being
-			 * processed for the same file path, we are actually
-			 * returning the first one found, If you want exactly
-			 * the same GFile as the one as input, use the
-			 * process_data_find() method instead */
-			if (g_file_equal (data->file, file))
-				return data;
-		}
-	}
-
-	return NULL;
-}
-
 static void
 fs_finalize (GObject *object)
 {
@@ -737,8 +677,7 @@ fs_finalize (GObject *object)
 	g_queue_foreach (priv->crawled_directories, (GFunc) crawled_directory_data_free, NULL);
 	g_queue_free (priv->crawled_directories);
 
-	g_list_foreach (priv->processing_pool, (GFunc) process_data_free, NULL);
-	g_list_free (priv->processing_pool);
+	processing_pool_free (priv->processing_pool);
 
 	g_queue_foreach (priv->items_moved, (GFunc) item_moved_data_free, NULL);
 	g_queue_free (priv->items_moved);
@@ -785,9 +724,13 @@ fs_set_property (GObject      *object,
 		tracker_miner_fs_set_throttle (TRACKER_MINER_FS (object),
 		                               g_value_get_double (value));
 		break;
-	case PROP_POOL_LIMIT:
-		fs->private->pool_limit = g_value_get_uint (value);
-		g_message ("Miner process pool limit is set to %d", fs->private->pool_limit);
+	case PROP_WAIT_POOL_LIMIT:
+		processing_pool_set_wait_limit (fs->private->processing_pool,
+		                                g_value_get_uint (value));
+		break;
+	case PROP_PROCESS_POOL_LIMIT:
+		processing_pool_set_process_limit (fs->private->processing_pool,
+		                                   g_value_get_uint (value));
 		break;
 	case PROP_MTIME_CHECKING:
 		fs->private->mtime_checking = g_value_get_boolean (value);
@@ -815,8 +758,13 @@ fs_get_property (GObject    *object,
 	case PROP_THROTTLE:
 		g_value_set_double (value, fs->private->throttle);
 		break;
-	case PROP_POOL_LIMIT:
-		g_value_set_uint (value, fs->private->pool_limit);
+	case PROP_WAIT_POOL_LIMIT:
+		g_value_set_uint (value,
+		                  processing_pool_get_wait_limit (fs->private->processing_pool));
+		break;
+	case PROP_PROCESS_POOL_LIMIT:
+		g_value_set_uint (value,
+		                  processing_pool_get_process_limit (fs->private->processing_pool));
 		break;
 	case PROP_MTIME_CHECKING:
 		g_value_set_boolean (value, fs->private->mtime_checking);
@@ -1048,42 +996,39 @@ item_moved_data_free (ItemMovedData *data)
 }
 
 static void
-sparql_update_cb (GObject      *object,
-                  GAsyncResult *result,
-                  gpointer      user_data)
+processing_pool_task_finished_cb (ProcessingTask *task,
+                                  gpointer        user_data,
+                                  const GError   *error)
 {
 	TrackerMinerFS *fs;
 	TrackerMinerFSPrivate *priv;
-	ProcessData *data;
-	GError *error = NULL;
 
-	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
-
-	data = user_data;
-	fs = TRACKER_MINER_FS (data->miner);
+	fs = user_data;
 	priv = fs->private;
 
 	if (error) {
 		g_critical ("Could not execute sparql: %s", error->message);
 		priv->total_files_notified_error++;
-		g_error_free (error);
 	} else {
 		if (fs->private->current_iri_cache_parent) {
 			GFile *parent;
+			GFile *task_file;
+
+			task_file = processing_task_get_file (task);
 
 			/* Note: parent may be NULL if the file represents
 			 * the root directory of the file system (applies to
 			 * .gvfs mounts also!) */
-			parent = g_file_get_parent (data->file);
+			parent = g_file_get_parent (task_file);
 
 			if (parent) {
 				if (g_file_equal (parent, fs->private->current_iri_cache_parent) &&
-				    g_hash_table_lookup (fs->private->iri_cache, data->file) == NULL) {
+				    g_hash_table_lookup (fs->private->iri_cache, task_file) == NULL) {
 					/* Item is processed, add an empty element for the processed GFile,
 					 * in case it is again processed before the cache expires
 					 */
 					g_hash_table_insert (fs->private->iri_cache,
-					                     g_object_ref (data->file),
+					                     g_object_ref (task_file),
 					                     NULL);
 				}
 
@@ -1092,9 +1037,6 @@ sparql_update_cb (GObject      *object,
 		}
 	}
 
-	priv->processing_pool = g_list_remove (priv->processing_pool, data);
-	process_data_free (data);
-
 	item_queue_handlers_set_up (fs);
 }
 
@@ -1457,30 +1399,80 @@ iri_cache_invalidate (TrackerMinerFS *fs,
 	g_hash_table_remove (fs->private->iri_cache, file);
 }
 
+static UpdateProcessingTaskContext *
+update_process_task_context_new (TrackerMiner         *miner,
+                                 const gchar          *urn,
+                                 const gchar          *parent_urn,
+                                 GCancellable         *cancellable,
+                                 TrackerSparqlBuilder *builder)
+{
+	UpdateProcessingTaskContext *ctxt; /* new context, handed back to the caller */
+
+	ctxt = g_slice_new0 (UpdateProcessingTaskContext); /* zero-filled: cancellable/builder stay NULL unless set below */
+	ctxt->miner = miner; /* pointer stored as-is, no reference is taken here */
+	ctxt->urn = g_strdup (urn); /* private copy, released in update_process_task_context_free() */
+	ctxt->parent_urn = g_strdup (parent_urn); /* private copy, same as urn */
+
+	if (cancellable) { /* optional argument */
+		ctxt->cancellable = g_object_ref (cancellable); /* context keeps its own reference */
+	}
+
+	if (builder) { /* optional argument */
+		ctxt->builder = g_object_ref (builder); /* context keeps its own reference */
+	}
+
+	return ctxt; /* free with update_process_task_context_free() */
+}
+
+static void
+update_process_task_context_free (UpdateProcessingTaskContext *ctxt)
+{
+	g_free (ctxt->urn); /* copy made by update_process_task_context_new() */
+	g_free (ctxt->parent_urn); /* copy made by update_process_task_context_new() */
+
+	if (ctxt->cancellable) { /* only set when a cancellable was supplied */
+		g_object_unref (ctxt->cancellable); /* drop the reference taken at creation */
+	}
+
+	if (ctxt->builder) { /* only set when a builder was supplied */
+		g_object_unref (ctxt->builder); /* drop the reference currently held */
+	}
+
+	g_slice_free (UpdateProcessingTaskContext, ctxt); /* ctxt->miner was never referenced, so it is not released */
+}
+
 static gboolean
 do_process_file (TrackerMinerFS *fs,
-                 ProcessData    *data)
+                 ProcessingTask *task)
 {
 	TrackerMinerFSPrivate *priv;
 	gboolean processing;
 	gboolean attribute_update_only;
 	gchar *uri;
+	GFile *task_file;
+	UpdateProcessingTaskContext *ctxt;
 
-	uri = g_file_get_uri (data->file);
+	ctxt = processing_task_get_context (task);
+	task_file = processing_task_get_file (task);
+	uri = g_file_get_uri (task_file);
 	priv = fs->private;
 
-	attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (data->file),
+	attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (task_file),
 	                                                             priv->quark_attribute_updated));
 
 	if (!attribute_update_only) {
 		g_debug ("Processing file '%s'...", uri);
 		g_signal_emit (fs, signals[PROCESS_FILE], 0,
-		               data->file, data->builder, data->cancellable,
+		               task_file,
+		               ctxt->builder,
+		               ctxt->cancellable,
 		               &processing);
 	} else {
 		g_debug ("Processing attributes in file '%s'...", uri);
 		g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
-		               data->file, data->builder, data->cancellable,
+		               task_file,
+		               ctxt->builder,
+		               ctxt->cancellable,
 		               &processing);
 	}
 
@@ -1488,18 +1480,18 @@ do_process_file (TrackerMinerFS *fs,
 		/* Re-fetch data, since it might have been
 		 * removed in broken implementations
 		 */
-		data = process_data_find (fs, data->file, FALSE);
+		task = processing_pool_find_task (priv->processing_pool, task_file, FALSE);
 
 		g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
 
-		if (!data) {
+		if (!task) {
 			g_critical ("%s has returned FALSE in ::process-file for '%s', "
 			            "but it seems that this file has been processed through "
 			            "tracker_miner_fs_file_notify(), this is an "
 			            "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
 		} else {
-			priv->processing_pool = g_list_remove (priv->processing_pool, data);
-			process_data_free (data);
+			processing_pool_remove_task (priv->processing_pool, task);
+			processing_task_free (task);
 		}
 	}
 
@@ -1510,19 +1502,21 @@ do_process_file (TrackerMinerFS *fs,
 
 static void
 item_add_or_update_cb (TrackerMinerFS *fs,
-                       ProcessData    *data,
+                       ProcessingTask *task,
                        const GError   *error)
 {
+	UpdateProcessingTaskContext *ctxt;
+	GFile *task_file;
 	gchar *uri;
 
-	uri = g_file_get_uri (data->file);
+	ctxt = processing_task_get_context (task);
+	task_file = processing_task_get_file (task);
+	uri = g_file_get_uri (task_file);
 
 	if (error) {
-		ProcessData *first_item_data;
-		GList *last;
+		ProcessingTask *first_item_task;
 
-		last = g_list_last (fs->private->processing_pool);
-		first_item_data = last->data;
+		first_item_task = processing_pool_get_last_wait (fs->private->processing_pool);
 
 		/* Perhaps this is too specific to TrackerMinerFiles, if the extractor
 		 * is choking on some file, the miner will get a timeout for all files
@@ -1530,38 +1524,38 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		 * is the first one that was added to the processing pool, so we retry
 		 * the others.
 		 */
-		if (data != first_item_data &&
+		if (task != first_item_task &&
 		    (error->code == DBUS_GERROR_NO_REPLY ||
 		     error->code == DBUS_GERROR_TIMEOUT ||
 		     error->code == DBUS_GERROR_TIMED_OUT)) {
 			g_debug ("  Got DBus timeout error on '%s', but it could not be caused by it. Retrying file.", uri);
 
 			/* Reset the TrackerSparqlBuilder */
-			g_object_unref (data->builder);
-			data->builder = tracker_sparql_builder_new_update ();
+			g_object_unref (ctxt->builder);
+			ctxt->builder = tracker_sparql_builder_new_update ();
 
-			do_process_file (fs, data);
+			do_process_file (fs, task);
 		} else {
 			g_message ("Could not process '%s': %s", uri, error->message);
 
 			fs->private->total_files_notified_error++;
-			fs->private->processing_pool =
-				g_list_remove (fs->private->processing_pool, data);
-			process_data_free (data);
+
+			processing_pool_remove_task (fs->private->processing_pool, task);
+			processing_task_free (task);
 
 			item_queue_handlers_set_up (fs);
 		}
 	} else {
 		gchar *full_sparql;
 
-		if (data->urn) {
+		if (ctxt->urn) {
 			gboolean attribute_update_only;
 
-			attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (data->file),
+			attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
 			                                                               fs->private->quark_attribute_updated));
 			g_debug ("Updating item '%s' with urn '%s'%s",
 			         uri,
-			         data->urn,
+			         ctxt->urn,
 			         attribute_update_only ? " (attributes only)" : "");
 
 			if (!attribute_update_only) {
@@ -1570,28 +1564,33 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 				full_sparql = g_strdup_printf ("DELETE { GRAPH <%s> { <%s> ?p ?o } } "
 				                               "WHERE { GRAPH <%s> { <%s> ?p ?o FILTER (?p != rdf:type) } } %s",
 				                               TRACKER_MINER_FS_GRAPH_URN,
-				                               data->urn,
+				                               ctxt->urn,
 				                               TRACKER_MINER_FS_GRAPH_URN,
-				                               data->urn,
-				                               tracker_sparql_builder_get_result (data->builder));
+				                               ctxt->urn,
+				                               tracker_sparql_builder_get_result (ctxt->builder));
 			} else {
 				/* Do not drop graph if only updating attributes, the SPARQL builder
 				 * will already contain the necessary DELETE statements for the properties
 				 * being updated */
-				full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+				full_sparql = g_strdup (tracker_sparql_builder_get_result (ctxt->builder));
 			}
 		} else {
 			g_debug ("Creating new item '%s'", uri);
 
 			/* new file */
-			full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+			full_sparql = g_strdup (tracker_sparql_builder_get_result (ctxt->builder));
 		}
 
-		tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
-		                                        full_sparql,
-		                                        G_PRIORITY_DEFAULT,
-		                                        NULL,
-		                                        sparql_update_cb, data);
+		processing_task_set_sparql (task, full_sparql);
+		/* If process_task() returns FALSE, it means the actual db update was delayed,
+		 * and in this case we need to set up the queue handlers again */
+		if (!processing_pool_process_task (fs->private->processing_pool,
+		                                   task,
+		                                   TRUE, /* buffer! */
+		                                   processing_pool_task_finished_cb,
+		                                   fs)) {
+			item_queue_handlers_set_up (fs);
+		}
 		g_free (full_sparql);
 	}
 
@@ -1606,7 +1605,7 @@ item_add_or_update (TrackerMinerFS *fs,
 	TrackerSparqlBuilder *sparql;
 	GCancellable *cancellable;
 	gboolean retval;
-	ProcessData *data;
+	ProcessingTask *task;
 	GFile *parent;
 	const gchar *urn;
 	const gchar *parent_urn = NULL;
@@ -1655,17 +1654,22 @@ item_add_or_update (TrackerMinerFS *fs,
 	 * updated the cache, or without a proper nfo:belongsToContainer */
 	urn = iri_cache_lookup (fs, file, TRUE);
 
-	data = process_data_new (TRACKER_MINER (fs), file, urn, parent_urn, cancellable, sparql);
-	priv->processing_pool = g_list_prepend (priv->processing_pool, data);
-
-	if (do_process_file (fs, data)) {
-		guint length;
-
-		length = g_list_length (priv->processing_pool);
-
+	/* Create task and add it to the pool as a WAIT task (we need to extract
+	 * the file metadata and such) */
+	task = processing_task_new (file);
+	processing_task_set_context (task,
+	                             update_process_task_context_new (TRACKER_MINER (fs),
+	                                                              urn,
+	                                                              parent_urn,
+	                                                              cancellable,
+	                                                              sparql),
+	                             (GFreeFunc) update_process_task_context_free);
+	processing_pool_wait_task (priv->processing_pool, task);
+
+	if (do_process_file (fs, task)) {
 		fs->private->total_files_processed++;
 
-		if (length >= priv->pool_limit) {
+		if (processing_pool_wait_limit_reached (priv->processing_pool)) {
 			retval = FALSE;
 		}
 	}
@@ -1684,7 +1688,7 @@ item_remove (TrackerMinerFS *fs,
 	GString *sparql;
 	gchar *uri;
 	gchar *mime = NULL;
-	ProcessData *data;
+	ProcessingTask *task;
 
 	iri_cache_invalidate (fs, file);
 	uri = g_file_get_uri (file);
@@ -1724,14 +1728,19 @@ item_remove (TrackerMinerFS *fs,
 	                        "}",
 	                        uri);
 
-	data = process_data_new (TRACKER_MINER (fs), file, NULL, NULL, NULL, NULL);
-	fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
+	/* Add new task to processing pool */
+	task = processing_task_new (file);
+	processing_task_set_sparql (task, sparql->str);
+	/* If process_task() returns FALSE, it means the actual db update was delayed,
+	 * and in this case we need to set up the queue handlers again */
+	if (!processing_pool_process_task (fs->private->processing_pool,
+	                                   task,
+	                                   FALSE,
+	                                   processing_pool_task_finished_cb,
+	                                   fs)) {
+		item_queue_handlers_set_up (fs);
+	}
 
-	tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
-	                                        sparql->str,
-	                                        G_PRIORITY_DEFAULT,
-	                                        NULL,
-	                                        sparql_update_cb, data);
 
 	g_string_free (sparql, TRUE);
 	g_free (uri);
@@ -1923,7 +1932,7 @@ item_move (TrackerMinerFS *fs,
 	GFileInfo *file_info;
 	GString   *sparql;
 	RecursiveMoveData move_data;
-	ProcessData *data;
+	ProcessingTask *task;
 	gchar *source_iri;
 	gchar *display_name;
 	gboolean source_exists;
@@ -2040,14 +2049,18 @@ item_move (TrackerMinerFS *fs,
 
 	g_main_loop_unref (move_data.main_loop);
 
-	data = process_data_new (TRACKER_MINER (fs), file, NULL, NULL, NULL, NULL);
-	fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
-
-	tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
-	                                        sparql->str,
-	                                        G_PRIORITY_DEFAULT,
-	                                        NULL,
-	                                        sparql_update_cb, data);
+	/* Add new task to processing pool */
+	task = processing_task_new (file);
+	processing_task_set_sparql (task, sparql->str);
+	/* If process_task() returns FALSE, it means the actual db update was delayed,
+	 * and in this case we need to set up the queue handlers again */
+	if (!processing_pool_process_task (fs->private->processing_pool,
+	                                   task,
+	                                   FALSE,
+	                                   processing_pool_task_finished_cb,
+	                                   fs)) {
+		item_queue_handlers_set_up (fs);
+	}
 
 	g_free (uri);
 	g_free (source_uri);
@@ -2153,7 +2166,7 @@ should_wait (TrackerMinerFS *fs,
 	GFile *parent;
 
 	/* Is the item already being processed? */
-	if (process_data_find (fs, file, TRUE)) {
+	if (processing_pool_find_task (fs->private->processing_pool, file, TRUE)) {
 		/* Yes, a previous event on same item currently
 		 * being processed */
 		return TRUE;
@@ -2162,7 +2175,7 @@ should_wait (TrackerMinerFS *fs,
 	/* Is the item's parent being processed right now? */
 	parent = g_file_get_parent (file);
 	if (parent) {
-		if (process_data_find (fs, parent, TRUE)) {
+		if (processing_pool_find_task (fs->private->processing_pool, parent, TRUE)) {
 			/* Yes, a previous event on the parent of this item
 			 * currently being processed */
 			g_object_unref (parent);
@@ -2214,7 +2227,7 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
 		 * info is inserted to the store before the children are
 		 * inspected.
 		 */
-		if (fs->private->processing_pool) {
+		if (processing_pool_get_wait_task_count (fs->private->processing_pool) > 0) {
 			/* Items still being processed */
 			*file = NULL;
 			*source_file = NULL;
@@ -2445,10 +2458,14 @@ item_queue_handlers_cb (gpointer user_data)
 	case QUEUE_NONE:
 		/* Print stats and signal finished */
 		if (!fs->private->is_crawling &&
-		    !fs->private->processing_pool) {
+		    processing_pool_get_wait_task_count (fs->private->processing_pool) == 0 &&
+		    processing_pool_get_process_task_count (fs->private->processing_pool) == 0) {
 			process_stop (fs);
 		}
 
+		/* Flush any possible pending update here */
+		processing_pool_buffer_flush (fs->private->processing_pool);
+
 		tracker_thumbnailer_send ();
 		/* No more files left to process */
 		keep_processing = FALSE;
@@ -2515,7 +2532,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
 		return;
 	}
 
-	if (g_list_length (fs->private->processing_pool) >= fs->private->pool_limit) {
+	if (processing_pool_wait_limit_reached (fs->private->processing_pool)) {
 		/* There is no room in the pool for more files */
 		return;
 	}
@@ -3609,6 +3626,26 @@ check_files_removal (GQueue *queue,
 	}
 }
 
+static void
+processing_pool_cancel_foreach (gpointer data,
+                                gpointer user_data)
+{
+	ProcessingTask *task = data; /* element handed over by processing_pool_foreach() */
+	GFile *file = user_data; /* file/directory whose tasks must be cancelled */
+	GFile *task_file;
+	UpdateProcessingTaskContext *ctxt;
+
+	task_file = processing_task_get_file (task);
+	ctxt = processing_task_get_context (task); /* presumably NULL when no context was set on the task - confirm in the pool code */
+
+	if (ctxt && /* skip tasks that carry no update context */
+	    ctxt->cancellable && /* the context may have been created without a cancellable */
+	    (g_file_equal (task_file, file) || /* task is for the given file itself... */
+	     g_file_has_prefix (task_file, file))) { /* ...or for something located below it */
+		g_cancellable_cancel (ctxt->cancellable);
+	}
+}
+
 /**
  * tracker_miner_fs_directory_remove:
  * @fs: a #TrackerMinerFS
@@ -3625,7 +3662,7 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
 {
 	TrackerMinerFSPrivate *priv;
 	gboolean return_val = FALSE;
-	GList *dirs, *pool;
+	GList *dirs;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), FALSE);
 	g_return_val_if_fail (G_IS_FILE (file), FALSE);
@@ -3683,18 +3720,10 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
 	check_files_removal (priv->items_updated, file);
 	check_files_removal (priv->items_created, file);
 
-	pool = fs->private->processing_pool;
-
-	while (pool) {
-		ProcessData *data = pool->data;
-
-		if (g_file_equal (data->file, file) ||
-		    g_file_has_prefix (data->file, file)) {
-			g_cancellable_cancel (data->cancellable);
-		}
-
-		pool = pool->next;
-	}
+	/* Cancel all pending tasks on files inside the path given by file */
+	processing_pool_foreach (fs->private->processing_pool,
+	                         processing_pool_cancel_foreach,
+	                         file);
 
 	/* Remove all monitors */
 	tracker_monitor_remove_recursively (fs->private->monitor, file);
@@ -3878,16 +3907,16 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
                               GFile          *file,
                               const GError   *error)
 {
-	ProcessData *data;
+	ProcessingTask *task;
 
 	g_return_if_fail (TRACKER_IS_MINER_FS (fs));
 	g_return_if_fail (G_IS_FILE (file));
 
 	fs->private->total_files_notified++;
 
-	data = process_data_find (fs, file, FALSE);
+	task = processing_pool_find_task (fs->private->processing_pool, file, FALSE);
 
-	if (!data) {
+	if (!task) {
 		gchar *uri;
 
 		uri = g_file_get_uri (file);
@@ -3903,7 +3932,7 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
 		return;
 	}
 
-	item_add_or_update_cb (fs, data, error);
+	item_add_or_update_cb (fs, task, error);
 }
 
 /**
@@ -3982,15 +4011,15 @@ G_CONST_RETURN gchar *
 tracker_miner_fs_get_urn (TrackerMinerFS *fs,
                           GFile          *file)
 {
-	ProcessData *data;
+	ProcessingTask *task;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
 	g_return_val_if_fail (G_IS_FILE (file), NULL);
 
 	/* Check if found in currently processed data */
-	data = process_data_find (fs, file, FALSE);
+	task = processing_pool_find_task (fs->private->processing_pool, file, FALSE);
 
-	if (!data) {
+	if (!task) {
 		gchar *uri;
 
 		uri = g_file_get_uri (file);
@@ -4000,9 +4029,24 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
 		g_free (uri);
 
 		return NULL;
-	}
+	} else {
+		UpdateProcessingTaskContext *ctxt;
+
+		/* We are only storing the URN in the created/updated tasks */
+		ctxt = processing_task_get_context (task);
+		if (!ctxt) {
+			gchar *uri;
+
+			uri = g_file_get_uri (file);
+			g_critical ("File '%s' is being processed, but not as a "
+			            "CREATED/UPDATED task, so cannot get URN",
+			            uri);
+			g_free (uri);
+			return NULL;
+		}
 
-	return data->urn;
+		return ctxt->urn;
+	}
 }
 
 /**
@@ -4055,26 +4099,43 @@ G_CONST_RETURN gchar *
 tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
                                  GFile          *file)
 {
-	ProcessData *data;
+	ProcessingTask *task;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
 	g_return_val_if_fail (G_IS_FILE (file), NULL);
 
-	data = process_data_find (fs, file, FALSE);
+	/* Check if found in currently processed data */
+	task = processing_pool_find_task (fs->private->processing_pool, file, FALSE);
 
-	if (!data) {
+	if (!task) {
 		gchar *uri;
 
 		uri = g_file_get_uri (file);
 
 		g_critical ("File '%s' is not being currently processed, "
-		            "so the URN cannot be retrieved.", uri);
+		            "so the parent URN cannot be retrieved.", uri);
 		g_free (uri);
 
 		return NULL;
-	}
+	} else {
+		UpdateProcessingTaskContext *ctxt;
+
+		/* We are only storing the URN in the created/updated tasks */
+		ctxt = processing_task_get_context (task);
+		if (!ctxt) {
+			gchar *uri;
+
+			uri = g_file_get_uri (file);
+			g_critical ("File '%s' is being processed, but not as a "
+			            "CREATED/UPDATED task, so cannot get parent "
+			            "URN",
+			            uri);
+			g_free (uri);
+			return NULL;
+		}
 
-	return data->parent_urn;
+		return ctxt->parent_urn;
+	}
 }
 
 void



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]