[tracker/tracker-0.14] libtracker-miner: Prevent item queue handlers from running when blocked



commit 78a889dd380e390a22b02a193b13013875a1796f
Author: Sam Thursfield <sam thursfield codethink co uk>
Date:   Wed Jun 27 14:41:29 2012 +0100

    libtracker-miner: Prevent item queue handlers from running when blocked
    
    If we pop the next item from a queue and find that either that file or
    its parent is still being processed, the item queue handler must wait
    until the outstanding tasks finish and are committed.
    
    In this case we stop the item_queue_handlers_cb() timeout from
    executing. However, it is often restarted early because there are many
    triggers for item_queue_handlers_set_up(); the callback then pops the
    file again, discovers it is still blocked and re-enqueues it. This is a
    waste of time and also makes it hard to implement infinite loop
    detection reliably.
    
    We now remember the file that is blocking progress, and
    item_queue_handlers_set_up() will not start the item queue handlers
    until sparql_buffer_task_finished_cb() has been called for the file
    in question.

 src/libtracker-miner/tracker-miner-fs.c |   53 +++++++++++++++++++++++++++++-
 1 files changed, 51 insertions(+), 2 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 950b1b8..fb44d5c 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -165,6 +165,7 @@ struct _TrackerMinerFSPrivate {
 	GTimer         *extraction_timer;
 
 	guint           item_queues_handler_id;
+	GFile          *item_queue_blocker;
 
 	gdouble         throttle;
 
@@ -650,6 +651,10 @@ fs_finalize (GObject *object)
 	if (priv->item_queues_handler_id) {
 		g_source_remove (priv->item_queues_handler_id);
 		priv->item_queues_handler_id = 0;
+
+		if (priv->item_queue_blocker) {
+			g_object_unref (priv->item_queue_blocker);
+		}
 	}
 
 	tracker_file_notifier_stop (priv->file_notifier);
@@ -986,6 +991,21 @@ item_writeback_data_free (ItemWritebackData *data)
 	g_slice_free (ItemWritebackData, data);
 }
 
+static gboolean
+item_queue_is_blocked_by_file (TrackerMinerFS *fs,
+                               GFile *file)
+{
+	GFile *blocker;
+
+	g_return_val_if_fail (G_IS_FILE (file), FALSE);
+
+	/* Blocked only if a blocker is recorded and it refers to this file */
+	blocker = fs->priv->item_queue_blocker;
+
+	return (blocker != NULL &&
+	        (blocker == file || g_file_equal (blocker, file)));
+}
+
 static void
 sparql_buffer_task_finished_cb (GObject      *object,
                                 GAsyncResult *result,
@@ -993,6 +1013,8 @@ sparql_buffer_task_finished_cb (GObject      *object,
 {
 	TrackerMinerFS *fs;
 	TrackerMinerFSPrivate *priv;
+	TrackerTask *task;
+	GFile *task_file;
 	GError *error = NULL;
 
 	fs = user_data;
@@ -1005,7 +1027,22 @@ sparql_buffer_task_finished_cb (GObject      *object,
 		g_error_free (error);
 	}
 
-	item_queue_handlers_set_up (fs);
+	task = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
+	task_file = tracker_task_get_file (task);
+
+	if (item_queue_is_blocked_by_file (fs, task_file)) {
+		g_object_unref (priv->item_queue_blocker);
+		priv->item_queue_blocker = NULL;
+	}
+
+	if (priv->item_queue_blocker != NULL) {
+		if (tracker_task_pool_get_size (TRACKER_TASK_POOL (object)) > 0) {
+			tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
+			                             "Item queue still blocked after flush");
+		}
+	} else {
+		item_queue_handlers_set_up (fs);
+	}
 }
 
 static UpdateProcessingTaskContext *
@@ -1199,6 +1236,10 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		                            ctxt->priority,
 		                            sparql_buffer_task_finished_cb,
 		                            fs);
+
+		if (item_queue_is_blocked_by_file (fs, task_file)) {
+			tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
+		}
 	}
 
 	if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
@@ -1708,6 +1749,7 @@ should_wait (TrackerMinerFS *fs,
 	    tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
 		/* Yes, a previous event on same item currently
 		 * being processed */
+		fs->priv->item_queue_blocker = g_object_ref (file);
 		return TRUE;
 	}
 
@@ -1718,7 +1760,7 @@ should_wait (TrackerMinerFS *fs,
 		    tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
 			/* Yes, a previous event on the parent of this item
 			 * currently being processed */
-			g_object_unref (parent);
+			fs->priv->item_queue_blocker = parent;
 			return TRUE;
 		}
 
@@ -2033,6 +2075,7 @@ item_queue_handlers_cb (gpointer user_data)
 		 * on with the queues... */
 		tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
 		                             "Queue handlers WAIT");
+
 		return FALSE;
 	}
 
@@ -2267,6 +2310,12 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
 		return;
 	}
 
+	if (fs->priv->item_queue_blocker) {
+		trace_eq ("   cancelled: item queue blocked waiting for file '%s'",
+		          g_file_get_path (fs->priv->item_queue_blocker));
+		return;
+	}
+
 	/* Already sent max number of tasks to tracker-extract/writeback? */
 	if (tracker_task_pool_limit_reached (fs->priv->task_pool) ||
 	    tracker_task_pool_limit_reached (fs->priv->writeback_pool)) {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]