[tracker/miner-fs-refactor] libtracker-miner: Use different limits for WAIT and PROCESS tasks



commit aa69ae0667c99de5c404c4f6021de79f7d980be6
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Thu Oct 14 12:01:53 2010 +0200

    libtracker-miner: Use different limits for WAIT and PROCESS tasks

 .../tracker-miner-fs-processing-pool.c             |  182 +++++++++++++-------
 .../tracker-miner-fs-processing-pool.h             |   53 +++---
 src/libtracker-miner/tracker-miner-fs.c            |   55 ++++--
 src/miners/fs/tracker-miner-files.c                |    5 +-
 4 files changed, 191 insertions(+), 104 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index a78599b..3a2543c 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -24,8 +24,9 @@
 
 typedef enum {
 	PROCESSING_TASK_STATUS_NO_POOL,
-	PROCESSING_TASK_STATUS_WAIT,
+	PROCESSING_TASK_STATUS_WAIT = 0, /* NOTE(review): NO_POOL above is also 0, so both states compare equal - confirm this is intended */
 	PROCESSING_TASK_STATUS_PROCESS,
+	PROCESSING_TASK_STATUS_LAST
 } ProcessingTaskStatus;
 
 struct _ProcessingTask {
@@ -117,10 +118,10 @@ struct _ProcessingPool {
 	/* Connection to the Store */
 	TrackerSparqlConnection *connection;
 
-	/* The tasks currently being processed */
-	GQueue *tasks;
-	/* The processing pool limit */
-	guint  limit;
+	/* The tasks currently in WAIT or PROCESS status */
+	GQueue *tasks[PROCESSING_TASK_STATUS_LAST];
+	/* The processing pool limits */
+	guint  limit[PROCESSING_TASK_STATUS_LAST];
 };
 
 static void
@@ -133,15 +134,21 @@ pool_queue_free_foreach (gpointer data,
 void
 processing_pool_free (ProcessingPool *pool)
 {
+	guint i;
+
 	if (!pool)
 		return;
 
 	/* Free any pending task here... shouldn't really
 	 * be any */
-	g_queue_foreach (pool->tasks,
-	                 pool_queue_free_foreach,
-	                 NULL);
-	g_queue_free (pool->tasks);
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_LAST;
+	     i++) {
+		g_queue_foreach (pool->tasks[i],
+		                 pool_queue_free_foreach,
+		                 NULL);
+		g_queue_free (pool->tasks[i]);
+	}
 
 	g_object_unref (pool->connection);
 	g_free (pool);
@@ -149,33 +156,71 @@ processing_pool_free (ProcessingPool *pool)
 
 ProcessingPool *
 processing_pool_new (TrackerSparqlConnection *connection,
-                     guint                    limit)
+                     guint                    limit_wait,
+                     guint                    limit_process)
 {
 	ProcessingPool *pool;
 
 	pool = g_new0 (ProcessingPool, 1);
 
 	pool->connection = g_object_ref (connection);
-	pool->limit = limit;
-	pool->tasks = g_queue_new ();
+	pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit_wait;
+	pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit_process;
+
+	pool->tasks[PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
+	pool->tasks[PROCESSING_TASK_STATUS_PROCESS] = g_queue_new ();
 
-	g_debug ("Processing pool created with a limit of %u tasks", limit);
+	g_debug ("Processing pool created with a limit of "
+	         "%u tasks in WAIT status and "
+	         "%u tasks in PROCESS status",
+	         limit_wait,
+	         limit_process);
 
 	return pool;
 }
 
 void
-processing_pool_set_limit (ProcessingPool *pool,
-                           guint           limit)
+processing_pool_set_wait_limit (ProcessingPool *pool,
+                                guint           limit)
 {
-	g_message ("Processing pool limit is set to %u", limit);
-	pool->limit = limit;
+	g_message ("Processing pool limit for WAIT tasks set to %u", limit);
+	pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit;
+}
+
+void
+processing_pool_set_process_limit (ProcessingPool *pool,
+                                   guint           limit)
+{
+	g_message ("Processing pool limit for PROCESS tasks set to %u", limit);
+	pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit;
 }
 
 guint
-processing_pool_get_limit (ProcessingPool *pool)
+processing_pool_get_wait_limit (ProcessingPool *pool)
 {
-	return pool->limit;
+	return pool->limit[PROCESSING_TASK_STATUS_WAIT];
+}
+
+guint
+processing_pool_get_process_limit (ProcessingPool *pool)
+{
+	return pool->limit[PROCESSING_TASK_STATUS_PROCESS];
+}
+
+gboolean
+processing_pool_wait_limit_reached (ProcessingPool *pool)
+{
+	return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_WAIT]) >=
+	         pool->limit[PROCESSING_TASK_STATUS_WAIT]) ?
+	        TRUE : FALSE);
+}
+
+gboolean
+processing_pool_process_limit_reached (ProcessingPool *pool)
+{
+	return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]) >=
+	         pool->limit[PROCESSING_TASK_STATUS_PROCESS]) ?
+	        TRUE : FALSE);
 }
 
 ProcessingTask *
@@ -183,28 +228,34 @@ processing_pool_find_task (ProcessingPool *pool,
                            GFile          *file,
                            gboolean        path_search)
 {
-	GList *l;
-
-	for (l = pool->tasks->head; l; l = g_list_next (l)) {
-		ProcessingTask *task = l->data;
-
-		if (!path_search) {
-			/* Different operations for the same file URI could be
-			 * piled up here, each being a different GFile object.
-			 * Miner implementations should really notify on the
-			 * same GFile object that's being passed, so we check for
-			 * pointer equality here, rather than doing path comparisons
-			 */
-			if(task->file == file)
-				return task;
-		} else {
-			/* Note that if there are different GFiles being
-			 * processed for the same file path, we are actually
-			 * returning the first one found, If you want exactly
-			 * the same GFile as the one as input, use the
-			 * process_data_find() method instead */
-			if (g_file_equal (task->file, file))
-				return task;
+	guint i;
+
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_LAST; /* search the PROCESS queue too, not only WAIT */
+	     i++) {
+		GList *l;
+
+		for (l = pool->tasks[i]->head; l; l = g_list_next (l)) {
+			ProcessingTask *task = l->data;
+
+			if (!path_search) {
+				/* Different operations for the same file URI could be
+				 * piled up here, each being a different GFile object.
+				 * Miner implementations should really notify on the
+				 * same GFile object that's being passed, so we check for
+				 * pointer equality here, rather than doing path comparisons
+				 */
+				if(task->file == file)
+					return task;
+			} else {
+				/* Note that if there are different GFiles being
+				 * processed for the same file path, we are actually
+				 * returning the first one found. If you want exactly
+				 * the same GFile as the one given as input, use the
+				 * process_data_find() method instead */
+				if (g_file_equal (task->file, file))
+					return task;
+			}
 		}
 	}
 
@@ -213,13 +264,6 @@ processing_pool_find_task (ProcessingPool *pool,
 }
 
 gboolean
-processing_pool_limit_reached (ProcessingPool *pool)
-{
-	return (g_queue_get_length (pool->tasks) >= pool->limit ?
-	        TRUE : FALSE);
-}
-
-gboolean
 processing_pool_wait_task (ProcessingPool *pool,
                            ProcessingTask *task)
 {
@@ -230,10 +274,10 @@ processing_pool_wait_task (ProcessingPool *pool,
 
 	/* Push a new task in WAIT status (so just add it to the tasks queue,
 	 * and don't process it. */
-	g_queue_push_head (pool->tasks, task);
+	g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
 	task->pool = pool;
 
-	return (!processing_pool_limit_reached (pool));
+	return (!processing_pool_wait_limit_reached (pool));
 }
 
 static void
@@ -276,14 +320,17 @@ processing_pool_process_task (ProcessingPool                     *pool,
 	g_assert (task->sparql != NULL);
 
 	/* First, check if the task was already added as being WAITING */
-	previous = g_queue_find (pool->tasks, task);
+	previous = g_queue_find (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
 	if (!previous) {
-		/* Add it to the queue */
-		g_queue_push_head (pool->tasks, task);
+		/* Add it to the PROCESS queue */
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
 		task->pool = pool;
 	} else {
 		/* Make sure it was a WAIT task */
 		g_assert (task->status == PROCESSING_TASK_STATUS_WAIT);
+		/* Move task from WAIT queue to PROCESS queue */
+		g_queue_delete_link (pool->tasks[PROCESSING_TASK_STATUS_WAIT], previous);
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
 	}
 
 	/* Set status of the task as PROCESS */
@@ -300,7 +347,7 @@ processing_pool_process_task (ProcessingPool                     *pool,
 	                                        sparql_update_cb,
 	                                        task);
 
-	return (!processing_pool_limit_reached (pool));
+	return (!processing_pool_process_limit_reached (pool));
 }
 
 void
@@ -313,25 +360,32 @@ processing_pool_remove_task (ProcessingPool *pool,
 	g_assert (pool == task->pool);
 
 	/* Make sure the task was in the pool */
-	in_pool = g_queue_find (pool->tasks, task);
+	in_pool = g_queue_find (pool->tasks[task->status], task);
 	g_assert (in_pool != NULL);
 
-	g_queue_delete_link (pool->tasks, in_pool);
+	g_queue_delete_link (pool->tasks[task->status], in_pool);
 	task->pool = NULL;
+	task->status = PROCESSING_TASK_STATUS_NO_POOL;
+}
+
+guint
+processing_pool_get_wait_task_count (ProcessingPool *pool)
+{
+	return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_WAIT]);
 }
 
 guint
-processing_pool_get_task_count (ProcessingPool *pool)
+processing_pool_get_process_task_count (ProcessingPool *pool)
 {
-	return g_queue_get_length (pool->tasks);
+	return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]);
 }
 
 ProcessingTask *
-processing_pool_get_last_wait  (ProcessingPool *pool)
+processing_pool_get_last_wait (ProcessingPool *pool)
 {
 	GList *li;
 
-	for (li = pool->tasks->tail; li; li = g_list_previous (li)) {
+	for (li = pool->tasks[PROCESSING_TASK_STATUS_WAIT]->tail; li; li = g_list_previous (li)) {
 		ProcessingTask *task = li->data;
 
 		if (task->status == PROCESSING_TASK_STATUS_WAIT) {
@@ -346,5 +400,11 @@ processing_pool_foreach (ProcessingPool *pool,
                          GFunc           func,
                          gpointer        user_data)
 {
-	g_queue_foreach (pool->tasks, func, user_data);
+	guint i;
+
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_LAST; /* visit the PROCESS queue too, not only WAIT */
+	     i++) {
+		g_queue_foreach (pool->tasks[i], func, user_data);
+	}
 }
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index ab2275a..e2274ae 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -47,31 +47,36 @@ void            processing_task_set_sparql  (ProcessingTask *task,
                                              gchar          *sparql);
 
 
-ProcessingPool *processing_pool_new           (TrackerSparqlConnection *connection,
-                                               guint                    limit);
-void            processing_pool_free          (ProcessingPool          *pool);
-void            processing_pool_set_limit     (ProcessingPool          *pool,
-                                               guint                    limit);
-guint           processing_pool_get_limit     (ProcessingPool          *pool);
-ProcessingTask *processing_pool_find_task     (ProcessingPool          *pool,
-                                               GFile                   *file,
-                                               gboolean                 path_search);
-gboolean        processing_pool_limit_reached (ProcessingPool          *pool);
-void            processing_pool_remove_task   (ProcessingPool          *pool,
-                                               ProcessingTask          *task);
-gboolean        processing_pool_wait_task     (ProcessingPool          *pool,
-                                               ProcessingTask          *task);
-gboolean        processing_pool_process_task  (ProcessingPool          *pool,
-                                               ProcessingTask          *task,
-                                               ProcessingPoolTaskFinishedCallback  finished_handler,
-                                               gpointer                 user_data);
+ProcessingPool *processing_pool_new                   (TrackerSparqlConnection *connection,
+                                                       guint                    limit_wait,
+                                                       guint                    limit_process);
+void            processing_pool_free                  (ProcessingPool          *pool);
+void            processing_pool_set_wait_limit        (ProcessingPool          *pool,
+                                                       guint                    limit);
+void            processing_pool_set_process_limit     (ProcessingPool          *pool,
+                                                       guint                    limit);
+guint           processing_pool_get_wait_limit        (ProcessingPool          *pool);
+guint           processing_pool_get_process_limit     (ProcessingPool          *pool);
+ProcessingTask *processing_pool_find_task             (ProcessingPool          *pool,
+                                                       GFile                   *file,
+                                                       gboolean                 path_search);
+gboolean        processing_pool_wait_limit_reached    (ProcessingPool          *pool);
+gboolean        processing_pool_process_limit_reached (ProcessingPool          *pool);
 
-guint           processing_pool_get_task_count (ProcessingPool         *pool);
-ProcessingTask *processing_pool_get_last_wait  (ProcessingPool         *pool);
-
-void            processing_pool_foreach        (ProcessingPool         *pool,
-                                                GFunc                   func,
-                                                gpointer                user_data);
+void            processing_pool_remove_task           (ProcessingPool          *pool,
+                                                       ProcessingTask          *task);
+gboolean        processing_pool_wait_task             (ProcessingPool          *pool,
+                                                       ProcessingTask          *task);
+gboolean        processing_pool_process_task          (ProcessingPool          *pool,
+                                                       ProcessingTask          *task,
+                                                       ProcessingPoolTaskFinishedCallback  finished_handler,
+                                                       gpointer                 user_data);
+guint           processing_pool_get_wait_task_count    (ProcessingPool         *pool);
+guint           processing_pool_get_process_task_count (ProcessingPool         *pool);
+ProcessingTask *processing_pool_get_last_wait          (ProcessingPool         *pool);
+void            processing_pool_foreach                (ProcessingPool         *pool,
+                                                        GFunc                   func,
+                                                        gpointer                user_data);
 
 G_END_DECLS
 
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 687b5a9..f6c46e6 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -39,8 +39,9 @@
 /* If defined will print contents of populated mtime cache while running */
 #undef PRINT_MTIME_CACHE_CONTENTS
 
-/* Default processing pool limit to be set */
-#define DEFAULT_POOL_LIMIT 1
+/* Default processing pool limits to be set */
+#define DEFAULT_WAIT_POOL_LIMIT 1
+#define DEFAULT_PROCESS_POOL_LIMIT 1
 
 /**
  * SECTION:tracker-miner-fs
@@ -195,7 +196,8 @@ enum {
 enum {
 	PROP_0,
 	PROP_THROTTLE,
-	PROP_POOL_LIMIT,
+	PROP_WAIT_POOL_LIMIT,
+	PROP_PROCESS_POOL_LIMIT,
 	PROP_MTIME_CHECKING,
 	PROP_INITIAL_CRAWLING
 };
@@ -320,11 +322,20 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
 	                                                      0, 1, 0,
 	                                                      G_PARAM_READWRITE));
 	g_object_class_install_property (object_class,
-	                                 PROP_POOL_LIMIT,
+	                                 PROP_WAIT_POOL_LIMIT,
+	                                 g_param_spec_uint ("wait-pool-limit",
+	                                                    "Processing pool limit for WAIT tasks",
+	                                                    "Maximum number of files that can be concurrently "
+	                                                    "processed by the upper layer",
+	                                                    1, G_MAXUINT, DEFAULT_WAIT_POOL_LIMIT,
+	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+	g_object_class_install_property (object_class,
+	                                 PROP_PROCESS_POOL_LIMIT,
 	                                 g_param_spec_uint ("process-pool-limit",
-	                                                    "Processing pool limit",
-	                                                    "Number of files that can be concurrently processed",
-	                                                    1, G_MAXUINT, DEFAULT_POOL_LIMIT,
+	                                                    "Processing pool limit for PROCESS tasks",
+	                                                    "Maximum number of SPARQL updates that can be merged "
+	                                                    "in a single connection to the store",
+	                                                    1, G_MAXUINT, DEFAULT_PROCESS_POOL_LIMIT,
 	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	g_object_class_install_property (object_class,
 	                                 PROP_MTIME_CHECKING,
@@ -567,7 +578,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 
 	/* Create processing pool */
 	priv->processing_pool = processing_pool_new (tracker_miner_get_connection (TRACKER_MINER (object)),
-	                                             DEFAULT_POOL_LIMIT);
+	                                             DEFAULT_WAIT_POOL_LIMIT,
+	                                             DEFAULT_PROCESS_POOL_LIMIT);
 
 	/* Set up the crawlers now we have config and hal */
 	priv->crawler = tracker_crawler_new ();
@@ -712,9 +724,13 @@ fs_set_property (GObject      *object,
 		tracker_miner_fs_set_throttle (TRACKER_MINER_FS (object),
 		                               g_value_get_double (value));
 		break;
-	case PROP_POOL_LIMIT:
-		processing_pool_set_limit (fs->private->processing_pool,
-		                           g_value_get_uint (value));
+	case PROP_WAIT_POOL_LIMIT:
+		processing_pool_set_wait_limit (fs->private->processing_pool,
+		                                g_value_get_uint (value));
+		break;
+	case PROP_PROCESS_POOL_LIMIT:
+		processing_pool_set_process_limit (fs->private->processing_pool,
+		                                   g_value_get_uint (value));
 		break;
 	case PROP_MTIME_CHECKING:
 		fs->private->mtime_checking = g_value_get_boolean (value);
@@ -742,9 +758,13 @@ fs_get_property (GObject    *object,
 	case PROP_THROTTLE:
 		g_value_set_double (value, fs->private->throttle);
 		break;
-	case PROP_POOL_LIMIT:
+	case PROP_WAIT_POOL_LIMIT:
+		g_value_set_uint (value,
+		                  processing_pool_get_wait_limit (fs->private->processing_pool));
+		break;
+	case PROP_PROCESS_POOL_LIMIT:
 		g_value_set_uint (value,
-		                  processing_pool_get_limit (fs->private->processing_pool));
+		                  processing_pool_get_process_limit (fs->private->processing_pool));
 		break;
 	case PROP_MTIME_CHECKING:
 		g_value_set_boolean (value, fs->private->mtime_checking);
@@ -1644,7 +1664,7 @@ item_add_or_update (TrackerMinerFS *fs,
 	if (do_process_file (fs, task)) {
 		fs->private->total_files_processed++;
 
-		if (processing_pool_limit_reached (priv->processing_pool)) {
+		if (processing_pool_wait_limit_reached (priv->processing_pool)) {
 			retval = FALSE;
 		}
 	}
@@ -2191,7 +2211,7 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
 		 * info is inserted to the store before the children are
 		 * inspected.
 		 */
-		if (processing_pool_get_task_count (fs->private->processing_pool) > 0) {
+		if (processing_pool_get_wait_task_count (fs->private->processing_pool) > 0) {
 			/* Items still being processed */
 			*file = NULL;
 			*source_file = NULL;
@@ -2422,7 +2442,8 @@ item_queue_handlers_cb (gpointer user_data)
 	case QUEUE_NONE:
 		/* Print stats and signal finished */
 		if (!fs->private->is_crawling &&
-		    processing_pool_get_task_count (fs->private->processing_pool) == 0) {
+		    processing_pool_get_wait_task_count (fs->private->processing_pool) == 0 &&
+		    processing_pool_get_process_task_count (fs->private->processing_pool) == 0) {
 			process_stop (fs);
 		}
 
@@ -2492,7 +2513,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
 		return;
 	}
 
-	if (processing_pool_limit_reached (fs->private->processing_pool)) {
+	if (processing_pool_wait_limit_reached (fs->private->processing_pool)) {
 		/* There is no room in the pool for more files */
 		return;
 	}
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index c2a596e..e93cea3 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -1866,7 +1866,7 @@ extractor_create_proxy (DBusGConnection *connection)
 		 * Assuming that the files which need more time to get extracted are PDFs
 		 * using libpoppler, we already have a limit in the PDF extractor not to
 		 * spend more than 5s extraction contents. And, assuming the default
-		 * value of 10 in process-pool-limit, it means we may end up queueing up
+		 * value of 10 in wait-pool-limit, it means we may end up queueing up
 		 * to 10 PDF files which may need 5s each, so in order not to have dbus
 		 * timeouts in this case, any value greater than 5*10 would be good.
 		 */
@@ -2460,7 +2460,8 @@ tracker_miner_files_new (TrackerConfig *config)
 	return g_object_new (TRACKER_TYPE_MINER_FILES,
 	                     "name", "Files",
 	                     "config", config,
-	                     "process-pool-limit", 10,
+	                     "wait-pool-limit", 10,
+	                     "process-pool-limit", 100,
 	                     "mtime-checking", should_check_mtime (config),
 	                     NULL);
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]