[tracker] libtracker-miner: Improve traces in processing pool, report tasks in timeout



commit 6a32bb9ee6d0fefc9e9ba9b8c9ebe67302ec8cdd
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Tue Dec 14 16:13:25 2010 +0100

    libtracker-miner: Improve traces in processing pool, report tasks in timeout

 .../tracker-miner-fs-processing-pool.c             |   98 +++++++++++++++++---
 .../tracker-miner-fs-processing-pool.h             |    5 +-
 src/libtracker-miner/tracker-miner-fs.c            |    3 +-
 3 files changed, 90 insertions(+), 16 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 5078dc9..cf2fbfc 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -113,6 +113,7 @@
 /* If defined, will dump additional traces */
 #ifdef PROCESSING_POOL_ENABLE_TRACE
 #warning Processing pool traces are enabled
+#define POOL_STATUS_TRACE_TIMEOUT_SECS 10
 #define trace(message, ...) g_debug (message, ##__VA_ARGS__)
 #else
 #define trace(...)
@@ -157,6 +158,8 @@ struct _TrackerProcessingTask {
 };
 
 struct _TrackerProcessingPool {
+	/* Owner of the pool */
+	GObject *owner;
 	/* Connection to the Store */
 	TrackerSparqlConnection *connection;
 
@@ -169,6 +172,12 @@ struct _TrackerProcessingPool {
 	GPtrArray *sparql_buffer;
 	GFile *sparql_buffer_current_parent;
 	time_t sparql_buffer_start_time;
+
+	/* Timeout to notify status of the queues, if traces
+	 * enabled only. */
+#ifdef PROCESSING_POOL_ENABLE_TRACE
+	guint timeout_id;
+#endif /* PROCESSING_POOL_ENABLE_TRACE */
 };
 
 /*------------------- PROCESSING TASK ----------------------*/
@@ -271,6 +280,43 @@ tracker_processing_task_set_sparql_string (TrackerProcessingTask *task,
 
 /*------------------- PROCESSING POOL ----------------------*/
 
+#ifdef PROCESSING_POOL_ENABLE_TRACE
+static const gchar *queue_names [TRACKER_PROCESSING_TASK_STATUS_LAST] = {
+	"WAIT",
+	"READY",
+	"PROCESSING"
+};
+
+static gboolean
+pool_status_trace_timeout_cb (gpointer data)
+{
+	TrackerProcessingPool *pool = data;
+	guint i;
+
+	trace ("(Processing Pool %s) ------------",
+	       G_OBJECT_TYPE_NAME (pool->owner));
+	for (i = TRACKER_PROCESSING_TASK_STATUS_WAIT;
+	     i < TRACKER_PROCESSING_TASK_STATUS_LAST;
+	     i++) {
+		GList *l;
+
+		l = g_queue_peek_head_link (pool->tasks[i]);
+		trace ("(Processing Pool %s) Queue %s has %u tasks",
+		       G_OBJECT_TYPE_NAME (pool->owner),
+		       queue_names[i],
+		       g_list_length (l));
+		while (l) {
+			trace ("(Processing Pool %s)     Task %p in queue %s",
+			       G_OBJECT_TYPE_NAME (pool->owner),
+			       l->data,
+			       queue_names[i]);
+			l = g_list_next (l);
+		}
+	}
+	return TRUE;
+}
+#endif /* PROCESSING_POOL_ENABLE_TRACE */
+
 static void
 pool_queue_free_foreach (gpointer data,
                          gpointer user_data)
@@ -292,6 +338,11 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
 	if (!pool)
 		return;
 
+#ifdef PROCESSING_POOL_ENABLE_TRACE
+	if (pool->timeout_id)
+		g_source_remove (pool->timeout_id);
+#endif /* PROCESSING_POOL_ENABLE_TRACE */
+
 	/* Free any pending task here... shouldn't really
 	 * be any */
 	for (i = TRACKER_PROCESSING_TASK_STATUS_WAIT;
@@ -316,7 +367,8 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
 }
 
 TrackerProcessingPool *
-tracker_processing_pool_new (TrackerSparqlConnection *connection,
+tracker_processing_pool_new (GObject                 *owner,
+                             TrackerSparqlConnection *connection,
                              guint                    limit_wait,
                              guint                    limit_ready)
 {
@@ -324,6 +376,7 @@ tracker_processing_pool_new (TrackerSparqlConnection *connection,
 
 	pool = g_new0 (TrackerProcessingPool, 1);
 
+	pool->owner = owner;
 	pool->connection = g_object_ref (connection);
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_WAIT] = limit_wait;
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit_ready;
@@ -340,6 +393,12 @@ tracker_processing_pool_new (TrackerSparqlConnection *connection,
 	         limit_wait,
 	         limit_ready);
 
+#ifdef PROCESSING_POOL_ENABLE_TRACE
+	pool->timeout_id = g_timeout_add_seconds (POOL_STATUS_TRACE_TIMEOUT_SECS,
+	                                          pool_status_trace_timeout_cb,
+	                                          pool);
+#endif /* PROCESSING_POOL_ENABLE_TRACE */
+
 	return pool;
 }
 
@@ -347,7 +406,8 @@ void
 tracker_processing_pool_set_wait_limit (TrackerProcessingPool *pool,
                                         guint                  limit)
 {
-	g_message ("Processing pool limit for WAIT tasks set to %u", limit);
+	g_message ("Processing pool limit for WAIT tasks set to %u",
+	           limit);
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_WAIT] = limit;
 }
 
@@ -355,7 +415,8 @@ void
 tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
                                          guint                  limit)
 {
-	g_message ("Processing pool limit for READY tasks set to %u", limit);
+	g_message ("Processing pool limit for READY tasks set to %u",
+	           limit);
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit;
 }
 
@@ -437,8 +498,10 @@ tracker_processing_pool_push_wait_task (TrackerProcessingPool *pool,
 	task->status = TRACKER_PROCESSING_TASK_STATUS_WAIT;
 
 
-	trace ("(Processing Pool) Pushed WAIT task %p for file '%s'",
-	       task, task->file_uri);
+	trace ("(Processing Pool %s) Pushed WAIT task %p for file '%s'",
+	       G_OBJECT_TYPE_NAME (pool->owner),
+	       task,
+	       task->file_uri);
 
 	/* Push a new task in WAIT status (so just add it to the tasks queue,
 	 * and don't process it. */
@@ -459,7 +522,8 @@ tracker_processing_pool_sparql_update_cb (GObject      *object,
 	task = user_data;
 
 	trace ("(Processing Pool) Finished update of task %p for file '%s'",
-	       task, task->file_uri);
+	       task,
+	       task->file_uri);
 
 	/* Before calling user-provided callback, REMOVE the task from the pool;
 	 * as the user-provided callback may actually modify the pool again */
@@ -493,7 +557,7 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 	                                                                     result,
 	                                                                     &global_error);
 	if (global_error) {
-		g_critical ("(Sparql buffer) Could not execute array-update of tasks %p with '%u' items: %s",
+		g_critical ("(Processing Pool) Could not execute array-update of tasks %p with '%u' items: %s",
 		            sparql_array,
 		            sparql_array->len,
 		            global_error->message);
@@ -560,7 +624,8 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 		                   task->sparql_string);
 	}
 
-	trace ("(Processing Pool) Flushing array-update of tasks %p with %u items (%s)",
+	trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
+	       G_OBJECT_TYPE_NAME (pool->owner),
 	       pool->sparql_buffer,
 	       pool->sparql_buffer->len,
 	       reason ? reason : "Unknown reason");
@@ -617,8 +682,10 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 	/* If buffering not requested, OR the limit of READY tasks is actually 1,
 	 * flush previous buffer (if any) and then the new update */
 	if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
-		trace ("(Processing Pool) Pushed READY/PROCESSING task %p for file '%s'",
-		       task, task->file_uri);
+		trace ("(Processing Pool %s) Pushed READY/PROCESSING task %p for file '%s'",
+		       G_OBJECT_TYPE_NAME (pool->owner),
+		       task,
+		       task->file_uri);
 
 		/* Flush previous */
 		tracker_processing_pool_buffer_flush (pool,
@@ -628,7 +695,9 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 		task->status = TRACKER_PROCESSING_TASK_STATUS_PROCESSING;
 		g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING], task);
 
-		trace ("(Processing Pool) Flushing single task %p", task);
+		trace ("(Processing Pool %s) Flushing single task %p",
+		       G_OBJECT_TYPE_NAME (pool->owner),
+		       task);
 
 		/* And update the new one */
 		tracker_sparql_connection_update_async (pool->connection,
@@ -664,8 +733,11 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 			pool->sparql_buffer_current_parent = g_object_ref (parent);
 		}
 
-		trace ("(Processing Pool) Pushed READY task %p for file '%s' into array %p",
-		       task, task->file_uri, pool->sparql_buffer);
+		trace ("(Processing Pool %s) Pushed READY task %p for file '%s' into array %p",
+		       G_OBJECT_TYPE_NAME (pool->owner),
+		       task,
+		       task->file_uri,
+		       pool->sparql_buffer);
 
 		/* Add task to array */
 		g_ptr_array_add (pool->sparql_buffer, task);
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index c8838f2..b7166e7 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -36,7 +36,7 @@ typedef void  (* TrackerProcessingPoolTaskFinishedCallback) (TrackerProcessingTa
                                                              const GError          *error);
 
 
-TrackerProcessingTask *tracker_processing_task_new               (GFile          *file);
+TrackerProcessingTask *tracker_processing_task_new               (GFile                 *file);
 void                   tracker_processing_task_free              (TrackerProcessingTask *task);
 GFile                 *tracker_processing_task_get_file          (TrackerProcessingTask *task);
 gpointer               tracker_processing_task_get_context       (TrackerProcessingTask *task);
@@ -49,7 +49,8 @@ void                   tracker_processing_task_set_sparql_string (TrackerProcess
                                                                   gchar                 *sparql_string);
 
 
-TrackerProcessingPool *tracker_processing_pool_new                   (TrackerSparqlConnection *connection,
+TrackerProcessingPool *tracker_processing_pool_new                   (GObject                 *owner,
+                                                                      TrackerSparqlConnection *connection,
                                                                       guint                    limit_wait,
                                                                       guint                    limit_process);
 void                   tracker_processing_pool_free                  (TrackerProcessingPool   *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 7640956..4c2d333 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -578,7 +578,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	                                                        (GDestroyNotify) NULL);
 
 	/* Create processing pool */
-	priv->processing_pool = tracker_processing_pool_new (tracker_miner_get_connection (TRACKER_MINER (object)),
+	priv->processing_pool = tracker_processing_pool_new (G_OBJECT (object),
+	                                                     tracker_miner_get_connection (TRACKER_MINER (object)),
 	                                                     DEFAULT_WAIT_POOL_LIMIT,
 	                                                     DEFAULT_READY_POOL_LIMIT);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]