[tracker/miner-fs-refactor-multi-insert: 2/16] libtracker-miner: Integrate the new processing pool
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-refactor-multi-insert: 2/16] libtracker-miner: Integrate the new processing pool
- Date: Thu, 21 Oct 2010 13:37:30 +0000 (UTC)
commit 7a6226bb34e3b499d2474b29c40adcd49a636783
Author: Aleksander Morgado <aleksander lanedo com>
Date: Thu Oct 14 16:17:23 2010 +0200
libtracker-miner: Integrate the new processing pool
src/libtracker-miner/tracker-miner-fs.c | 471 +++++++++++++++++--------------
1 files changed, 266 insertions(+), 205 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 47965b3..7aa0081 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -30,6 +30,7 @@
#include "tracker-monitor.h"
#include "tracker-utils.h"
#include "tracker-thumbnailer.h"
+#include "tracker-miner-fs-processing-pool.h"
/* If defined will print the tree from GNode while running */
#undef ENABLE_TREE_DEBUGGING
@@ -38,6 +39,10 @@
/* If defined will print contents of populated mtime cache while running */
#undef PRINT_MTIME_CACHE_CONTENTS
+/* Default processing pool limits to be set */
+#define DEFAULT_WAIT_POOL_LIMIT 1
+#define DEFAULT_PROCESS_POOL_LIMIT 1
+
/**
* SECTION:tracker-miner-fs
* @short_description: Abstract base class for filesystem miners
@@ -63,13 +68,12 @@ typedef struct {
} DirectoryData;
typedef struct {
- GFile *file;
gchar *urn;
gchar *parent_urn;
GCancellable *cancellable;
TrackerSparqlBuilder *builder;
TrackerMiner *miner;
-} ProcessData;
+} UpdateProcessingTaskContext;
typedef struct {
GMainLoop *main_loop;
@@ -126,8 +130,7 @@ struct _TrackerMinerFSPrivate {
gdouble throttle;
- GList *processing_pool;
- guint pool_limit;
+ ProcessingPool *processing_pool;
/* URI mtime cache */
GFile *current_mtime_cache_parent;
@@ -193,7 +196,8 @@ enum {
enum {
PROP_0,
PROP_THROTTLE,
- PROP_POOL_LIMIT,
+ PROP_WAIT_POOL_LIMIT,
+ PROP_PROCESS_POOL_LIMIT,
PROP_MTIME_CHECKING,
PROP_INITIAL_CRAWLING
};
@@ -318,11 +322,20 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
0, 1, 0,
G_PARAM_READWRITE));
g_object_class_install_property (object_class,
- PROP_POOL_LIMIT,
+ PROP_WAIT_POOL_LIMIT,
+ g_param_spec_uint ("wait-pool-limit",
+ "Processing pool limit for WAIT tasks",
+ "Maximum number of files that can be concurrently "
+ "processed by the upper layer",
+ 1, G_MAXUINT, DEFAULT_WAIT_POOL_LIMIT,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+ g_object_class_install_property (object_class,
+ PROP_PROCESS_POOL_LIMIT,
g_param_spec_uint ("process-pool-limit",
- "Processing pool limit",
- "Number of files that can be concurrently processed",
- 1, G_MAXUINT, 1,
+ "Processing pool limit for PROCESS tasks",
+ "Maximum number of SPARQL updates that can be merged "
+ "in a single connection to the store",
+ 1, G_MAXUINT, DEFAULT_PROCESS_POOL_LIMIT,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (object_class,
PROP_MTIME_CHECKING,
@@ -563,6 +576,11 @@ tracker_miner_fs_init (TrackerMinerFS *object)
(GDestroyNotify) g_free,
(GDestroyNotify) NULL);
+ /* Create processing pool */
+ priv->processing_pool = processing_pool_new (tracker_miner_get_connection (TRACKER_MINER (object)),
+ DEFAULT_WAIT_POOL_LIMIT,
+ DEFAULT_PROCESS_POOL_LIMIT);
+
/* Set up the crawlers now we have config and hal */
priv->crawler = tracker_crawler_new ();
@@ -619,84 +637,6 @@ tracker_miner_fs_init (TrackerMinerFS *object)
priv->dirs_without_parent = NULL;
}
-static ProcessData *
-process_data_new (TrackerMiner *miner,
- GFile *file,
- const gchar *urn,
- const gchar *parent_urn,
- GCancellable *cancellable,
- TrackerSparqlBuilder *builder)
-{
- ProcessData *data;
-
- data = g_slice_new0 (ProcessData);
- data->miner = miner;
- data->file = g_object_ref (file);
- data->urn = g_strdup (urn);
- data->parent_urn = g_strdup (parent_urn);
-
- if (cancellable) {
- data->cancellable = g_object_ref (cancellable);
- }
-
- if (builder) {
- data->builder = g_object_ref (builder);
- }
-
- return data;
-}
-
-static void
-process_data_free (ProcessData *data)
-{
- g_object_unref (data->file);
- g_free (data->urn);
- g_free (data->parent_urn);
-
- if (data->cancellable) {
- g_object_unref (data->cancellable);
- }
-
- if (data->builder) {
- g_object_unref (data->builder);
- }
-
- g_slice_free (ProcessData, data);
-}
-
-static ProcessData *
-process_data_find (TrackerMinerFS *fs,
- GFile *file,
- gboolean path_search)
-{
- GList *l;
-
- for (l = fs->private->processing_pool; l; l = l->next) {
- ProcessData *data = l->data;
-
- if (!path_search) {
- /* Different operations for the same file URI could be
- * piled up here, each being a different GFile object.
- * Miner implementations should really notify on the
- * same GFile object that's being passed, so we check for
- * pointer equality here, rather than doing path comparisons
- */
- if(data->file == file)
- return data;
- } else {
- /* Note that if there are different GFiles being
- * processed for the same file path, we are actually
- * returning the first one found, If you want exactly
- * the same GFile as the one as input, use the
- * process_data_find() method instead */
- if (g_file_equal (data->file, file))
- return data;
- }
- }
-
- return NULL;
-}
-
static void
fs_finalize (GObject *object)
{
@@ -737,8 +677,7 @@ fs_finalize (GObject *object)
g_queue_foreach (priv->crawled_directories, (GFunc) crawled_directory_data_free, NULL);
g_queue_free (priv->crawled_directories);
- g_list_foreach (priv->processing_pool, (GFunc) process_data_free, NULL);
- g_list_free (priv->processing_pool);
+ processing_pool_free (priv->processing_pool);
g_queue_foreach (priv->items_moved, (GFunc) item_moved_data_free, NULL);
g_queue_free (priv->items_moved);
@@ -785,9 +724,13 @@ fs_set_property (GObject *object,
tracker_miner_fs_set_throttle (TRACKER_MINER_FS (object),
g_value_get_double (value));
break;
- case PROP_POOL_LIMIT:
- fs->private->pool_limit = g_value_get_uint (value);
- g_message ("Miner process pool limit is set to %d", fs->private->pool_limit);
+ case PROP_WAIT_POOL_LIMIT:
+ processing_pool_set_wait_limit (fs->private->processing_pool,
+ g_value_get_uint (value));
+ break;
+ case PROP_PROCESS_POOL_LIMIT:
+ processing_pool_set_process_limit (fs->private->processing_pool,
+ g_value_get_uint (value));
break;
case PROP_MTIME_CHECKING:
fs->private->mtime_checking = g_value_get_boolean (value);
@@ -815,8 +758,13 @@ fs_get_property (GObject *object,
case PROP_THROTTLE:
g_value_set_double (value, fs->private->throttle);
break;
- case PROP_POOL_LIMIT:
- g_value_set_uint (value, fs->private->pool_limit);
+ case PROP_WAIT_POOL_LIMIT:
+ g_value_set_uint (value,
+ processing_pool_get_wait_limit (fs->private->processing_pool));
+ break;
+ case PROP_PROCESS_POOL_LIMIT:
+ g_value_set_uint (value,
+ processing_pool_get_process_limit (fs->private->processing_pool));
break;
case PROP_MTIME_CHECKING:
g_value_set_boolean (value, fs->private->mtime_checking);
@@ -1048,42 +996,39 @@ item_moved_data_free (ItemMovedData *data)
}
static void
-sparql_update_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+processing_pool_task_finished_cb (ProcessingTask *task,
+ gpointer user_data,
+ const GError *error)
{
TrackerMinerFS *fs;
TrackerMinerFSPrivate *priv;
- ProcessData *data;
- GError *error = NULL;
- tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
-
- data = user_data;
- fs = TRACKER_MINER_FS (data->miner);
+ fs = user_data;
priv = fs->private;
if (error) {
g_critical ("Could not execute sparql: %s", error->message);
priv->total_files_notified_error++;
- g_error_free (error);
} else {
if (fs->private->current_iri_cache_parent) {
GFile *parent;
+ GFile *task_file;
+
+ task_file = processing_task_get_file (task);
/* Note: parent may be NULL if the file represents
* the root directory of the file system (applies to
* .gvfs mounts also!) */
- parent = g_file_get_parent (data->file);
+ parent = g_file_get_parent (task_file);
if (parent) {
if (g_file_equal (parent, fs->private->current_iri_cache_parent) &&
- g_hash_table_lookup (fs->private->iri_cache, data->file) == NULL) {
+ g_hash_table_lookup (fs->private->iri_cache, task_file) == NULL) {
/* Item is processed, add an empty element for the processed GFile,
* in case it is again processed before the cache expires
*/
g_hash_table_insert (fs->private->iri_cache,
- g_object_ref (data->file),
+ g_object_ref (task_file),
NULL);
}
@@ -1092,9 +1037,6 @@ sparql_update_cb (GObject *object,
}
}
- priv->processing_pool = g_list_remove (priv->processing_pool, data);
- process_data_free (data);
-
item_queue_handlers_set_up (fs);
}
@@ -1457,30 +1399,80 @@ iri_cache_invalidate (TrackerMinerFS *fs,
g_hash_table_remove (fs->private->iri_cache, file);
}
+static UpdateProcessingTaskContext *
+update_process_task_context_new (TrackerMiner *miner,
+ const gchar *urn,
+ const gchar *parent_urn,
+ GCancellable *cancellable,
+ TrackerSparqlBuilder *builder)
+{
+ UpdateProcessingTaskContext *ctxt;
+
+ ctxt = g_slice_new0 (UpdateProcessingTaskContext);
+ ctxt->miner = miner;
+ ctxt->urn = g_strdup (urn);
+ ctxt->parent_urn = g_strdup (parent_urn);
+
+ if (cancellable) {
+ ctxt->cancellable = g_object_ref (cancellable);
+ }
+
+ if (builder) {
+ ctxt->builder = g_object_ref (builder);
+ }
+
+ return ctxt;
+}
+
+static void
+update_process_task_context_free (UpdateProcessingTaskContext *ctxt)
+{
+ g_free (ctxt->urn);
+ g_free (ctxt->parent_urn);
+
+ if (ctxt->cancellable) {
+ g_object_unref (ctxt->cancellable);
+ }
+
+ if (ctxt->builder) {
+ g_object_unref (ctxt->builder);
+ }
+
+ g_slice_free (UpdateProcessingTaskContext, ctxt);
+}
+
static gboolean
do_process_file (TrackerMinerFS *fs,
- ProcessData *data)
+ ProcessingTask *task)
{
TrackerMinerFSPrivate *priv;
gboolean processing;
gboolean attribute_update_only;
gchar *uri;
+ GFile *task_file;
+ UpdateProcessingTaskContext *ctxt;
- uri = g_file_get_uri (data->file);
+ ctxt = processing_task_get_context (task);
+ task_file = processing_task_get_file (task);
+ uri = g_file_get_uri (task_file);
priv = fs->private;
- attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (data->file),
+ attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (task_file),
priv->quark_attribute_updated));
if (!attribute_update_only) {
g_debug ("Processing file '%s'...", uri);
g_signal_emit (fs, signals[PROCESS_FILE], 0,
- data->file, data->builder, data->cancellable,
+ task_file,
+ ctxt->builder,
+ ctxt->cancellable,
&processing);
} else {
g_debug ("Processing attributes in file '%s'...", uri);
g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
- data->file, data->builder, data->cancellable,
+ task_file,
+ ctxt->builder,
+ ctxt->cancellable,
&processing);
}
@@ -1488,18 +1480,18 @@ do_process_file (TrackerMinerFS *fs,
/* Re-fetch data, since it might have been
* removed in broken implementations
*/
- data = process_data_find (fs, data->file, FALSE);
+ task = processing_pool_find_task (priv->processing_pool, task_file, FALSE);
g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
- if (!data) {
+ if (!task) {
g_critical ("%s has returned FALSE in ::process-file for '%s', "
"but it seems that this file has been processed through "
"tracker_miner_fs_file_notify(), this is an "
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
- priv->processing_pool = g_list_remove (priv->processing_pool, data);
- process_data_free (data);
+ processing_pool_remove_task (priv->processing_pool, task);
+ processing_task_free (task);
}
}
@@ -1510,19 +1502,21 @@ do_process_file (TrackerMinerFS *fs,
static void
item_add_or_update_cb (TrackerMinerFS *fs,
- ProcessData *data,
+ ProcessingTask *task,
const GError *error)
{
+ UpdateProcessingTaskContext *ctxt;
+ GFile *task_file;
gchar *uri;
- uri = g_file_get_uri (data->file);
+ ctxt = processing_task_get_context (task);
+ task_file = processing_task_get_file (task);
+ uri = g_file_get_uri (task_file);
if (error) {
- ProcessData *first_item_data;
- GList *last;
+ ProcessingTask *first_item_task;
- last = g_list_last (fs->private->processing_pool);
- first_item_data = last->data;
+ first_item_task = processing_pool_get_last_wait (fs->private->processing_pool);
/* Perhaps this is too specific to TrackerMinerFiles, if the extractor
* is choking on some file, the miner will get a timeout for all files
@@ -1530,38 +1524,38 @@ item_add_or_update_cb (TrackerMinerFS *fs,
* is the first one that was added to the processing pool, so we retry
* the others.
*/
- if (data != first_item_data &&
+ if (task != first_item_task &&
(error->code == DBUS_GERROR_NO_REPLY ||
error->code == DBUS_GERROR_TIMEOUT ||
error->code == DBUS_GERROR_TIMED_OUT)) {
g_debug (" Got DBus timeout error on '%s', but it could not be caused by it. Retrying file.", uri);
/* Reset the TrackerSparqlBuilder */
- g_object_unref (data->builder);
- data->builder = tracker_sparql_builder_new_update ();
+ g_object_unref (ctxt->builder);
+ ctxt->builder = tracker_sparql_builder_new_update ();
- do_process_file (fs, data);
+ do_process_file (fs, task);
} else {
g_message ("Could not process '%s': %s", uri, error->message);
fs->private->total_files_notified_error++;
- fs->private->processing_pool =
- g_list_remove (fs->private->processing_pool, data);
- process_data_free (data);
+
+ processing_pool_remove_task (fs->private->processing_pool, task);
+ processing_task_free (task);
item_queue_handlers_set_up (fs);
}
} else {
gchar *full_sparql;
- if (data->urn) {
+ if (ctxt->urn) {
gboolean attribute_update_only;
- attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (data->file),
+ attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
fs->private->quark_attribute_updated));
g_debug ("Updating item '%s' with urn '%s'%s",
uri,
- data->urn,
+ ctxt->urn,
attribute_update_only ? " (attributes only)" : "");
if (!attribute_update_only) {
@@ -1570,28 +1564,33 @@ item_add_or_update_cb (TrackerMinerFS *fs,
full_sparql = g_strdup_printf ("DELETE { GRAPH <%s> { <%s> ?p ?o } } "
"WHERE { GRAPH <%s> { <%s> ?p ?o FILTER (?p != rdf:type) } } %s",
TRACKER_MINER_FS_GRAPH_URN,
- data->urn,
+ ctxt->urn,
TRACKER_MINER_FS_GRAPH_URN,
- data->urn,
- tracker_sparql_builder_get_result (data->builder));
+ ctxt->urn,
+ tracker_sparql_builder_get_result (ctxt->builder));
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
- full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+ full_sparql = g_strdup (tracker_sparql_builder_get_result (ctxt->builder));
}
} else {
g_debug ("Creating new item '%s'", uri);
/* new file */
- full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+ full_sparql = g_strdup (tracker_sparql_builder_get_result (ctxt->builder));
}
- tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
- full_sparql,
- G_PRIORITY_DEFAULT,
- NULL,
- sparql_update_cb, data);
+ processing_task_set_sparql (task, full_sparql);
+ /* If process_task() returns FALSE, it means the actual db update was delayed,
+ * and in this case we need to setup queue handlers again */
+ if (!processing_pool_process_task (fs->private->processing_pool,
+ task,
+ TRUE, /* buffer! */
+ processing_pool_task_finished_cb,
+ fs)) {
+ item_queue_handlers_set_up (fs);
+ }
g_free (full_sparql);
}
@@ -1606,7 +1605,7 @@ item_add_or_update (TrackerMinerFS *fs,
TrackerSparqlBuilder *sparql;
GCancellable *cancellable;
gboolean retval;
- ProcessData *data;
+ ProcessingTask *task;
GFile *parent;
const gchar *urn;
const gchar *parent_urn = NULL;
@@ -1655,17 +1654,22 @@ item_add_or_update (TrackerMinerFS *fs,
* updated the cache, or without a proper nfo:belongsToContainer */
urn = iri_cache_lookup (fs, file, TRUE);
- data = process_data_new (TRACKER_MINER (fs), file, urn, parent_urn, cancellable, sparql);
- priv->processing_pool = g_list_prepend (priv->processing_pool, data);
-
- if (do_process_file (fs, data)) {
- guint length;
-
- length = g_list_length (priv->processing_pool);
-
+ /* Create task and add it to the pool as a WAIT task (we need to extract
+ * the file metadata and such) */
+ task = processing_task_new (file);
+ processing_task_set_context (task,
+ update_process_task_context_new (TRACKER_MINER (fs),
+ urn,
+ parent_urn,
+ cancellable,
+ sparql),
+ (GFreeFunc) update_process_task_context_free);
+ processing_pool_wait_task (priv->processing_pool, task);
+
+ if (do_process_file (fs, task)) {
fs->private->total_files_processed++;
- if (length >= priv->pool_limit) {
+ if (processing_pool_wait_limit_reached (priv->processing_pool)) {
retval = FALSE;
}
}
@@ -1684,7 +1688,7 @@ item_remove (TrackerMinerFS *fs,
GString *sparql;
gchar *uri;
gchar *mime = NULL;
- ProcessData *data;
+ ProcessingTask *task;
iri_cache_invalidate (fs, file);
uri = g_file_get_uri (file);
@@ -1724,14 +1728,19 @@ item_remove (TrackerMinerFS *fs,
"}",
uri);
- data = process_data_new (TRACKER_MINER (fs), file, NULL, NULL, NULL, NULL);
- fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
+ /* Add new task to processing pool */
+ task = processing_task_new (file);
+ processing_task_set_sparql (task, sparql->str);
+ /* If process_task() returns FALSE, it means the actual db update was delayed,
+ * and in this case we need to setup queue handlers again */
+ if (!processing_pool_process_task (fs->private->processing_pool,
+ task,
+ FALSE,
+ processing_pool_task_finished_cb,
+ fs)) {
+ item_queue_handlers_set_up (fs);
+ }
- tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
- sparql->str,
- G_PRIORITY_DEFAULT,
- NULL,
- sparql_update_cb, data);
g_string_free (sparql, TRUE);
g_free (uri);
@@ -1923,7 +1932,7 @@ item_move (TrackerMinerFS *fs,
GFileInfo *file_info;
GString *sparql;
RecursiveMoveData move_data;
- ProcessData *data;
+ ProcessingTask *task;
gchar *source_iri;
gchar *display_name;
gboolean source_exists;
@@ -2040,14 +2049,18 @@ item_move (TrackerMinerFS *fs,
g_main_loop_unref (move_data.main_loop);
- data = process_data_new (TRACKER_MINER (fs), file, NULL, NULL, NULL, NULL);
- fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
-
- tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
- sparql->str,
- G_PRIORITY_DEFAULT,
- NULL,
- sparql_update_cb, data);
+ /* Add new task to processing pool */
+ task = processing_task_new (file);
+ processing_task_set_sparql (task, sparql->str);
+ /* If process_task() returns FALSE, it means the actual db update was delayed,
+ * and in this case we need to setup queue handlers again */
+ if (!processing_pool_process_task (fs->private->processing_pool,
+ task,
+ FALSE,
+ processing_pool_task_finished_cb,
+ fs)) {
+ item_queue_handlers_set_up (fs);
+ }
g_free (uri);
g_free (source_uri);
@@ -2153,7 +2166,7 @@ should_wait (TrackerMinerFS *fs,
GFile *parent;
/* Is the item already being processed? */
- if (process_data_find (fs, file, TRUE)) {
+ if (processing_pool_find_task (fs->private->processing_pool, file, TRUE)) {
/* Yes, a previous event on same item currently
* being processed */
return TRUE;
@@ -2162,7 +2175,7 @@ should_wait (TrackerMinerFS *fs,
/* Is the item's parent being processed right now? */
parent = g_file_get_parent (file);
if (parent) {
- if (process_data_find (fs, parent, TRUE)) {
+ if (processing_pool_find_task (fs->private->processing_pool, parent, TRUE)) {
/* Yes, a previous event on the parent of this item
* currently being processed */
g_object_unref (parent);
@@ -2214,7 +2227,7 @@ item_queue_get_next_file (TrackerMinerFS *fs,
* info is inserted to the store before the children are
* inspected.
*/
- if (fs->private->processing_pool) {
+ if (processing_pool_get_wait_task_count (fs->private->processing_pool) > 0) {
/* Items still being processed */
*file = NULL;
*source_file = NULL;
@@ -2445,10 +2458,14 @@ item_queue_handlers_cb (gpointer user_data)
case QUEUE_NONE:
/* Print stats and signal finished */
if (!fs->private->is_crawling &&
- !fs->private->processing_pool) {
+ processing_pool_get_wait_task_count (fs->private->processing_pool) == 0 &&
+ processing_pool_get_process_task_count (fs->private->processing_pool) == 0) {
process_stop (fs);
}
+ /* Flush any possible pending update here */
+ processing_pool_buffer_flush (fs->private->processing_pool);
+
tracker_thumbnailer_send ();
/* No more files left to process */
keep_processing = FALSE;
@@ -2515,7 +2532,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
return;
}
- if (g_list_length (fs->private->processing_pool) >= fs->private->pool_limit) {
+ if (processing_pool_wait_limit_reached (fs->private->processing_pool)) {
/* There is no room in the pool for more files */
return;
}
@@ -3609,6 +3626,26 @@ check_files_removal (GQueue *queue,
}
}
+static void
+processing_pool_cancel_foreach (gpointer data,
+ gpointer user_data)
+{
+ ProcessingTask *task = data;
+ GFile *file = user_data;
+ GFile *task_file;
+ UpdateProcessingTaskContext *ctxt;
+
+ task_file = processing_task_get_file (task);
+ ctxt = processing_task_get_context (task);
+
+ if (ctxt &&
+ ctxt->cancellable &&
+ (g_file_equal (task_file, file) ||
+ g_file_has_prefix (task_file, file))) {
+ g_cancellable_cancel (ctxt->cancellable);
+ }
+}
+
/**
* tracker_miner_fs_directory_remove:
* @fs: a #TrackerMinerFS
@@ -3625,7 +3662,7 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
{
TrackerMinerFSPrivate *priv;
gboolean return_val = FALSE;
- GList *dirs, *pool;
+ GList *dirs;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), FALSE);
g_return_val_if_fail (G_IS_FILE (file), FALSE);
@@ -3683,18 +3720,10 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
check_files_removal (priv->items_updated, file);
check_files_removal (priv->items_created, file);
- pool = fs->private->processing_pool;
-
- while (pool) {
- ProcessData *data = pool->data;
-
- if (g_file_equal (data->file, file) ||
- g_file_has_prefix (data->file, file)) {
- g_cancellable_cancel (data->cancellable);
- }
-
- pool = pool->next;
- }
+ /* Cancel all pending tasks on files inside the path given by file */
+ processing_pool_foreach (fs->private->processing_pool,
+ processing_pool_cancel_foreach,
+ file);
/* Remove all monitors */
tracker_monitor_remove_recursively (fs->private->monitor, file);
@@ -3878,16 +3907,16 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
GFile *file,
const GError *error)
{
- ProcessData *data;
+ ProcessingTask *task;
g_return_if_fail (TRACKER_IS_MINER_FS (fs));
g_return_if_fail (G_IS_FILE (file));
fs->private->total_files_notified++;
- data = process_data_find (fs, file, FALSE);
+ task = processing_pool_find_task (fs->private->processing_pool, file, FALSE);
- if (!data) {
+ if (!task) {
gchar *uri;
uri = g_file_get_uri (file);
@@ -3903,7 +3932,7 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
return;
}
- item_add_or_update_cb (fs, data, error);
+ item_add_or_update_cb (fs, task, error);
}
/**
@@ -3982,15 +4011,15 @@ G_CONST_RETURN gchar *
tracker_miner_fs_get_urn (TrackerMinerFS *fs,
GFile *file)
{
- ProcessData *data;
+ ProcessingTask *task;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
/* Check if found in currently processed data */
- data = process_data_find (fs, file, FALSE);
+ task = processing_pool_find_task (fs->private->processing_pool, file, FALSE);
- if (!data) {
+ if (!task) {
gchar *uri;
uri = g_file_get_uri (file);
@@ -4000,9 +4029,24 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
g_free (uri);
return NULL;
- }
+ } else {
+ UpdateProcessingTaskContext *ctxt;
+
+ /* We are only storing the URN in the created/updated tasks */
+ ctxt = processing_task_get_context (task);
+ if (!ctxt) {
+ gchar *uri;
+
+ uri = g_file_get_uri (file);
+ g_critical ("File '%s' is being processed, but not as a "
+ "CREATED/UPDATED task, so cannot get URN",
+ uri);
+ g_free (uri);
+ return NULL;
+ }
- return data->urn;
+ return ctxt->urn;
+ }
}
/**
@@ -4055,26 +4099,43 @@ G_CONST_RETURN gchar *
tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
GFile *file)
{
- ProcessData *data;
+ ProcessingTask *task;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
- data = process_data_find (fs, file, FALSE);
+ /* Check if found in currently processed data */
+ task = processing_pool_find_task (fs->private->processing_pool, file, FALSE);
- if (!data) {
+ if (!task) {
gchar *uri;
uri = g_file_get_uri (file);
g_critical ("File '%s' is not being currently processed, "
- "so the URN cannot be retrieved.", uri);
+ "so the parent URN cannot be retrieved.", uri);
g_free (uri);
return NULL;
- }
+ } else {
+ UpdateProcessingTaskContext *ctxt;
+
+ /* We are only storing the URN in the created/updated tasks */
+ ctxt = processing_task_get_context (task);
+ if (!ctxt) {
+ gchar *uri;
+
+ uri = g_file_get_uri (file);
+ g_critical ("File '%s' is being processed, but not as a "
+ "CREATED/UPDATED task, so cannot get parent "
+ "URN",
+ uri);
+ g_free (uri);
+ return NULL;
+ }
- return data->parent_urn;
+ return ctxt->parent_urn;
+ }
}
void
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]