[tracker] TrackerMinerFS: make it possible to process files in parallel.



commit 5d2e503a523978055eb5bec2ee7b25187c48b42c
Author: Carlos Garnacho <carlos lanedo com>
Date:   Wed Oct 7 17:35:55 2009 +0200

    TrackerMinerFS: make it possible to process files in parallel.
    
    There is now a pool of files being processed, whose size is capped by
    the TrackerMinerFS:process-pool-limit property (default value of 1),
    i.e. the maximum number of files that can be processed at the same
    time. The code flow has changed so that no new files are processed
    until there is room in the pool.

 src/libtracker-miner/tracker-miner-fs.c |  149 ++++++++++++++++++++++--------
 1 files changed, 109 insertions(+), 40 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 5731425..a5a009e 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -51,6 +51,11 @@ typedef struct {
 	gboolean  recurse;
 } DirectoryData;
 
+typedef struct { /* one entry of TrackerMinerFSPrivate.processing_pool */
+	GFile *file; /* file being processed; process_data_new() takes a reference */
+	GCancellable *cancellable; /* cancellable passed to process_file for this file; also referenced */
+} ProcessData;
+
 struct TrackerMinerFSPrivate {
 	TrackerMonitor *monitor;
 	TrackerCrawler *crawler;
@@ -71,11 +76,11 @@ struct TrackerMinerFSPrivate {
 	guint           crawl_directories_id;
 	guint		item_queues_handler_id;
 
-	GFile          *current_file;
-	GCancellable   *cancellable;
-
 	gdouble         throttle;
 
+	GList          *processing_pool;
+	guint           pool_limit;
+
 	/* Status */
 	guint           been_started : 1;
 	guint           been_crawled : 1;
@@ -114,7 +119,8 @@ enum {
 
 enum {
 	PROP_0,
-	PROP_THROTTLE
+	PROP_THROTTLE,
+	PROP_POOL_LIMIT
 };
 
 static void           fs_finalize                  (GObject        *object);
@@ -217,6 +223,13 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
 							      "Modifier for the indexing speed, 0 is max speed",
 							      0, 1, 0,
 							      G_PARAM_READWRITE));
+	g_object_class_install_property (object_class,
+					 PROP_POOL_LIMIT,
+					 g_param_spec_uint ("process-pool-limit",
+							    "Processing pool limit",
+							    "Number of files that can be concurrently processed",
+							    1, G_MAXUINT, 1,
+							    G_PARAM_READWRITE));
 	/**
 	 * TrackerMinerFS::check-file:
 	 * @miner_fs: the #TrackerMinerFS
@@ -382,6 +395,44 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	priv->quark_ignore_file = g_quark_from_static_string ("tracker-ignore-file");
 }
 
+/* Creates a pool entry that takes its own reference on @file and @cancellable */
+static ProcessData *
+process_data_new (GFile        *file,
+		  GCancellable *cancellable)
+{
+	ProcessData *entry = g_slice_new (ProcessData);
+
+	entry->file = g_object_ref (file);
+	entry->cancellable = g_object_ref (cancellable);
+
+	return entry;
+}
+
+static void
+process_data_free (ProcessData *data) /* releases a pool entry; @data must not be NULL */
+{
+	g_object_unref (data->file); /* drop the reference taken by process_data_new() */
+	g_object_unref (data->cancellable); /* likewise for the cancellable */
+	g_slice_free (ProcessData, data); /* pairs with g_slice_new() in process_data_new() */
+}
+
+/* Returns the processing pool entry whose file equals @file, or NULL */
+static ProcessData *
+process_data_find (TrackerMinerFS *fs,
+		   GFile          *file)
+{
+	GList *node;
+
+	for (node = fs->private->processing_pool; node; node = node->next) {
+		ProcessData *candidate = node->data;
+
+		if (g_file_equal (candidate->file, file)) {
+			return candidate;
+		}
+	}
+	return NULL;
+}
+
 static void
 fs_finalize (GObject *object)
 {
@@ -403,15 +454,14 @@ fs_finalize (GObject *object)
 	g_object_unref (priv->crawler);
 	g_object_unref (priv->monitor);
 
-	if (priv->cancellable) {
-		g_object_unref (priv->cancellable);
-	}
-
 	if (priv->directories) {
 		g_list_foreach (priv->directories, (GFunc) directory_data_free, NULL);
 		g_list_free (priv->directories);
 	}
 
+	g_list_foreach (priv->processing_pool, (GFunc) process_data_free, NULL);
+	g_list_free (priv->processing_pool);
+
 	g_queue_foreach (priv->items_moved, (GFunc) item_moved_data_free, NULL);
 	g_queue_free (priv->items_moved);
 
@@ -433,11 +483,16 @@ fs_set_property (GObject      *object,
 		 const GValue *value,
 		 GParamSpec   *pspec)
 {
+	TrackerMinerFS *fs = TRACKER_MINER_FS (object);
+
 	switch (prop_id) {
 	case PROP_THROTTLE:
 		tracker_miner_fs_set_throttle (TRACKER_MINER_FS (object),
 					       g_value_get_double (value));
 		break;
+	case PROP_POOL_LIMIT:
+		fs->private->pool_limit = g_value_get_uint (value);
+		break;
 	default:
 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 		break;
@@ -458,6 +513,9 @@ fs_get_property (GObject    *object,
 	case PROP_THROTTLE:
 		g_value_set_double (value, fs->private->throttle);
 		break;
+	case PROP_POOL_LIMIT:
+		g_value_set_uint (value, fs->private->pool_limit);
+		break;
 	default:
 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 		break;
@@ -655,6 +713,7 @@ item_add_or_update_cb (TrackerMinerFS       *fs,
 		       const GError         *error,
 		       gpointer              user_data)
 {
+	ProcessData *data;
 	gchar *uri;
 
 	uri = g_file_get_uri (file);
@@ -680,14 +739,12 @@ item_add_or_update_cb (TrackerMinerFS       *fs,
 		}
 	}
 
-	if (fs->private->cancellable) {
-		g_object_unref (fs->private->cancellable);
-		fs->private->cancellable = NULL;
-	}
+	data = process_data_find (fs, file);
 
-	if (fs->private->current_file) {
-		g_object_unref (fs->private->current_file);
-		fs->private->current_file = NULL;
+	if (data) {
+		fs->private->processing_pool =
+			g_list_remove (fs->private->processing_pool, data);
+		process_data_free (data); /* free only once the list no longer refers to it */
 	}
 
 	/* Can be NULL on error */
@@ -697,41 +754,51 @@ item_add_or_update_cb (TrackerMinerFS       *fs,
 
 	g_free (uri);
 
-	/* Processing is now done, continue with other files */
-	item_queue_handlers_set_up (fs);
+	if (g_list_length (fs->private->processing_pool) < fs->private->pool_limit) {
+		/* There is room in the pool for more files */
+		item_queue_handlers_set_up (fs);
+	}
 }
 
 static gboolean
 item_add_or_update (TrackerMinerFS *fs,
 		    GFile          *file)
 {
+	TrackerMinerFSPrivate *priv;
 	TrackerSparqlBuilder *sparql;
+	GCancellable *cancellable;
 	gboolean processing;
 
-	if (fs->private->cancellable) {
-		g_debug ("Cancellable for older operation still around, destroying");
-		g_object_unref (fs->private->cancellable);
-	}
-
-	fs->private->cancellable = g_cancellable_new ();
+	priv = fs->private;
+	cancellable = g_cancellable_new ();
 	sparql = tracker_sparql_builder_new_update ();
 
 	processing = TRACKER_MINER_FS_GET_CLASS (fs)->process_file (fs, file, sparql,
-								    fs->private->cancellable,
+								    cancellable,
 								    item_add_or_update_cb,
 								    NULL);
 
 	if (!processing) {
 		g_object_unref (sparql);
-		g_object_unref (fs->private->cancellable);
-		fs->private->cancellable = NULL;
+		g_object_unref (cancellable);
 
 		return TRUE;
 	} else {
-		fs->private->current_file = g_object_ref (file);
-	}
+		ProcessData *data;
+		guint length;
 
-	return FALSE;
+		data = process_data_new (file, cancellable);
+		priv->processing_pool = g_list_prepend (priv->processing_pool, data);
+		length = g_list_length (priv->processing_pool);
+
+		g_object_unref (cancellable);
+
+		if (length >= priv->pool_limit) {
+			return FALSE;
+		} else {
+			return TRUE;
+		}
+	}
 }
 
 static gboolean
@@ -1015,7 +1082,8 @@ item_queue_handlers_cb (gpointer user_data)
 	switch (queue) {
 	case QUEUE_NONE:
 		/* Print stats and signal finished */
-		if (!fs->private->is_crawling) {
+		if (!fs->private->is_crawling &&
+		    !fs->private->processing_pool) {
 			process_stop (fs);
 		}
 
@@ -1492,7 +1560,7 @@ crawler_finished_cb (TrackerCrawler *crawler,
 	fs->private->total_files_ignored += files_ignored;
 
 	g_message ("%s crawling files after %2.2f seconds",
-		   was_interrupted ? "Stoped" : "Finished",
+		   was_interrupted ? "Stopped" : "Finished",
 		   g_timer_elapsed (fs->private->timer, NULL));
 	g_message ("  Found %d directories, ignored %d directories",
 		   directories_found,
@@ -1671,7 +1739,7 @@ tracker_miner_fs_remove_directory (TrackerMinerFS *fs,
 {
 	TrackerMinerFSPrivate *priv;
 	gboolean return_val = FALSE;
-	GList *dirs;
+	GList *dirs, *pool;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), FALSE);
 	g_return_val_if_fail (G_IS_FILE (file), FALSE);
@@ -1713,14 +1781,15 @@ tracker_miner_fs_remove_directory (TrackerMinerFS *fs,
 	check_files_removal (priv->items_updated, file);
 	check_files_removal (priv->items_created, file);
 
-	if (priv->current_file &&
-	    priv->cancellable &&
-	    (g_file_equal (priv->current_file, file) ||
-	     g_file_has_prefix (priv->current_file, file))) {
-		/* Cancel processing if currently processed file is
-		 * inside the removed directory.
-		 */
-		g_cancellable_cancel (priv->cancellable);
+	/* Cancel processing of every pooled file that is the removed
+	 * directory itself or is located inside it. */
+	for (pool = fs->private->processing_pool; pool; pool = pool->next) {
+		ProcessData *data = pool->data;
+
+		if (g_file_equal (data->file, file) ||
+		    g_file_has_prefix (data->file, file)) {
+			g_cancellable_cancel (data->cancellable);
+		}
 	}
 
 	return return_val;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]