[tracker/miner-fs-refactor-multi-insert: 8/14] libtracker-miner: Split internal PROCESS queue into two: READY and PROCESSING



commit 6f96d0838aa7bf533d1fd1445f360a01a5ad7636
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Tue Oct 19 18:40:01 2010 +0200

    libtracker-miner: Split internal PROCESS queue into two: READY and PROCESSING
    
     * When having a single queue for items in the SPARQL buffer and items being
       pushed to the store; as the limit of the queue was a single one, we were
       actually having an undesired behaviour: While an array-update of 100 items
       was being done, every new task pushed to the processing pool was not
       buffered, as the limit of the queue was reached.
    
     * So now, the READY queue holds the tasks being buffered, and the PROCESSING
       queue holds the items currently being pushed to the store.

 .../tracker-miner-fs-processing-pool.c             |  175 ++++++++++++--------
 .../tracker-miner-fs-processing-pool.h             |   13 +-
 src/libtracker-miner/tracker-miner-fs.c            |   67 ++++----
 src/miners/fs/tracker-miner-files.c                |    4 +-
 4 files changed, 151 insertions(+), 108 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 88b71fb..3f4b021 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -22,7 +22,7 @@
  * How the processing pool works.
  *
  * 1. This processing pool is used to determine which files are being currently
- *    processed by tracker-miner-fs, and there are currently 2 kind of tasks
+ *    processed by tracker-miner-fs, and there are currently 3 kind of tasks
  *    considered in this pool:
  *   1.1. "WAIT" tasks are those used to specify tasks which still do not have a
  *        full SPARQL built. Currently, tasks in the WAIT status could be:
@@ -32,8 +32,10 @@
  *         o Tasks added while the upper layer is actually processing the given
  *           files (until the tracker_miner_fs_file_notify() is called by the
  *           upper layer).
- *   1.2. "PROCESS" tasks are those used to specify tasks which have a proper
- *         SPARQL string ready to be pushed to tracker-store.
+ *   1.2. "READY" tasks are those used to specify tasks which have a proper
+ *         SPARQL string ready to be pushed to tracker-store. If
+ *   1.3. "PROCESSING" tasks are those used to specify tasks that are currently
+ *         being pushed to the store.
  *
  * 2. The current possible flows for tasks added to the processing pool are:
  *   2.1. Full SPARQL is ready before pushing the task to the pool. This is
@@ -41,57 +43,64 @@
  *        like this:
  *         - processing_task_new() to create a new task
  *         - processing_task_set_sparql() to set the full SPARQL in the task.
- *         - processing_pool_process_task() to push the newly created task
- *           into the processing pool as a "PROCESS" task.
+ *         - processing_pool_push_ready_task() to push the newly created task
+ *           into the processing pool as a "READY" task.
  *
  *   2.2. The full SPARQL is still not available, as the upper layers need to
  *        process the file (like extracting metadata using tracker-extract
  *        in the case of TrackerMinerFiles). This case would correspond to
  *        CREATED or UPDATED events:
  *         - processing_task_new() to create a new task
- *         - processing_pool_wait_task() to push the newly created task into
- *           the processing pool as a "WAIT" task.
+ *         - processing_pool_push_wait_task() to push the newly created task
+ *           into the processing pool as a "WAIT" task.
  *         - processing_task_set_sparql() to set the full SPARQL in the task
  *           (when the upper layers finished building it).
- *         - processing_pool_process_task() to push the newly created task
- *           into the processing pool as a "PROCESS" task.
+ *         - processing_pool_push_ready_task() to push the newly created task
+ *           into the processing pool as a "READY" task.
+ *
+ *   2.3. Note that "PROCESSING" tasks are an internal status of the pool, the
+ *        user of the processing pool cannot push a task with this status.
  *
  * 3. The number of tasks pushed to the pull as "WAIT" tasks is limited to the
  *    number set while creating the pool. This value corresponds to the
- *    "wait-pool-limit" property in the TrackerMinerFS object, and currently is
- *    set to 1 for TrackerMinerApplications and to 10 to TrackerMinerFiles. In
- *    the case of TrackerMinerFiles, this number specifies the maximum number of
- *    extraction requests that can be managed in parallel.
+ *    "processing-pool-wait-limit" property in the TrackerMinerFS object, and
+ *    currently is set to 1 for TrackerMinerApplications and to 10 to
+ *    TrackerMinerFiles. In the case of TrackerMinerFiles, this number specifies
+ *    the maximum number of extraction requests that can be managed in parallel.
  *
- * 4. The number of tasks pushed to the pull as "PROCESS" tasks is limited to
+ * 4. The number of tasks pushed to the pull as "READY" tasks is limited to
  *    the number set while creating the pool. This value corresponds to the
- *    "process-pool-limit" property in the TrackerMinerFS object, and currently
- *    is set to 1 for TrackerMinerApplications and to 100 to TrackerMinerFiles.
- *    In the case of TrackerMinerFiles, this number specifies the maximum number
- *    of SPARQL updates that can be merged into a single multi-insert SPARQL
- *    connection.
+ *    "processing-pool-ready-limit" property in the TrackerMinerFS object, and
+ *    currently is set to 1 for TrackerMinerApplications and to 100 to
+ *    TrackerMinerFiles. In the case of TrackerMinerFiles, this number specifies
+ *    the maximum number of SPARQL updates that can be merged into a single
+ *    multi-insert SPARQL connection.
  *
- * 5. When a task is pushed to the pool as a "PROCESS" task, the pool will be in
+ * 5. When a task is pushed to the pool as a "READY" task, the pool will be in
  *    charge of executing the SPARQL update into the store.
  *
- * 6. If buffering was requested when processing_pool_process_task() was used to
- *    push the new task in the pool as a "PROCESS" task, this task will be added
- *    internally into a SPARQL buffer. This SPARQL buffer will be flushed
+ * 6. If buffering was requested when processing_pool_push_ready_task() was used
+ *    to push the new task in the pool as a "READY" task, this task will be
+ *    added internally into a SPARQL buffer. This SPARQL buffer will be flushed
  *    (pushing all collected SPARQL updates into the store) if one of these
  *    conditions is met:
  *      (a) The file corresponding to the task pushed doesn't have a parent.
  *      (b) The parent of the file corresponding to the task pushed is different
  *          to the parent of the last file pushed to the buffer.
- *      (c) The limit for "PROCESS" tasks in the pool was reached.
+ *      (c) The limit for "READY" tasks in the pool was reached.
  *      (d) The buffer was not flushed in the last MAX_SPARQL_BUFFER_TIME (=15)
  *          seconds.
  *    The buffer is flushed using a single multi-insert SPARQL connection. This
  *    means that an array of SPARQLs is sent to tracker-store, which replies
  *    with an array of GErrors specifying which update failed, if any.
+ *    Once the flushing operation in the buffer is started, the tasks are then
+ *    converted to "PROCESSING" state, until the reply from the store is
+ *    received.
  *
- * 7. If buffering is not requested when processing_pool_process_task() is
+ * 7. If buffering is not requested when processing_pool_push_ready_task() is
  *    called, first the previous buffer is flushed (if any) and then the current
- *    task is updated in the store.
+ *    task is updated in the store, so this task goes directly from "READY" to
+ *    "PROCESSING" state without going through the intermediate buffer.
  *
  * 8. May the gods be with you if you need to fix a bug in here.
  *
@@ -108,7 +117,8 @@
 typedef enum {
 	PROCESSING_TASK_STATUS_NO_POOL,
 	PROCESSING_TASK_STATUS_WAIT = 0,
-	PROCESSING_TASK_STATUS_PROCESS,
+	PROCESSING_TASK_STATUS_READY,
+	PROCESSING_TASK_STATUS_PROCESSING,
 	PROCESSING_TASK_STATUS_LAST
 } ProcessingTaskStatus;
 
@@ -263,7 +273,7 @@ processing_pool_free (ProcessingPool *pool)
 ProcessingPool *
 processing_pool_new (TrackerSparqlConnection *connection,
                      guint                    limit_wait,
-                     guint                    limit_process)
+                     guint                    limit_ready)
 {
 	ProcessingPool *pool;
 
@@ -271,16 +281,19 @@ processing_pool_new (TrackerSparqlConnection *connection,
 
 	pool->connection = g_object_ref (connection);
 	pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit_wait;
-	pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit_process;
+	pool->limit[PROCESSING_TASK_STATUS_READY] = limit_ready;
+	/* convenience limit, not really used currently */
+	pool->limit[PROCESSING_TASK_STATUS_PROCESSING] = G_MAXUINT;
 
 	pool->tasks[PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
-	pool->tasks[PROCESSING_TASK_STATUS_PROCESS] = g_queue_new ();
+	pool->tasks[PROCESSING_TASK_STATUS_READY] = g_queue_new ();
+	pool->tasks[PROCESSING_TASK_STATUS_PROCESSING] = g_queue_new ();
 
 	g_debug ("Processing pool created with a limit of "
 	         "%u tasks in WAIT status and "
-	         "%u tasks in PROCESS status",
+	         "%u tasks in READY status",
 	         limit_wait,
-	         limit_process);
+	         limit_ready);
 
 	return pool;
 }
@@ -294,11 +307,11 @@ processing_pool_set_wait_limit (ProcessingPool *pool,
 }
 
 void
-processing_pool_set_process_limit (ProcessingPool *pool,
-                                   guint           limit)
+processing_pool_set_ready_limit (ProcessingPool *pool,
+                                 guint           limit)
 {
-	g_message ("Processing pool limit for PROCESS tasks set to %u", limit);
-	pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit;
+	g_message ("Processing pool limit for READY tasks set to %u", limit);
+	pool->limit[PROCESSING_TASK_STATUS_READY] = limit;
 }
 
 guint
@@ -308,9 +321,9 @@ processing_pool_get_wait_limit (ProcessingPool *pool)
 }
 
 guint
-processing_pool_get_process_limit (ProcessingPool *pool)
+processing_pool_get_ready_limit (ProcessingPool *pool)
 {
-	return pool->limit[PROCESSING_TASK_STATUS_PROCESS];
+	return pool->limit[PROCESSING_TASK_STATUS_READY];
 }
 
 gboolean
@@ -322,10 +335,10 @@ processing_pool_wait_limit_reached (ProcessingPool *pool)
 }
 
 gboolean
-processing_pool_process_limit_reached (ProcessingPool *pool)
+processing_pool_ready_limit_reached (ProcessingPool *pool)
 {
-	return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]) >=
-	         pool->limit[PROCESSING_TASK_STATUS_PROCESS]) ?
+	return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_READY]) >=
+	         pool->limit[PROCESSING_TASK_STATUS_READY]) ?
 	        TRUE : FALSE);
 }
 
@@ -370,8 +383,8 @@ processing_pool_find_task (ProcessingPool *pool,
 }
 
 void
-processing_pool_wait_task (ProcessingPool *pool,
-                           ProcessingTask *task)
+processing_pool_push_wait_task (ProcessingPool *pool,
+                                ProcessingTask *task)
 {
 	g_assert (task->status == PROCESSING_TASK_STATUS_NO_POOL);
 
@@ -487,6 +500,17 @@ processing_pool_buffer_flush (ProcessingPool *pool)
 		ProcessingTask *task;
 
 		task = g_ptr_array_index (pool->sparql_buffer, i);
+
+		/* Make sure it was a READY task */
+		g_assert (task->status == PROCESSING_TASK_STATUS_READY);
+
+		/* Remove the task from the READY queue and add it to the
+		 * PROCESSING one. */
+		processing_pool_remove_task (pool, task);
+		task->status = PROCESSING_TASK_STATUS_PROCESSING;
+		task->pool = pool;
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESSING], task);
+
 		/* Add original string, not a duplicate */
 		g_ptr_array_add (sparql_array, task->sparql);
 	}
@@ -517,11 +541,11 @@ processing_pool_buffer_flush (ProcessingPool *pool)
 }
 
 gboolean
-processing_pool_process_task (ProcessingPool                     *pool,
-                              ProcessingTask                     *task,
-                              gboolean                            buffer,
-                              ProcessingPoolTaskFinishedCallback  finished_handler,
-                              gpointer                            user_data)
+processing_pool_push_ready_task (ProcessingPool                     *pool,
+                                 ProcessingTask                     *task,
+                                 gboolean                            buffer,
+                                 ProcessingPoolTaskFinishedCallback  finished_handler,
+                                 gpointer                            user_data)
 {
 	GList *previous;
 
@@ -530,31 +554,32 @@ processing_pool_process_task (ProcessingPool                     *pool,
 
 	/* First, check if the task was already added as being WAITING */
 	previous = g_queue_find (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
-	if (!previous) {
-		/* Add it to the PROCESS queue */
-		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
-		task->pool = pool;
-	} else {
+	if (previous) {
 		/* Make sure it was a WAIT task */
 		g_assert (task->status == PROCESSING_TASK_STATUS_WAIT);
-		/* Move task from WAIT queue to PROCESS queue */
+		/* Remove task from WAIT queue */
 		g_queue_delete_link (pool->tasks[PROCESSING_TASK_STATUS_WAIT], previous);
-		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
+	} else {
+		/* Set pool */
+		task->pool = pool;
 	}
 
-	/* Set status of the task as PROCESS */
-	task->status = PROCESSING_TASK_STATUS_PROCESS;
-
 	task->finished_handler = finished_handler;
 	task->finished_user_data = user_data;
 
-	/* If buffering not requested, flush previous buffer and then the new update */
-	if (!buffer) {
-		g_debug ("(Processing Pool) Pushed PROCESS task %p for file '%s'",
+	/* If buffering not requested, OR the limit of READY tasks is actually 1,
+	 * flush previous buffer (if any) and then the new update */
+	if (!buffer || pool->limit[PROCESSING_TASK_STATUS_READY] == 1) {
+		g_debug ("(Processing Pool) Pushed READY/PROCESSING task %p for file '%s'",
 		         task, task->file_uri);
 
 		/* Flush previous */
 		processing_pool_buffer_flush (pool);
+
+		/* Set status of the task as PROCESSING (No READY status here!) */
+		task->status = PROCESSING_TASK_STATUS_PROCESSING;
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESSING], task);
+
 		/* And update the new one */
 		tracker_sparql_connection_update_async (pool->connection,
 		                                        task->sparql,
@@ -568,6 +593,10 @@ processing_pool_process_task (ProcessingPool                     *pool,
 		GFile *parent;
 		gboolean flushed = FALSE;
 
+		/* Set status of the task as READY */
+		task->status = PROCESSING_TASK_STATUS_READY;
+		g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_READY], task);
+
 		/* Get parent of this file we're updating/creating */
 		parent = g_file_get_parent (task->file);
 
@@ -582,7 +611,7 @@ processing_pool_process_task (ProcessingPool                     *pool,
 			pool->sparql_buffer_current_parent = g_object_ref (parent);
 		}
 
-		g_debug ("(Processing Pool) Pushed PROCESS task %p for file '%s' into array %p",
+		g_debug ("(Processing Pool) Pushed READY task %p for file '%s' into array %p",
 		         task, task->file_uri, pool->sparql_buffer);
 
 		/* Add task to array */
@@ -591,12 +620,12 @@ processing_pool_process_task (ProcessingPool                     *pool,
 		/* Flush buffer if:
 		 *  - Last item has no parent
 		 *  - Parent change was detected
-		 *  - 'limit_process' items reached
+		 *  - Maximum number of READY items reached
 		 *  - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
 		 */
 		if (!parent ||
 		    !g_file_equal (parent, pool->sparql_buffer_current_parent) ||
-		    processing_pool_process_limit_reached (pool) ||
+		    processing_pool_ready_limit_reached (pool) ||
 		    (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME)) {
 			/* Flush! */
 			processing_pool_buffer_flush (pool);
@@ -635,9 +664,23 @@ processing_pool_get_wait_task_count (ProcessingPool *pool)
 }
 
 guint
-processing_pool_get_process_task_count (ProcessingPool *pool)
+processing_pool_get_ready_task_count (ProcessingPool *pool)
+{
+	return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_READY]);
+}
+
+guint
+processing_pool_get_total_task_count (ProcessingPool *pool)
 {
-	return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]);
+	guint total = 0;
+	guint i;
+
+	for (i = PROCESSING_TASK_STATUS_WAIT;
+	     i < PROCESSING_TASK_STATUS_LAST;
+	     i++) {
+		total += g_queue_get_length (pool->tasks[i]);
+	}
+	return total;
 }
 
 ProcessingTask *
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 28c7c8a..939d526 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -53,27 +53,28 @@ ProcessingPool *processing_pool_new                   (TrackerSparqlConnection *
 void            processing_pool_free                  (ProcessingPool          *pool);
 void            processing_pool_set_wait_limit        (ProcessingPool          *pool,
                                                        guint                    limit);
-void            processing_pool_set_process_limit     (ProcessingPool          *pool,
+void            processing_pool_set_ready_limit       (ProcessingPool          *pool,
                                                        guint                    limit);
 guint           processing_pool_get_wait_limit        (ProcessingPool          *pool);
-guint           processing_pool_get_process_limit     (ProcessingPool          *pool);
+guint           processing_pool_get_ready_limit       (ProcessingPool          *pool);
 ProcessingTask *processing_pool_find_task             (ProcessingPool          *pool,
                                                        GFile                   *file,
                                                        gboolean                 path_search);
 gboolean        processing_pool_wait_limit_reached    (ProcessingPool          *pool);
-gboolean        processing_pool_process_limit_reached (ProcessingPool          *pool);
+gboolean        processing_pool_ready_limit_reached   (ProcessingPool          *pool);
 
 void            processing_pool_remove_task           (ProcessingPool          *pool,
                                                        ProcessingTask          *task);
-void            processing_pool_wait_task             (ProcessingPool          *pool,
+void            processing_pool_push_wait_task        (ProcessingPool          *pool,
                                                        ProcessingTask          *task);
-gboolean        processing_pool_process_task          (ProcessingPool          *pool,
+gboolean        processing_pool_push_ready_task       (ProcessingPool          *pool,
                                                        ProcessingTask          *task,
                                                        gboolean                 buffer,
                                                        ProcessingPoolTaskFinishedCallback  finished_handler,
                                                        gpointer                 user_data);
 guint           processing_pool_get_wait_task_count    (ProcessingPool         *pool);
-guint           processing_pool_get_process_task_count (ProcessingPool         *pool);
+guint           processing_pool_get_ready_task_count   (ProcessingPool         *pool);
+guint           processing_pool_get_total_task_count   (ProcessingPool         *pool);
 ProcessingTask *processing_pool_get_last_wait          (ProcessingPool         *pool);
 void            processing_pool_foreach                (ProcessingPool         *pool,
                                                         GFunc                   func,
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index b15632f..39e95ec 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -41,7 +41,7 @@
 
 /* Default processing pool limits to be set */
 #define DEFAULT_WAIT_POOL_LIMIT 1
-#define DEFAULT_PROCESS_POOL_LIMIT 1
+#define DEFAULT_READY_POOL_LIMIT 1
 
 /**
  * SECTION:tracker-miner-fs
@@ -197,7 +197,7 @@ enum {
 	PROP_0,
 	PROP_THROTTLE,
 	PROP_WAIT_POOL_LIMIT,
-	PROP_PROCESS_POOL_LIMIT,
+	PROP_READY_POOL_LIMIT,
 	PROP_MTIME_CHECKING,
 	PROP_INITIAL_CRAWLING
 };
@@ -323,19 +323,19 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
 	                                                      G_PARAM_READWRITE));
 	g_object_class_install_property (object_class,
 	                                 PROP_WAIT_POOL_LIMIT,
-	                                 g_param_spec_uint ("wait-pool-limit",
+	                                 g_param_spec_uint ("processing-pool-wait-limit",
 	                                                    "Processing pool limit for WAIT tasks",
 	                                                    "Maximum number of files that can be concurrently "
 	                                                    "processed by the upper layer",
 	                                                    1, G_MAXUINT, DEFAULT_WAIT_POOL_LIMIT,
 	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	g_object_class_install_property (object_class,
-	                                 PROP_PROCESS_POOL_LIMIT,
-	                                 g_param_spec_uint ("process-pool-limit",
-	                                                    "Processing pool limit for PROCESS tasks",
+	                                 PROP_READY_POOL_LIMIT,
+	                                 g_param_spec_uint ("processing-pool-ready-limit",
+	                                                    "Processing pool limit for READY tasks",
 	                                                    "Maximum number of SPARQL updates that can be merged "
 	                                                    "in a single connection to the store",
-	                                                    1, G_MAXUINT, DEFAULT_PROCESS_POOL_LIMIT,
+	                                                    1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
 	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	g_object_class_install_property (object_class,
 	                                 PROP_MTIME_CHECKING,
@@ -579,7 +579,7 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	/* Create processing pool */
 	priv->processing_pool = processing_pool_new (tracker_miner_get_connection (TRACKER_MINER (object)),
 	                                             DEFAULT_WAIT_POOL_LIMIT,
-	                                             DEFAULT_PROCESS_POOL_LIMIT);
+	                                             DEFAULT_READY_POOL_LIMIT);
 
 	/* Set up the crawlers now we have config and hal */
 	priv->crawler = tracker_crawler_new ();
@@ -728,9 +728,9 @@ fs_set_property (GObject      *object,
 		processing_pool_set_wait_limit (fs->private->processing_pool,
 		                                g_value_get_uint (value));
 		break;
-	case PROP_PROCESS_POOL_LIMIT:
-		processing_pool_set_process_limit (fs->private->processing_pool,
-		                                   g_value_get_uint (value));
+	case PROP_READY_POOL_LIMIT:
+		processing_pool_set_ready_limit (fs->private->processing_pool,
+		                                 g_value_get_uint (value));
 		break;
 	case PROP_MTIME_CHECKING:
 		fs->private->mtime_checking = g_value_get_boolean (value);
@@ -762,9 +762,9 @@ fs_get_property (GObject    *object,
 		g_value_set_uint (value,
 		                  processing_pool_get_wait_limit (fs->private->processing_pool));
 		break;
-	case PROP_PROCESS_POOL_LIMIT:
+	case PROP_READY_POOL_LIMIT:
 		g_value_set_uint (value,
-		                  processing_pool_get_process_limit (fs->private->processing_pool));
+		                  processing_pool_get_ready_limit (fs->private->processing_pool));
 		break;
 	case PROP_MTIME_CHECKING:
 		g_value_set_boolean (value, fs->private->mtime_checking);
@@ -1582,13 +1582,13 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		}
 
 		processing_task_set_sparql (task, full_sparql);
-		/* If process_task() returns FALSE, it means the actual db update was delayed,
+		/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
 		 * and in this case we need to setup queue handlers again */
-		if (!processing_pool_process_task (fs->private->processing_pool,
-		                                   task,
-		                                   TRUE, /* buffer! */
-		                                   processing_pool_task_finished_cb,
-		                                   fs)) {
+		if (!processing_pool_push_ready_task (fs->private->processing_pool,
+		                                      task,
+		                                      TRUE, /* buffer! */
+		                                      processing_pool_task_finished_cb,
+		                                      fs)) {
 			item_queue_handlers_set_up (fs);
 		}
 		g_free (full_sparql);
@@ -1664,7 +1664,7 @@ item_add_or_update (TrackerMinerFS *fs,
 	                                                              cancellable,
 	                                                              sparql),
 	                             (GFreeFunc) update_process_task_context_free);
-	processing_pool_wait_task (priv->processing_pool, task);
+	processing_pool_push_wait_task (priv->processing_pool, task);
 
 	if (do_process_file (fs, task)) {
 		fs->private->total_files_processed++;
@@ -1731,13 +1731,13 @@ item_remove (TrackerMinerFS *fs,
 	/* Add new task to processing pool */
 	task = processing_task_new (file);
 	processing_task_set_sparql (task, sparql->str);
-	/* If process_task() returns FALSE, it means the actual db update was delayed,
+	/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
 	 * and in this case we need to setup queue handlers again */
-	if (!processing_pool_process_task (fs->private->processing_pool,
-	                                   task,
-	                                   FALSE,
-	                                   processing_pool_task_finished_cb,
-	                                   fs)) {
+	if (!processing_pool_push_ready_task (fs->private->processing_pool,
+	                                      task,
+	                                      FALSE,
+	                                      processing_pool_task_finished_cb,
+	                                      fs)) {
 		item_queue_handlers_set_up (fs);
 	}
 
@@ -2052,13 +2052,13 @@ item_move (TrackerMinerFS *fs,
 	/* Add new task to processing pool */
 	task = processing_task_new (file);
 	processing_task_set_sparql (task, sparql->str);
-	/* If process_task() returns FALSE, it means the actual db update was delayed,
+	/* If push_ready_task() returns FALSE, it means the actual db update was delayed,
 	 * and in this case we need to setup queue handlers again */
-	if (!processing_pool_process_task (fs->private->processing_pool,
-	                                   task,
-	                                   FALSE,
-	                                   processing_pool_task_finished_cb,
-	                                   fs)) {
+	if (!processing_pool_push_ready_task (fs->private->processing_pool,
+	                                      task,
+	                                      FALSE,
+	                                      processing_pool_task_finished_cb,
+	                                      fs)) {
 		item_queue_handlers_set_up (fs);
 	}
 
@@ -2465,8 +2465,7 @@ item_queue_handlers_cb (gpointer user_data)
 	case QUEUE_NONE:
 		/* Print stats and signal finished */
 		if (!fs->private->is_crawling &&
-		    processing_pool_get_wait_task_count (fs->private->processing_pool) == 0 &&
-		    processing_pool_get_process_task_count (fs->private->processing_pool) == 0) {
+		    processing_pool_get_total_task_count (fs->private->processing_pool) == 0) {
 			process_stop (fs);
 		}
 
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index 2b98c02..0580079 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -2460,8 +2460,8 @@ tracker_miner_files_new (TrackerConfig *config)
 	return g_object_new (TRACKER_TYPE_MINER_FILES,
 	                     "name", "Files",
 	                     "config", config,
-	                     "wait-pool-limit", 10,
-	                     "process-pool-limit", 100,
+	                     "processing-pool-wait-limit", 10,
+	                     "processing-pool-ready-limit", 100,
 	                     "mtime-checking", should_check_mtime (config),
 	                     NULL);
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]