[tracker/wip/miner-priority-queues: 12/19] tracker-miner-fs: Use TrackerTaskPool for the extraction pool



commit 77038048f9c22f34648597420db57de8fee2f41f
Author: Carlos Garnacho <carlos lanedo com>
Date:   Mon Jul 4 12:16:31 2011 +0200

    tracker-miner-fs: Use TrackerTaskPool for the extraction pool

 src/libtracker-miner/tracker-miner-fs.c |  124 ++++++++++++++++---------------
 1 files changed, 63 insertions(+), 61 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 66754fd..dff0f5e 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -33,6 +33,7 @@
 #include "tracker-thumbnailer.h"
 #include "tracker-miner-fs-processing-pool.h"
 #include "tracker-priority-queue.h"
+#include "tracker-task-pool.h"
 
 /* If defined will print the tree from GNode while running */
 #ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -132,6 +133,7 @@ typedef struct {
 } DirectoryData;
 
 typedef struct {
+	GFile *file;
 	gchar *urn;
 	gchar *parent_urn;
 	GCancellable *cancellable;
@@ -199,6 +201,10 @@ struct _TrackerMinerFSPrivate {
 
 	gdouble         throttle;
 
+	/* Extraction tasks */
+	TrackerTaskPool *task_pool;
+
+	/* Sparql insertion tasks */
 	TrackerProcessingPool *processing_pool;
 
 	/* URI mtime cache */
@@ -363,7 +369,7 @@ static void           tracker_miner_fs_directory_add_internal (TrackerMinerFS *f
 static gboolean       miner_fs_has_children_without_parent (TrackerMinerFS *fs,
                                                             GFile          *file);
 
-static void           processing_pool_cancel_foreach          (gpointer        data,
+static void           task_pool_cancel_foreach                (gpointer        data,
                                                                gpointer        user_data);
 
 
@@ -688,7 +694,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	                                                        (GDestroyNotify) g_free,
 	                                                        (GDestroyNotify) NULL);
 
-	/* Create processing pool */
+	/* Create processing pools */
+	priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
 	priv->processing_pool = tracker_processing_pool_new (object,
 	                                                     DEFAULT_WAIT_POOL_LIMIT,
 	                                                     DEFAULT_READY_POOL_LIMIT,
@@ -794,9 +801,11 @@ fs_finalize (GObject *object)
 	tracker_priority_queue_unref (priv->crawled_directories);
 
 	/* Cancel every pending task */
-	tracker_processing_pool_foreach (priv->processing_pool,
-	                                 processing_pool_cancel_foreach,
-	                                 NULL);
+	tracker_task_pool_foreach (priv->task_pool,
+	                           task_pool_cancel_foreach,
+	                           NULL);
+	g_object_unref (priv->task_pool);
+
 	tracker_processing_pool_free (priv->processing_pool);
 
 	tracker_priority_queue_foreach (priv->items_moved,
@@ -858,8 +867,8 @@ fs_set_property (GObject      *object,
 		                               g_value_get_double (value));
 		break;
 	case PROP_WAIT_POOL_LIMIT:
-		tracker_processing_pool_set_wait_limit (fs->priv->processing_pool,
-		                                        g_value_get_uint (value));
+		tracker_task_pool_set_limit (fs->priv->task_pool,
+		                             g_value_get_uint (value));
 		break;
 	case PROP_READY_POOL_LIMIT:
 		tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
@@ -897,7 +906,7 @@ fs_get_property (GObject    *object,
 		break;
 	case PROP_WAIT_POOL_LIMIT:
 		g_value_set_uint (value,
-		                  tracker_processing_pool_get_wait_limit (fs->priv->processing_pool));
+		                  tracker_task_pool_get_limit (fs->priv->task_pool));
 		break;
 	case PROP_READY_POOL_LIMIT:
 		g_value_set_uint (value,
@@ -1624,8 +1633,8 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
 }
 
 static gboolean
-do_process_file (TrackerMinerFS        *fs,
-                 TrackerProcessingTask *task)
+do_process_file (TrackerMinerFS *fs,
+                 TrackerTask    *task)
 {
 	TrackerMinerFSPrivate *priv;
 	gboolean processing;
@@ -1634,8 +1643,8 @@ do_process_file (TrackerMinerFS        *fs,
 	GFile *task_file;
 	UpdateProcessingTaskContext *ctxt;
 
-	ctxt = tracker_processing_task_get_context (task);
-	task_file = tracker_processing_task_get_file (task);
+	ctxt = tracker_task_get_data (task);
+	task_file = tracker_task_get_file (task);
 	uri = g_file_get_uri (task_file);
 	priv = fs->priv;
 
@@ -1662,7 +1671,7 @@ do_process_file (TrackerMinerFS        *fs,
 		/* Re-fetch data, since it might have been
 		 * removed in broken implementations
 		 */
-		task = tracker_processing_pool_find_task (priv->processing_pool, task_file, FALSE);
+		task = tracker_task_pool_find (priv->task_pool, task_file);
 
 		g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
 
@@ -1672,8 +1681,8 @@ do_process_file (TrackerMinerFS        *fs,
 			            "tracker_miner_fs_file_notify(), this is an "
 			            "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
 		} else {
-			tracker_processing_pool_remove_task (priv->processing_pool, task);
-			tracker_processing_task_unref (task);
+			tracker_task_pool_remove (priv->task_pool, task);
+			tracker_task_unref (task);
 		}
 	}
 
@@ -1683,26 +1692,26 @@ do_process_file (TrackerMinerFS        *fs,
 }
 
 static void
-item_add_or_update_cb (TrackerMinerFS        *fs,
-                       TrackerProcessingTask *task,
-                       const GError          *error)
+item_add_or_update_cb (TrackerMinerFS *fs,
+                       TrackerTask    *extraction_task,
+                       const GError   *error)
 {
 	UpdateProcessingTaskContext *ctxt;
+	TrackerProcessingTask *task;
 	GFile *task_file;
 	gchar *uri;
 
-	ctxt = tracker_processing_task_get_context (task);
-	task_file = tracker_processing_task_get_file (task);
+	ctxt = tracker_task_get_data (extraction_task);
+	task_file = tracker_task_get_file (extraction_task);
 	uri = g_file_get_uri (task_file);
 
+	tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+
 	if (error) {
 		g_message ("Could not process '%s': %s", uri, error->message);
 
 		fs->priv->total_files_notified_error++;
 
-		tracker_processing_pool_remove_task (fs->priv->processing_pool, task);
-		tracker_processing_task_unref (task);
-
 		item_queue_handlers_set_up (fs);
 	} else {
 		if (ctxt->urn) {
@@ -1770,6 +1779,8 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 		}
 	}
 
+	tracker_task_unref (extraction_task);
+
 	g_free (uri);
 }
 
@@ -1781,7 +1792,7 @@ item_add_or_update (TrackerMinerFS *fs,
 	TrackerSparqlBuilder *sparql;
 	GCancellable *cancellable;
 	gboolean retval;
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 	GFile *parent;
 	const gchar *urn;
 	const gchar *parent_urn = NULL;
@@ -1834,21 +1845,19 @@ item_add_or_update (TrackerMinerFS *fs,
 
 	/* Create task and add it to the pool as a WAIT task (we need to extract
 	 * the file metadata and such) */
-	task = tracker_processing_task_new (file);
 	ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
 	                                           urn,
 	                                           parent_urn,
 	                                           cancellable,
 	                                           sparql);
-	tracker_processing_task_set_context (task,
-	                                     ctxt,
-	                                     (GFreeFunc) update_processing_task_context_free);
-	tracker_processing_pool_push_wait_task (priv->processing_pool, task);
+	task = tracker_task_new (file, ctxt,
+	                         (GDestroyNotify) update_processing_task_context_free);
+	tracker_task_pool_add (priv->task_pool, task);
 
 	if (do_process_file (fs, task)) {
 		fs->priv->total_files_processed++;
 
-		if (tracker_processing_pool_wait_limit_reached (priv->processing_pool)) {
+		if (tracker_task_pool_limit_reached (priv->task_pool)) {
 			retval = FALSE;
 		}
 	}
@@ -2398,9 +2407,7 @@ should_wait (TrackerMinerFS *fs,
 	GFile *parent;
 
 	/* Is the item already being processed? */
-	if (tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                       file,
-	                                       TRUE)) {
+	if (tracker_task_pool_find (fs->priv->task_pool, file)) {
 		/* Yes, a previous event on same item currently
 		 * being processed */
 		return TRUE;
@@ -2409,9 +2416,7 @@ should_wait (TrackerMinerFS *fs,
 	/* Is the item's parent being processed right now? */
 	parent = g_file_get_parent (file);
 	if (parent) {
-		if (tracker_processing_pool_find_task (fs->priv->processing_pool,
-		                                       parent,
-		                                       TRUE)) {
+		if (tracker_task_pool_find (fs->priv->task_pool, parent)) {
 			/* Yes, a previous event on the parent of this item
 			 * currently being processed */
 			g_object_unref (parent);
@@ -2466,7 +2471,7 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
 	    !tracker_priority_queue_is_empty (fs->priv->crawled_directories)) {
 
 		trace_eq ("Created items queue empty, but still crawling (%d tasks in WAIT state)",
-		          tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool));
+		          tracker_task_pool_get_size (fs->priv->task_pool));
 
 		/* The items_created queue is empty, but there are pending
 		 * items from the crawler to be processed. We feed the queue
@@ -2474,7 +2479,7 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
 		 * info is inserted to the store before the children are
 		 * inspected.
 		 */
-		if (tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool) > 0) {
+		if (tracker_task_pool_get_size (fs->priv->task_pool) > 0) {
 			/* Items still being processed */
 			*file = NULL;
 			*source_file = NULL;
@@ -2813,6 +2818,7 @@ item_queue_handlers_cb (gpointer user_data)
 	case QUEUE_NONE:
 		/* Print stats and signal finished */
 		if (!fs->priv->is_crawling &&
+		    tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
 		    tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
 			process_stop (fs);
 		}
@@ -2907,7 +2913,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
 	}
 
 	/* Already sent max number of tasks to tracker-extract? */
-	if (tracker_processing_pool_wait_limit_reached (fs->priv->processing_pool)) {
+	if (tracker_task_pool_limit_reached (fs->priv->task_pool)) {
 		return;
 	}
 
@@ -4249,16 +4255,16 @@ tracker_miner_fs_directory_add (TrackerMinerFS *fs,
 }
 
 static void
-processing_pool_cancel_foreach (gpointer data,
-                                gpointer user_data)
+task_pool_cancel_foreach (gpointer data,
+                          gpointer user_data)
 {
-	TrackerProcessingTask *task = data;
+	TrackerTask *task = data;
 	GFile *file = user_data;
 	GFile *task_file;
 	UpdateProcessingTaskContext *ctxt;
 
-	task_file = tracker_processing_task_get_file (task);
-	ctxt = tracker_processing_task_get_context (task);
+	ctxt = tracker_task_get_data (task);
+	task_file = tracker_task_get_file (task);
 
 	if (ctxt &&
 	    ctxt->cancellable &&
@@ -4298,9 +4304,9 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
 	g_debug ("Removing directory");
 
 	/* Cancel all pending tasks on files inside the path given by file */
-	tracker_processing_pool_foreach (priv->processing_pool,
-	                                 processing_pool_cancel_foreach,
-	                                 file);
+	tracker_task_pool_foreach (priv->task_pool,
+	                           task_pool_cancel_foreach,
+	                           file);
 
 	g_debug ("  Cancelled processing pool tasks at %f\n", g_timer_elapsed (timer, NULL));
 
@@ -4604,16 +4610,14 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
                               GFile          *file,
                               const GError   *error)
 {
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	g_return_if_fail (TRACKER_IS_MINER_FS (fs));
 	g_return_if_fail (G_IS_FILE (file));
 
 	fs->priv->total_files_notified++;
 
-	task = tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                          file,
-	                                          FALSE);
+	task = tracker_task_pool_find (fs->priv->task_pool, file);
 
 	if (!task) {
 		gchar *uri;
@@ -4716,15 +4720,13 @@ G_CONST_RETURN gchar *
 tracker_miner_fs_get_urn (TrackerMinerFS *fs,
                           GFile          *file)
 {
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
 	g_return_val_if_fail (G_IS_FILE (file), NULL);
 
 	/* Check if found in currently processed data */
-	task = tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                          file,
-	                                          FALSE);
+	task = tracker_task_pool_find (fs->priv->task_pool, file);
 
 	if (!task) {
 		gchar *uri;
@@ -4740,7 +4742,8 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
 		UpdateProcessingTaskContext *ctxt;
 
 		/* We are only storing the URN in the created/updated tasks */
-		ctxt = tracker_processing_task_get_context (task);
+		ctxt = tracker_task_get_data (task);
+
 		if (!ctxt) {
 			gchar *uri;
 
@@ -4808,15 +4811,13 @@ G_CONST_RETURN gchar *
 tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
                                  GFile          *file)
 {
-	TrackerProcessingTask *task;
+	TrackerTask *task;
 
 	g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
 	g_return_val_if_fail (G_IS_FILE (file), NULL);
 
 	/* Check if found in currently processed data */
-	task = tracker_processing_pool_find_task (fs->priv->processing_pool,
-	                                          file,
-	                                          FALSE);
+	task = tracker_task_pool_find (fs->priv->task_pool, file);
 
 	if (!task) {
 		gchar *uri;
@@ -4832,7 +4833,8 @@ tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
 		UpdateProcessingTaskContext *ctxt;
 
 		/* We are only storing the URN in the created/updated tasks */
-		ctxt = tracker_processing_task_get_context (task);
+		ctxt = tracker_task_get_data (task);
+
 		if (!ctxt) {
 			gchar *uri;
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]