[tracker/threaded-extractor: 3/3] tracker-extract: Rework stats reporting and task cancellation



commit a5c922ff8a02a9a2e1676abcafbc5e7290a1a004
Author: Carlos Garnacho <carlos lanedo com>
Date:   Tue Jun 21 14:44:31 2011 +0200

    tracker-extract: Rework stats reporting and task cancellation
    
    Both things share the same threading model, so these are now dealt
    with together.

 src/tracker-extract/tracker-controller.c |   17 +---
 src/tracker-extract/tracker-extract.c    |  171 +++++++++++++++++++-----------
 2 files changed, 113 insertions(+), 75 deletions(-)
---
diff --git a/src/tracker-extract/tracker-controller.c b/src/tracker-extract/tracker-controller.c
index 492fae7..5ec1a7a 100644
--- a/src/tracker-extract/tracker-controller.c
+++ b/src/tracker-extract/tracker-controller.c
@@ -319,20 +319,9 @@ cancel_tasks_in_file (TrackerController *controller,
 
 		if (g_file_equal (task_file, file) ||
 		    g_file_has_prefix (task_file, file)) {
-			/* Mount path contains one of the files being processed */
-			if (!elem->next) {
-				/* The last element in the list is
-				 * the one currently being processed,
-				 * so exit abruptly.
-				 */
-				g_message ("Cancelled task ('%s') is currently being processed, quitting",
-				           data->uri);
-				_exit (0);
-			} else {
-				g_message ("Cancelling not yet processed task ('%s')",
-				           data->uri);
-				g_cancellable_cancel (data->cancellable);
-			}
+			/* Mount path contains some file being processed */
+			g_message ("Cancelling task ('%s')", data->uri);
+			g_cancellable_cancel (data->cancellable);
 		}
 
 		g_object_unref (task_file);
diff --git a/src/tracker-extract/tracker-extract.c b/src/tracker-extract/tracker-extract.c
index 752157a..7f9cc08 100644
--- a/src/tracker-extract/tracker-extract.c
+++ b/src/tracker-extract/tracker-extract.c
@@ -81,13 +81,13 @@ typedef struct {
 } StatisticsData;
 
 typedef struct {
-	TrackerExtract *extract;
-	GModule *module;
-	guint success : 1;
-} StatsReportData;
-
-typedef struct {
 	GHashTable *statistics_data;
+	GList *running_tasks;
+
+	/* used to maintain the running tasks
+	 * and stats from different threads
+	 */
+	GMutex *task_mutex;
 
 	/* module -> thread awareness enum for initialized modules */
 	GHashTable *modules;
@@ -115,6 +115,9 @@ typedef struct {
 	gchar *mimetype;
 	TrackerExtractMetadataFunc func;
 	GModule *module;
+
+	guint signal_id;
+	guint success : 1;
 } TrackerExtractTask;
 
 static void tracker_extract_finalize (GObject *object);
@@ -158,6 +161,8 @@ tracker_extract_init (TrackerExtract *object)
 	priv->single_thread_extractors = g_hash_table_new (NULL, NULL);
 	priv->thread_pool = g_thread_pool_new ((GFunc) get_metadata,
 	                                       NULL, 10, TRUE, NULL);
+
+	priv->task_mutex = g_mutex_new ();
 }
 
 static void
@@ -195,6 +200,8 @@ report_statistics (GObject *object)
 
 	priv = TRACKER_EXTRACT_GET_PRIVATE (object);
 
+	g_mutex_lock (priv->task_mutex);
+
 	g_message ("--------------------------------------------------");
 	g_message ("Statistics:");
 
@@ -225,6 +232,8 @@ report_statistics (GObject *object)
 	}
 
 	g_message ("--------------------------------------------------");
+
+	g_mutex_unlock (priv->task_mutex);
 }
 
 TrackerExtract *
@@ -250,42 +259,41 @@ tracker_extract_new (gboolean     disable_shutdown,
 	return object;
 }
 
-static gboolean
-report_stats_cb (StatsReportData *report_data)
+static void
+notify_task_finish (TrackerExtractTask *task,
+                    gboolean            success)
 {
+	TrackerExtract *extract;
 	TrackerExtractPrivate *priv;
 	StatisticsData *stats_data;
 
-	priv = TRACKER_EXTRACT_GET_PRIVATE (report_data->extract);
-	stats_data = g_hash_table_lookup (priv->statistics_data, report_data->module);
+	extract = task->extract;
+	priv = TRACKER_EXTRACT_GET_PRIVATE (extract);
+
+	/* Reports and ongoing tasks may be
+	 * accessed from other threads.
+	 */
+	g_mutex_lock (priv->task_mutex);
+
+	stats_data = g_hash_table_lookup (priv->statistics_data,
+					  task->module);
 
 	if (!stats_data) {
 		stats_data = g_slice_new0 (StatisticsData);
-		g_hash_table_insert (priv->statistics_data, report_data->module, stats_data);
+		g_hash_table_insert (priv->statistics_data,
+				     task->module,
+				     stats_data);
 	}
 
 	stats_data->extracted_count++;
 
-	if (!report_data->success) {
+	if (!success) {
 		stats_data->failed_count++;
 	}
 
-	return FALSE;
-}
-
-static void
-report_stats (TrackerExtractTask *task,
-              gboolean            success)
-{
-	StatsReportData *data;
+	priv->running_tasks = g_list_remove (priv->running_tasks, task);
 
-	data = g_slice_new0 (StatsReportData);
-	data->extract = task->extract;
-	data->module = task->module;
-	data->success = success;
-
-	/* Send to main thread, where stats hashtable is maintained */
-	g_idle_add ((GSourceFunc) report_stats_cb, data);
+	g_mutex_unlock (priv->task_mutex);
 }
 
 static gboolean
@@ -300,7 +308,7 @@ get_file_metadata (TrackerExtractTask     *task,
 #ifdef HAVE_LIBSTREAMANALYZER
 	gchar *content_type = NULL;
 #endif
-	gint items;
+	gint items = 0;
 
 	g_debug ("Extracting...");
 
@@ -353,8 +361,6 @@ get_file_metadata (TrackerExtractTask     *task,
 	 */
 	if (mime_used) {
 		if (task->func) {
-			gint items;
-
 			g_debug ("  Using %s...", g_module_name (task->module));
 
 			(task->func) (task->file, mime_used, preupdate, statements, where);
@@ -364,32 +370,22 @@ get_file_metadata (TrackerExtractTask     *task,
 			if (items > 0) {
 				tracker_sparql_builder_insert_close (statements);
 
-				*preupdate_out = preupdate;
-				*statements_out = statements;
-				*where_out = g_string_free (where, FALSE);
-
 				g_debug ("Done (%d items)", items);
 
-				report_stats (task, TRUE);
-			} else {
-				report_stats (task, FALSE);
+				task->success = TRUE;
 			}
 		}
 
 		g_free (mime_used);
 	}
 
-	items = tracker_sparql_builder_get_length (statements);
-
-	if (items > 0) {
-		tracker_sparql_builder_insert_close (statements);
-	}
-
 	*preupdate_out = preupdate;
 	*statements_out = statements;
 	*where_out = g_string_free (where, FALSE);
 
-	g_debug ("No extractor or failed (%d items)", items);
+	if (items == 0) {
+		g_debug ("No extractor or failed");
+	}
 
 	return TRUE;
 }
@@ -409,6 +405,29 @@ tracker_extract_info_free (TrackerExtractInfo *info)
 	g_slice_free (TrackerExtractInfo, info);
 }
 
+/* This function is called on the thread calling g_cancellable_cancel() */
+static void
+task_cancellable_cancelled_cb (GCancellable       *cancellable,
+                               TrackerExtractTask *task)
+{
+	TrackerExtractPrivate *priv;
+	TrackerExtract *extract;
+
+	extract = task->extract;
+	priv = TRACKER_EXTRACT_GET_PRIVATE (extract);
+
+	g_mutex_lock (priv->task_mutex);
+
+	if (g_list_find (priv->running_tasks, task)) {
+		g_message ("Cancelled task for '%s' was currently being "
+		           "processed, _exit()ing immediately",
+		           task->file);
+		_exit (0);
+	}
+
+	g_mutex_unlock (priv->task_mutex);
+}
+
 static TrackerExtractTask *
 extract_task_new (TrackerExtract *extract,
                   const gchar    *uri,
@@ -418,16 +437,7 @@ extract_task_new (TrackerExtract *extract,
 {
 	TrackerExtractTask *task;
 
-	task = g_slice_new0 (TrackerExtractTask);
-	task->cancellable = cancellable;
-	task->res = (res) ? g_object_ref (res) : NULL;
-	task->file = g_strdup (uri);
-	task->mimetype = g_strdup (mimetype);
-	task->extract = extract;
-
-	if (mimetype) {
-		task->mimetype = g_strdup (mimetype);
-	} else {
+	if (!mimetype) {
 		GFile *file;
 		GFileInfo *info;
 
@@ -438,25 +448,50 @@ extract_task_new (TrackerExtract *extract,
 		                          NULL, NULL);
 
 		if (info) {
-			task->mimetype = g_strdup (g_file_info_get_content_type (info));
+			mimetype = g_strdup (g_file_info_get_content_type (info));
+			g_object_unref (info);
 		} else {
 			g_warning ("Could not get mimetype for '%s'", uri);
+			return NULL;
 		}
 
-		g_object_unref (info);
 		g_object_unref (file);
 	}
 
+	task = g_slice_new0 (TrackerExtractTask);
+	task->cancellable = (cancellable) ? g_object_ref (cancellable) : NULL;
+	task->res = (res) ? g_object_ref (res) : NULL;
+	task->file = g_strdup (uri);
+	task->mimetype = g_strdup (mimetype);
+	task->extract = extract;
+
+	if (task->cancellable) {
+		task->signal_id = g_cancellable_connect (cancellable,
+		                                         G_CALLBACK (task_cancellable_cancelled_cb),
+		                                         task, NULL);
+	}
+
 	return task;
 }
 
 static void
 extract_task_free (TrackerExtractTask *task)
 {
+	if (task->cancellable &&
+	    task->signal_id != 0) {
+		g_cancellable_disconnect (task->cancellable, task->signal_id);
+	}
+
+	notify_task_finish (task, task->success);
+
 	if (task->res) {
 		g_object_unref (task->res);
 	}
 
+	if (task->cancellable) {
+		g_object_unref (task->cancellable);
+	}
+
 	g_free (task->file);
 	g_free (task->mimetype);
 	g_slice_free (TrackerExtractTask, task);
@@ -479,6 +514,7 @@ get_metadata (TrackerExtractTask *task)
 		                                 TRACKER_DBUS_ERROR, 0,
 		                                 "Extraction of '%s' was cancelled",
 		                                 task->file);
+
 		extract_task_free (task);
 		return FALSE;
 	}
@@ -571,6 +607,10 @@ dispatch_task_cb (TrackerExtractTask *task)
 		g_hash_table_insert (priv->modules, module, GUINT_TO_POINTER (thread_awareness));
 	}
 
+	g_mutex_lock (priv->task_mutex);
+	priv->running_tasks = g_list_prepend (priv->running_tasks, task);
+	g_mutex_unlock (priv->task_mutex);
+
 	switch (thread_awareness) {
 	case TRACKER_MODULE_NONE:
 		/* Error out */
@@ -662,10 +702,13 @@ tracker_extract_file (TrackerExtract      *extract,
 	res = g_simple_async_result_new (G_OBJECT (extract), cb, user_data, NULL);
 
 	task = extract_task_new (extract, file, mimetype, cancellable, G_ASYNC_RESULT (res));
-	g_idle_add ((GSourceFunc) dispatch_task_cb, task);
 
-	/* task takes a ref */
-	g_object_unref (res);
+	if (task) {
+		g_idle_add ((GSourceFunc) dispatch_task_cb, task);
+
+		/* task takes a ref */
+		g_object_unref (res);
+	}
 }
 
 void
@@ -688,11 +731,17 @@ tracker_extract_get_metadata_by_cmdline (TrackerExtract *object,
 
 	task = extract_task_new (object, uri, mime, NULL, NULL);
 
-	tracker_extract_module_manager_get_for_mimetype (task->mimetype, &init_func, NULL, &task->func);
+	if (!task) {
+		return;
+	}
+
+	task->module = tracker_extract_module_manager_get_for_mimetype (task->mimetype, &init_func, NULL, &task->func);
 
 	if (init_func) {
+		TrackerModuleThreadAwareness ignore;
+
 		/* Initialize module for this single run */
-		(init_func) (NULL, NULL);
+		(init_func) (&ignore, NULL);
 	}
 
 	if (get_file_metadata (task, &preupdate, &statements, &where)) {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]