[tracker] tracker-extract: Rework stats reporting and task cancellation
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker] tracker-extract: Rework stats reporting and task cancellation
- Date: Wed, 13 Jul 2011 14:15:23 +0000 (UTC)
commit da1a44743158998dadbcf310d3c491744bdb19c8
Author: Carlos Garnacho <carlos lanedo com>
Date: Tue Jun 21 14:44:31 2011 +0200
tracker-extract: Rework stats reporting and task cancellation
Both things share the same threading model, so these are now dealt
with together.
src/tracker-extract/tracker-controller.c | 17 +---
src/tracker-extract/tracker-extract.c | 171 +++++++++++++++++++-----------
2 files changed, 113 insertions(+), 75 deletions(-)
---
diff --git a/src/tracker-extract/tracker-controller.c b/src/tracker-extract/tracker-controller.c
index 492fae7..5ec1a7a 100644
--- a/src/tracker-extract/tracker-controller.c
+++ b/src/tracker-extract/tracker-controller.c
@@ -319,20 +319,9 @@ cancel_tasks_in_file (TrackerController *controller,
if (g_file_equal (task_file, file) ||
g_file_has_prefix (task_file, file)) {
- /* Mount path contains one of the files being processed */
- if (!elem->next) {
- /* The last element in the list is
- * the one currently being processed,
- * so exit abruptly.
- */
- g_message ("Cancelled task ('%s') is currently being processed, quitting",
- data->uri);
- _exit (0);
- } else {
- g_message ("Cancelling not yet processed task ('%s')",
- data->uri);
- g_cancellable_cancel (data->cancellable);
- }
+ /* Mount path contains some file being processed */
+ g_message ("Cancelling task ('%s')", data->uri);
+ g_cancellable_cancel (data->cancellable);
}
g_object_unref (task_file);
diff --git a/src/tracker-extract/tracker-extract.c b/src/tracker-extract/tracker-extract.c
index 752157a..7f9cc08 100644
--- a/src/tracker-extract/tracker-extract.c
+++ b/src/tracker-extract/tracker-extract.c
@@ -81,13 +81,13 @@ typedef struct {
} StatisticsData;
typedef struct {
- TrackerExtract *extract;
- GModule *module;
- guint success : 1;
-} StatsReportData;
-
-typedef struct {
GHashTable *statistics_data;
+ GList *running_tasks;
+
+ /* used to maintain the running tasks
+ * and stats from different threads
+ */
+ GMutex *task_mutex;
/* module -> thread awareness enum for initialized modules */
GHashTable *modules;
@@ -115,6 +115,9 @@ typedef struct {
gchar *mimetype;
TrackerExtractMetadataFunc func;
GModule *module;
+
+ guint signal_id;
+ guint success : 1;
} TrackerExtractTask;
static void tracker_extract_finalize (GObject *object);
@@ -158,6 +161,8 @@ tracker_extract_init (TrackerExtract *object)
priv->single_thread_extractors = g_hash_table_new (NULL, NULL);
priv->thread_pool = g_thread_pool_new ((GFunc) get_metadata,
NULL, 10, TRUE, NULL);
+
+ priv->task_mutex = g_mutex_new ();
}
static void
@@ -195,6 +200,8 @@ report_statistics (GObject *object)
priv = TRACKER_EXTRACT_GET_PRIVATE (object);
+ g_mutex_lock (priv->task_mutex);
+
g_message ("--------------------------------------------------");
g_message ("Statistics:");
@@ -225,6 +232,8 @@ report_statistics (GObject *object)
}
g_message ("--------------------------------------------------");
+
+ g_mutex_unlock (priv->task_mutex);
}
TrackerExtract *
@@ -250,42 +259,41 @@ tracker_extract_new (gboolean disable_shutdown,
return object;
}
-static gboolean
-report_stats_cb (StatsReportData *report_data)
+static void
+notify_task_finish (TrackerExtractTask *task,
+ gboolean success)
{
+ TrackerExtract *extract;
TrackerExtractPrivate *priv;
StatisticsData *stats_data;
- priv = TRACKER_EXTRACT_GET_PRIVATE (report_data->extract);
- stats_data = g_hash_table_lookup (priv->statistics_data, report_data->module);
+ extract = task->extract;
+ priv = TRACKER_EXTRACT_GET_PRIVATE (extract);
+
+ /* Reports and ongoing tasks may be
+ * accessed from other threads.
+ */
+ g_mutex_lock (priv->task_mutex);
+
+ stats_data = g_hash_table_lookup (priv->statistics_data,
+ task->module);
if (!stats_data) {
stats_data = g_slice_new0 (StatisticsData);
- g_hash_table_insert (priv->statistics_data, report_data->module, stats_data);
+ g_hash_table_insert (priv->statistics_data,
+ task->module,
+ stats_data);
}
stats_data->extracted_count++;
- if (!report_data->success) {
+ if (!success) {
stats_data->failed_count++;
}
- return FALSE;
-}
-
-static void
-report_stats (TrackerExtractTask *task,
- gboolean success)
-{
- StatsReportData *data;
+ priv->running_tasks = g_list_remove (priv->running_tasks, task);
- data = g_slice_new0 (StatsReportData);
- data->extract = task->extract;
- data->module = task->module;
- data->success = success;
-
- /* Send to main thread, where stats hashtable is maintained */
- g_idle_add ((GSourceFunc) report_stats_cb, data);
+ g_mutex_unlock (priv->task_mutex);
}
static gboolean
@@ -300,7 +308,7 @@ get_file_metadata (TrackerExtractTask *task,
#ifdef HAVE_LIBSTREAMANALYZER
gchar *content_type = NULL;
#endif
- gint items;
+ gint items = 0;
g_debug ("Extracting...");
@@ -353,8 +361,6 @@ get_file_metadata (TrackerExtractTask *task,
*/
if (mime_used) {
if (task->func) {
- gint items;
-
g_debug (" Using %s...", g_module_name (task->module));
(task->func) (task->file, mime_used, preupdate, statements, where);
@@ -364,32 +370,22 @@ get_file_metadata (TrackerExtractTask *task,
if (items > 0) {
tracker_sparql_builder_insert_close (statements);
- *preupdate_out = preupdate;
- *statements_out = statements;
- *where_out = g_string_free (where, FALSE);
-
g_debug ("Done (%d items)", items);
- report_stats (task, TRUE);
- } else {
- report_stats (task, FALSE);
+ task->success = TRUE;
}
}
g_free (mime_used);
}
- items = tracker_sparql_builder_get_length (statements);
-
- if (items > 0) {
- tracker_sparql_builder_insert_close (statements);
- }
-
*preupdate_out = preupdate;
*statements_out = statements;
*where_out = g_string_free (where, FALSE);
- g_debug ("No extractor or failed (%d items)", items);
+ if (items == 0) {
+ g_debug ("No extractor or failed");
+ }
return TRUE;
}
@@ -409,6 +405,29 @@ tracker_extract_info_free (TrackerExtractInfo *info)
g_slice_free (TrackerExtractInfo, info);
}
+/* This function is called on the thread calling g_cancellable_cancel() */
+static void
+task_cancellable_cancelled_cb (GCancellable *cancellable,
+ TrackerExtractTask *task)
+{
+ TrackerExtractPrivate *priv;
+ TrackerExtract *extract;
+
+ extract = task->extract;
+ priv = TRACKER_EXTRACT_GET_PRIVATE (extract);
+
+ g_mutex_lock (priv->task_mutex);
+
+ if (g_list_find (priv->running_tasks, task)) {
+ g_message ("Cancelled task for '%s' was currently being "
+ "processed, _exit()ing immediately",
+ task->file);
+ _exit (0);
+ }
+
+ g_mutex_unlock (priv->task_mutex);
+}
+
static TrackerExtractTask *
extract_task_new (TrackerExtract *extract,
const gchar *uri,
@@ -418,16 +437,7 @@ extract_task_new (TrackerExtract *extract,
{
TrackerExtractTask *task;
- task = g_slice_new0 (TrackerExtractTask);
- task->cancellable = cancellable;
- task->res = (res) ? g_object_ref (res) : NULL;
- task->file = g_strdup (uri);
- task->mimetype = g_strdup (mimetype);
- task->extract = extract;
-
- if (mimetype) {
- task->mimetype = g_strdup (mimetype);
- } else {
+ if (!mimetype) {
GFile *file;
GFileInfo *info;
@@ -438,25 +448,50 @@ extract_task_new (TrackerExtract *extract,
NULL, NULL);
if (info) {
- task->mimetype = g_strdup (g_file_info_get_content_type (info));
+ mimetype = g_strdup (g_file_info_get_content_type (info));
+ g_object_unref (info);
} else {
g_warning ("Could not get mimetype for '%s'", uri);
+ return NULL;
}
- g_object_unref (info);
g_object_unref (file);
}
+ task = g_slice_new0 (TrackerExtractTask);
+ task->cancellable = (cancellable) ? g_object_ref (cancellable) : NULL;
+ task->res = (res) ? g_object_ref (res) : NULL;
+ task->file = g_strdup (uri);
+ task->mimetype = g_strdup (mimetype);
+ task->extract = extract;
+
+ if (task->cancellable) {
+ task->signal_id = g_cancellable_connect (cancellable,
+ G_CALLBACK (task_cancellable_cancelled_cb),
+ task, NULL);
+ }
+
return task;
}
static void
extract_task_free (TrackerExtractTask *task)
{
+ if (task->cancellable &&
+ task->signal_id != 0) {
+ g_cancellable_disconnect (task->cancellable, task->signal_id);
+ }
+
+ notify_task_finish (task, task->success);
+
if (task->res) {
g_object_unref (task->res);
}
+ if (task->cancellable) {
+ g_object_unref (task->cancellable);
+ }
+
g_free (task->file);
g_free (task->mimetype);
g_slice_free (TrackerExtractTask, task);
@@ -479,6 +514,7 @@ get_metadata (TrackerExtractTask *task)
TRACKER_DBUS_ERROR, 0,
"Extraction of '%s' was cancelled",
task->file);
+
extract_task_free (task);
return FALSE;
}
@@ -571,6 +607,10 @@ dispatch_task_cb (TrackerExtractTask *task)
g_hash_table_insert (priv->modules, module, GUINT_TO_POINTER (thread_awareness));
}
+ g_mutex_lock (priv->task_mutex);
+ priv->running_tasks = g_list_prepend (priv->running_tasks, task);
+ g_mutex_unlock (priv->task_mutex);
+
switch (thread_awareness) {
case TRACKER_MODULE_NONE:
/* Error out */
@@ -662,10 +702,13 @@ tracker_extract_file (TrackerExtract *extract,
res = g_simple_async_result_new (G_OBJECT (extract), cb, user_data, NULL);
task = extract_task_new (extract, file, mimetype, cancellable, G_ASYNC_RESULT (res));
- g_idle_add ((GSourceFunc) dispatch_task_cb, task);
- /* task takes a ref */
- g_object_unref (res);
+ if (task) {
+ g_idle_add ((GSourceFunc) dispatch_task_cb, task);
+
+ /* task takes a ref */
+ g_object_unref (res);
+ }
}
void
@@ -688,11 +731,17 @@ tracker_extract_get_metadata_by_cmdline (TrackerExtract *object,
task = extract_task_new (object, uri, mime, NULL, NULL);
- tracker_extract_module_manager_get_for_mimetype (task->mimetype, &init_func, NULL, &task->func);
+ if (!task) {
+ return;
+ }
+
+ task->module = tracker_extract_module_manager_get_for_mimetype (task->mimetype, &init_func, NULL, &task->func);
if (init_func) {
+ TrackerModuleThreadAwareness ignore;
+
/* Initialize module for this single run */
- (init_func) (NULL, NULL);
+ (init_func) (&ignore, NULL);
}
if (get_file_metadata (task, &preupdate, &statements, &where)) {
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]