[tracker/miner-priority-queues-0.10: 12/22] tracker-miner-fs: Use TrackerTaskPool for the extraction pool



commit 705ebfd1465761f5fda0a37b072d162ba58e33c8
Author: Carlos Garnacho <carlos lanedo com>
Date:   Mon Jul 4 12:16:31 2011 +0200

    tracker-miner-fs: Use TrackerTaskPool for the extraction pool

 src/libtracker-miner/tracker-miner-fs.c |  148 +++++++++++++++++-------------
 1 files changed, 84 insertions(+), 64 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 7f2b750..5987afe 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -33,6 +33,7 @@
 #include "tracker-thumbnailer.h"
 #include "tracker-miner-fs-processing-pool.h"
 #include "tracker-priority-queue.h"
+#include "tracker-task-pool.h"
 
 /* If defined will print the tree from GNode while running */
 #ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -132,6 +133,7 @@ typedef struct {
 } DirectoryData;
 
 typedef struct {
+	GFile *file;
 	gchar *urn;
 	gchar *parent_urn;
 	GCancellable *cancellable;
@@ -199,6 +201,11 @@ struct _TrackerMinerFSPrivate {
 
 	gdouble         throttle;
 
+	/* Extraction tasks */
+	TrackerTaskPool *task_pool;
+	GList *extraction_tasks;
+
+	/* Sparql insertion tasks */
 	TrackerProcessingPool *processing_pool;
 
 	/* URI mtime cache */
@@ -363,7 +370,7 @@ static void           tracker_miner_fs_directory_add_internal (TrackerMinerFS *f
 static gboolean       miner_fs_has_children_without_parent (TrackerMinerFS *fs,
                                                             GFile          *file);
 
-static void           processing_pool_cancel_foreach          (gpointer        data,
+static void           task_pool_cancel_foreach                (gpointer        data,
                                                                gpointer        user_data);
 
 
@@ -688,7 +695,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	                                                        (GDestroyNotify) g_free,
 	                                                        (GDestroyNotify) NULL);
 
-	/* Create processing pool */
+	/* Create processing pools */
+	priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
 	priv->processing_pool = tracker_processing_pool_new (object,
 	                                                     DEFAULT_WAIT_POOL_LIMIT,
 	                                                     DEFAULT_READY_POOL_LIMIT,
@@ -794,9 +802,12 @@ fs_finalize (GObject *object)
 	tracker_priority_queue_unref (priv->crawled_directories);
 
 	/* Cancel every pending task */
-	tracker_processing_pool_foreach (priv->processing_pool,
-	                                 processing_pool_cancel_foreach,
-	                                 NULL);
+	tracker_task_pool_foreach (priv->task_pool,
+	                           task_pool_cancel_foreach,
+	                           NULL);
+	g_object_unref (priv->task_pool);
+	g_list_free (priv->extraction_tasks);
+
 	tracker_processing_pool_free (priv->processing_pool);
 
 	tracker_priority_queue_foreach (priv->items_moved,
@@ -858,8 +869,8 @@ fs_set_property (GObject      *object,
 		                               g_value_get_double (value));
 		break;
 	case PROP_WAIT_POOL_LIMIT:
-		tracker_processing_pool_set_wait_limit (fs->priv->processing_pool,
-		                                        g_value_get_uint (value));
+		tracker_task_pool_set_limit (fs->priv->task_pool,
+		                             g_value_get_uint (value));
 		break;
 	case PROP_READY_POOL_LIMIT:
 		tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
@@ -897,7 +908,7 @@ fs_get_property (GObject    *object,
 		break;
 	case PROP_WAIT_POOL_LIMIT:
 		g_value_set_uint (value,
-		                  tracker_processing_pool_get_wait_limit (fs->priv->processing_pool));
+		                  tracker_task_pool_get_limit (fs->priv->task_pool));
 		break;
 	case PROP_READY_POOL_LIMIT:
 		g_value_set_uint (value,
@@ -1621,8 +1632,8 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
 }
 
 static gboolean
-do_process_file (TrackerMinerFS        *fs,
-                 TrackerProcessingTask *task)
+do_process_file (TrackerMinerFS *fs,
+                 TrackerTask    *task)
 {
 	TrackerMinerFSPrivate *priv;
 	gboolean processing;
@@ -1631,8 +1642,8 @@ do_process_file (TrackerMinerFS        *fs,
 	GFile *task_file;
 	UpdateProcessingTaskContext *ctxt;
 
-	ctxt = tracker_processing_task_get_context (task);
-	task_file = tracker_processing_task_get_file (task);
+	ctxt = tracker_task_get_data (task);
+	task_file = tracker_task_get_file (task);
 	uri = g_file_get_uri (task_file);
 	priv = fs->priv;
 
@@ -1659,7 +1670,7 @@ do_process_file (TrackerMinerFS        *fs,
 		/* Re-fetch data, since it might have been
 		 * removed in broken implementations
 		 */
-		task = tracker_processing_pool_find_task (priv->processing_pool, task_file, FALSE);
+		task = tracker_task_pool_find (priv->task_pool, task_file);
 
 		g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
 
@@ -1669,8 +1680,9 @@ do_process_file (TrackerMinerFS        *fs,
 			            "tracker_miner_fs_file_notify(), this is an "
 			            "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
 		} else {
-			tracker_processing_pool_remove_task (priv->processing_pool, task);
-			tracker_processing_task_unref (task);
+			tracker_task_pool_remove (priv->task_pool, task);
+			priv->extraction_tasks = g_list_remove (priv->extraction_tasks, task);
+			tracker_task_unref (task);
 		}
 	}
 
@@ -1680,22 +1692,28 @@ do_process_file (TrackerMinerFS        *fs,
 }
 
 static void
-item_add_or_update_cb (TrackerMinerFS        *fs,
-                       TrackerProcessingTask *task,
-                       const GError          *error)
+item_add_or_update_cb (TrackerMinerFS *fs,
+                       TrackerTask    *extraction_task,
+                       const GError   *error)
 {
 	UpdateProcessingTaskContext *ctxt;
+	TrackerProcessingTask *task;
 	GFile *task_file;
 	gchar *uri;
 
-	ctxt = tracker_processing_task_get_context (task);
-	task_file = tracker_processing_task_get_file (task);
+	ctxt = tracker_task_get_data (extraction_task);
+	task_file = tracker_task_get_file (extraction_task);
 	uri = g_file_get_uri (task_file);
 
 	if (error) {
-		TrackerProcessingTask *first_item_task;
+		TrackerTask *first_item_task = NULL;
+		GList *first_task;
 
-		first_item_task = tracker_processing_pool_get_last_wait (fs->priv->processing_pool);
+		first_task = g_list_last (fs->priv->extraction_tasks);
+
+		if (first_task) {
+			first_item_task = first_task->data;
+		}
 
 		/* Perhaps this is too specific to TrackerMinerFiles, if the extractor
 		 * is choking on some file, the miner will get a timeout for all files
@@ -1703,7 +1721,7 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 		 * is the first one that was added to the processing pool, so we retry
 		 * the others.
 		 */
-		if (task != first_item_task &&
+		if (extraction_task != first_item_task &&
 		    (error->code == G_DBUS_ERROR_NO_REPLY ||
 		     error->code == G_DBUS_ERROR_TIMEOUT ||
 		     error->code == G_DBUS_ERROR_TIMED_OUT)) {
@@ -1713,7 +1731,7 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 			g_object_unref (ctxt->builder);
 			ctxt->builder = tracker_sparql_builder_new_update ();
 
-			do_process_file (fs, task);
+			do_process_file (fs, extraction_task);
 			g_free (uri);
 
 			return;
@@ -1724,8 +1742,10 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 
 			if (error->code == G_IO_ERROR_CANCELLED) {
 				/* Cancelled is cancelled, just move along in this case */
-				tracker_processing_pool_remove_task (fs->priv->processing_pool, task);
-				tracker_processing_task_unref (task);
+				tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+				fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
+									    extraction_task);
+				tracker_task_unref (extraction_task);
 
 				item_queue_handlers_set_up (fs);
 				return;
@@ -1733,6 +1753,12 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 		}
 	}
 
+	tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+	fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
+						    extraction_task);
+
+	task = tracker_processing_task_new (task_file);
+
 	if (ctxt->urn) {
 		gboolean attribute_update_only;
 
@@ -1807,6 +1833,8 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 		item_queue_handlers_set_up (fs);
 	}
 
+	tracker_task_unref (extraction_task);
+
 	g_free (uri);
 }
 
@@ -1818,7 +1846,7 @@ item_add_or_update (TrackerMinerFS *fs,
 	TrackerSparqlBuilder *sparql;
 	GCancellable *cancellable;
 	gboolean retval;
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 	GFile *parent;
 	const gchar *urn;
 	const gchar *parent_urn = NULL;
@@ -1871,21 +1899,20 @@ item_add_or_update (TrackerMinerFS *fs,
 
 	/* Create task and add it to the pool as a WAIT task (we need to extract
 	 * the file metadata and such) */
-	task = tracker_processing_task_new (file);
 	ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
 	                                           urn,
 	                                           parent_urn,
 	                                           cancellable,
 	                                           sparql);
-	tracker_processing_task_set_context (task,
-	                                     ctxt,
-	                                     (GFreeFunc) update_processing_task_context_free);
-	tracker_processing_pool_push_wait_task (priv->processing_pool, task);
+	task = tracker_task_new (file, ctxt,
+	                         (GDestroyNotify) update_processing_task_context_free);
+	tracker_task_pool_add (priv->task_pool, task);
+	priv->extraction_tasks = g_list_prepend (priv->extraction_tasks, task);
 
 	if (do_process_file (fs, task)) {
 		fs->priv->total_files_processed++;
 
-		if (tracker_processing_pool_wait_limit_reached (priv->processing_pool)) {
+		if (tracker_task_pool_limit_reached (priv->task_pool)) {
 			retval = FALSE;
 		}
 	}
@@ -2435,9 +2462,7 @@ should_wait (TrackerMinerFS *fs,
 	GFile *parent;
 
 	/* Is the item already being processed? */
-	if (tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                       file,
-	                                       TRUE)) {
+	if (tracker_task_pool_find (fs->priv->task_pool, file)) {
 		/* Yes, a previous event on same item currently
 		 * being processed */
 		return TRUE;
@@ -2446,9 +2471,7 @@ should_wait (TrackerMinerFS *fs,
 	/* Is the item's parent being processed right now? */
 	parent = g_file_get_parent (file);
 	if (parent) {
-		if (tracker_processing_pool_find_task (fs->priv->processing_pool,
-		                                       parent,
-		                                       TRUE)) {
+		if (tracker_task_pool_find (fs->priv->task_pool, parent)) {
 			/* Yes, a previous event on the parent of this item
 			 * currently being processed */
 			g_object_unref (parent);
@@ -2503,7 +2526,7 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
 	    !tracker_priority_queue_is_empty (fs->priv->crawled_directories)) {
 
 		trace_eq ("Created items queue empty, but still crawling (%d tasks in WAIT state)",
-		          tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool));
+		          tracker_task_pool_get_size (fs->priv->task_pool));
 
 		/* The items_created queue is empty, but there are pending
 		 * items from the crawler to be processed. We feed the queue
@@ -2511,7 +2534,7 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
 		 * info is inserted to the store before the children are
 		 * inspected.
 		 */
-		if (tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool) > 0) {
+		if (tracker_task_pool_get_size (fs->priv->task_pool) > 0) {
 			/* Items still being processed */
 			*file = NULL;
 			*source_file = NULL;
@@ -2842,6 +2865,7 @@ item_queue_handlers_cb (gpointer user_data)
 	case QUEUE_NONE:
 		/* Print stats and signal finished */
 		if (!fs->priv->is_crawling &&
+		    tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
 		    tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
 			process_stop (fs);
 		}
@@ -2936,7 +2960,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
 	}
 
 	/* Already sent max number of tasks to tracker-extract? */
-	if (tracker_processing_pool_wait_limit_reached (fs->priv->processing_pool)) {
+	if (tracker_task_pool_limit_reached (fs->priv->task_pool)) {
 		return;
 	}
 
@@ -4275,16 +4299,16 @@ tracker_miner_fs_directory_add (TrackerMinerFS *fs,
 }
 
 static void
-processing_pool_cancel_foreach (gpointer data,
-                                gpointer user_data)
+task_pool_cancel_foreach (gpointer data,
+                          gpointer user_data)
 {
-	TrackerProcessingTask *task = data;
+	TrackerTask *task = data;
 	GFile *file = user_data;
 	GFile *task_file;
 	UpdateProcessingTaskContext *ctxt;
 
-	task_file = tracker_processing_task_get_file (task);
-	ctxt = tracker_processing_task_get_context (task);
+	ctxt = tracker_task_get_data (task);
+	task_file = tracker_task_get_file (task);
 
 	if (ctxt &&
 	    ctxt->cancellable &&
@@ -4324,9 +4348,9 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
 	g_debug ("Removing directory");
 
 	/* Cancel all pending tasks on files inside the path given by file */
-	tracker_processing_pool_foreach (priv->processing_pool,
-	                                 processing_pool_cancel_foreach,
-	                                 file);
+	tracker_task_pool_foreach (priv->task_pool,
+	                           task_pool_cancel_foreach,
+	                           file);
 
 	g_debug ("  Cancelled processing pool tasks at %f\n", g_timer_elapsed (timer, NULL));
 
@@ -4630,16 +4654,14 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
                               GFile          *file,
                               const GError   *error)
 {
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	g_return_if_fail (TRACKER_IS_MINER_FS (fs));
 	g_return_if_fail (G_IS_FILE (file));
 
 	fs->priv->total_files_notified++;
 
-	task = tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                          file,
-	                                          FALSE);
+	task = tracker_task_pool_find (fs->priv->task_pool, file);
 
 	if (!task) {
 		gchar *uri;
@@ -4742,15 +4764,13 @@ G_CONST_RETURN gchar *
 tracker_miner_fs_get_urn (TrackerMinerFS *fs,
                           GFile          *file)
 {
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
 	g_return_val_if_fail (G_IS_FILE (file), NULL);
 
 	/* Check if found in currently processed data */
-	task = tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                          file,
-	                                          FALSE);
+	task = tracker_task_pool_find (fs->priv->task_pool, file);
 
 	if (!task) {
 		gchar *uri;
@@ -4766,7 +4786,8 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
 		UpdateProcessingTaskContext *ctxt;
 
 		/* We are only storing the URN in the created/updated tasks */
-		ctxt = tracker_processing_task_get_context (task);
+		ctxt = tracker_task_get_data (task);
+
 		if (!ctxt) {
 			gchar *uri;
 
@@ -4834,15 +4855,13 @@ G_CONST_RETURN gchar *
 tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
                                  GFile          *file)
 {
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
 	g_return_val_if_fail (G_IS_FILE (file), NULL);
 
 	/* Check if found in currently processed data */
-	task = tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                          file,
-	                                          FALSE);
+	task = tracker_task_pool_find (fs->priv->task_pool, file);
 
 	if (!task) {
 		gchar *uri;
@@ -4858,7 +4877,8 @@ tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
 		UpdateProcessingTaskContext *ctxt;
 
 		/* We are only storing the URN in the created/updated tasks */
-		ctxt = tracker_processing_task_get_context (task);
+		ctxt = tracker_task_get_data (task);
+
 		if (!ctxt) {
 			gchar *uri;
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]