[tracker/miner-priority-queues-0.10: 12/22] tracker-miner-fs: Use TrackerTaskPool for the extraction pool
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-priority-queues-0.10: 12/22] tracker-miner-fs: Use TrackerTaskPool for the extraction pool
- Date: Thu, 21 Jul 2011 12:38:30 +0000 (UTC)
commit 705ebfd1465761f5fda0a37b072d162ba58e33c8
Author: Carlos Garnacho <carlos lanedo com>
Date: Mon Jul 4 12:16:31 2011 +0200
tracker-miner-fs: Use TrackerTaskPool for the extraction pool
src/libtracker-miner/tracker-miner-fs.c | 148 +++++++++++++++++-------------
1 files changed, 84 insertions(+), 64 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 7f2b750..5987afe 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -33,6 +33,7 @@
#include "tracker-thumbnailer.h"
#include "tracker-miner-fs-processing-pool.h"
#include "tracker-priority-queue.h"
+#include "tracker-task-pool.h"
/* If defined will print the tree from GNode while running */
#ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -132,6 +133,7 @@ typedef struct {
} DirectoryData;
typedef struct {
+ GFile *file;
gchar *urn;
gchar *parent_urn;
GCancellable *cancellable;
@@ -199,6 +201,11 @@ struct _TrackerMinerFSPrivate {
gdouble throttle;
+ /* Extraction tasks */
+ TrackerTaskPool *task_pool;
+ GList *extraction_tasks;
+
+ /* Sparql insertion tasks */
TrackerProcessingPool *processing_pool;
/* URI mtime cache */
@@ -363,7 +370,7 @@ static void tracker_miner_fs_directory_add_internal (TrackerMinerFS *f
static gboolean miner_fs_has_children_without_parent (TrackerMinerFS *fs,
GFile *file);
-static void processing_pool_cancel_foreach (gpointer data,
+static void task_pool_cancel_foreach (gpointer data,
gpointer user_data);
@@ -688,7 +695,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
(GDestroyNotify) g_free,
(GDestroyNotify) NULL);
- /* Create processing pool */
+ /* Create processing pools */
+ priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
priv->processing_pool = tracker_processing_pool_new (object,
DEFAULT_WAIT_POOL_LIMIT,
DEFAULT_READY_POOL_LIMIT,
@@ -794,9 +802,12 @@ fs_finalize (GObject *object)
tracker_priority_queue_unref (priv->crawled_directories);
/* Cancel every pending task */
- tracker_processing_pool_foreach (priv->processing_pool,
- processing_pool_cancel_foreach,
- NULL);
+ tracker_task_pool_foreach (priv->task_pool,
+ task_pool_cancel_foreach,
+ NULL);
+ g_object_unref (priv->task_pool);
+ g_list_free (priv->extraction_tasks);
+
tracker_processing_pool_free (priv->processing_pool);
tracker_priority_queue_foreach (priv->items_moved,
@@ -858,8 +869,8 @@ fs_set_property (GObject *object,
g_value_get_double (value));
break;
case PROP_WAIT_POOL_LIMIT:
- tracker_processing_pool_set_wait_limit (fs->priv->processing_pool,
- g_value_get_uint (value));
+ tracker_task_pool_set_limit (fs->priv->task_pool,
+ g_value_get_uint (value));
break;
case PROP_READY_POOL_LIMIT:
tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
@@ -897,7 +908,7 @@ fs_get_property (GObject *object,
break;
case PROP_WAIT_POOL_LIMIT:
g_value_set_uint (value,
- tracker_processing_pool_get_wait_limit (fs->priv->processing_pool));
+ tracker_task_pool_get_limit (fs->priv->task_pool));
break;
case PROP_READY_POOL_LIMIT:
g_value_set_uint (value,
@@ -1621,8 +1632,8 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
}
static gboolean
-do_process_file (TrackerMinerFS *fs,
- TrackerProcessingTask *task)
+do_process_file (TrackerMinerFS *fs,
+ TrackerTask *task)
{
TrackerMinerFSPrivate *priv;
gboolean processing;
@@ -1631,8 +1642,8 @@ do_process_file (TrackerMinerFS *fs,
GFile *task_file;
UpdateProcessingTaskContext *ctxt;
- ctxt = tracker_processing_task_get_context (task);
- task_file = tracker_processing_task_get_file (task);
+ ctxt = tracker_task_get_data (task);
+ task_file = tracker_task_get_file (task);
uri = g_file_get_uri (task_file);
priv = fs->priv;
@@ -1659,7 +1670,7 @@ do_process_file (TrackerMinerFS *fs,
/* Re-fetch data, since it might have been
* removed in broken implementations
*/
- task = tracker_processing_pool_find_task (priv->processing_pool, task_file, FALSE);
+ task = tracker_task_pool_find (priv->task_pool, task_file);
g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
@@ -1669,8 +1680,9 @@ do_process_file (TrackerMinerFS *fs,
"tracker_miner_fs_file_notify(), this is an "
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
- tracker_processing_pool_remove_task (priv->processing_pool, task);
- tracker_processing_task_unref (task);
+ tracker_task_pool_remove (priv->task_pool, task);
+ priv->extraction_tasks = g_list_remove (priv->extraction_tasks, task);
+ tracker_task_unref (task);
}
}
@@ -1680,22 +1692,28 @@ do_process_file (TrackerMinerFS *fs,
}
static void
-item_add_or_update_cb (TrackerMinerFS *fs,
- TrackerProcessingTask *task,
- const GError *error)
+item_add_or_update_cb (TrackerMinerFS *fs,
+ TrackerTask *extraction_task,
+ const GError *error)
{
UpdateProcessingTaskContext *ctxt;
+ TrackerProcessingTask *task;
GFile *task_file;
gchar *uri;
- ctxt = tracker_processing_task_get_context (task);
- task_file = tracker_processing_task_get_file (task);
+ ctxt = tracker_task_get_data (extraction_task);
+ task_file = tracker_task_get_file (extraction_task);
uri = g_file_get_uri (task_file);
if (error) {
- TrackerProcessingTask *first_item_task;
+ TrackerTask *first_item_task = NULL;
+ GList *first_task;
- first_item_task = tracker_processing_pool_get_last_wait (fs->priv->processing_pool);
+ first_task = g_list_last (fs->priv->extraction_tasks);
+
+ if (first_task) {
+ first_item_task = first_task->data;
+ }
/* Perhaps this is too specific to TrackerMinerFiles, if the extractor
* is choking on some file, the miner will get a timeout for all files
@@ -1703,7 +1721,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
* is the first one that was added to the processing pool, so we retry
* the others.
*/
- if (task != first_item_task &&
+ if (extraction_task != first_item_task &&
(error->code == G_DBUS_ERROR_NO_REPLY ||
error->code == G_DBUS_ERROR_TIMEOUT ||
error->code == G_DBUS_ERROR_TIMED_OUT)) {
@@ -1713,7 +1731,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
g_object_unref (ctxt->builder);
ctxt->builder = tracker_sparql_builder_new_update ();
- do_process_file (fs, task);
+ do_process_file (fs, extraction_task);
g_free (uri);
return;
@@ -1724,8 +1742,10 @@ item_add_or_update_cb (TrackerMinerFS *fs,
if (error->code == G_IO_ERROR_CANCELLED) {
/* Cancelled is cancelled, just move along in this case */
- tracker_processing_pool_remove_task (fs->priv->processing_pool, task);
- tracker_processing_task_unref (task);
+ tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+ fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
+ extraction_task);
+ tracker_task_unref (extraction_task);
item_queue_handlers_set_up (fs);
return;
@@ -1733,6 +1753,12 @@ item_add_or_update_cb (TrackerMinerFS *fs,
}
}
+ tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+ fs->priv->extraction_tasks = g_list_remove (fs->priv->extraction_tasks,
+ extraction_task);
+
+ task = tracker_processing_task_new (task_file);
+
if (ctxt->urn) {
gboolean attribute_update_only;
@@ -1807,6 +1833,8 @@ item_add_or_update_cb (TrackerMinerFS *fs,
item_queue_handlers_set_up (fs);
}
+ tracker_task_unref (extraction_task);
+
g_free (uri);
}
@@ -1818,7 +1846,7 @@ item_add_or_update (TrackerMinerFS *fs,
TrackerSparqlBuilder *sparql;
GCancellable *cancellable;
gboolean retval;
- TrackerProcessingTask *task;
+ TrackerTask *task;
GFile *parent;
const gchar *urn;
const gchar *parent_urn = NULL;
@@ -1871,21 +1899,20 @@ item_add_or_update (TrackerMinerFS *fs,
/* Create task and add it to the pool as a WAIT task (we need to extract
* the file metadata and such) */
- task = tracker_processing_task_new (file);
ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
urn,
parent_urn,
cancellable,
sparql);
- tracker_processing_task_set_context (task,
- ctxt,
- (GFreeFunc) update_processing_task_context_free);
- tracker_processing_pool_push_wait_task (priv->processing_pool, task);
+ task = tracker_task_new (file, ctxt,
+ (GDestroyNotify) update_processing_task_context_free);
+ tracker_task_pool_add (priv->task_pool, task);
+ priv->extraction_tasks = g_list_prepend (priv->extraction_tasks, task);
if (do_process_file (fs, task)) {
fs->priv->total_files_processed++;
- if (tracker_processing_pool_wait_limit_reached (priv->processing_pool)) {
+ if (tracker_task_pool_limit_reached (priv->task_pool)) {
retval = FALSE;
}
}
@@ -2435,9 +2462,7 @@ should_wait (TrackerMinerFS *fs,
GFile *parent;
/* Is the item already being processed? */
- if (tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- TRUE)) {
+ if (tracker_task_pool_find (fs->priv->task_pool, file)) {
/* Yes, a previous event on same item currently
* being processed */
return TRUE;
@@ -2446,9 +2471,7 @@ should_wait (TrackerMinerFS *fs,
/* Is the item's parent being processed right now? */
parent = g_file_get_parent (file);
if (parent) {
- if (tracker_processing_pool_find_task (fs->priv->processing_pool,
- parent,
- TRUE)) {
+ if (tracker_task_pool_find (fs->priv->task_pool, parent)) {
/* Yes, a previous event on the parent of this item
* currently being processed */
g_object_unref (parent);
@@ -2503,7 +2526,7 @@ item_queue_get_next_file (TrackerMinerFS *fs,
!tracker_priority_queue_is_empty (fs->priv->crawled_directories)) {
trace_eq ("Created items queue empty, but still crawling (%d tasks in WAIT state)",
- tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool));
+ tracker_task_pool_get_size (fs->priv->task_pool));
/* The items_created queue is empty, but there are pending
* items from the crawler to be processed. We feed the queue
@@ -2511,7 +2534,7 @@ item_queue_get_next_file (TrackerMinerFS *fs,
* info is inserted to the store before the children are
* inspected.
*/
- if (tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool) > 0) {
+ if (tracker_task_pool_get_size (fs->priv->task_pool) > 0) {
/* Items still being processed */
*file = NULL;
*source_file = NULL;
@@ -2842,6 +2865,7 @@ item_queue_handlers_cb (gpointer user_data)
case QUEUE_NONE:
/* Print stats and signal finished */
if (!fs->priv->is_crawling &&
+ tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
process_stop (fs);
}
@@ -2936,7 +2960,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
}
/* Already sent max number of tasks to tracker-extract? */
- if (tracker_processing_pool_wait_limit_reached (fs->priv->processing_pool)) {
+ if (tracker_task_pool_limit_reached (fs->priv->task_pool)) {
return;
}
@@ -4275,16 +4299,16 @@ tracker_miner_fs_directory_add (TrackerMinerFS *fs,
}
static void
-processing_pool_cancel_foreach (gpointer data,
- gpointer user_data)
+task_pool_cancel_foreach (gpointer data,
+ gpointer user_data)
{
- TrackerProcessingTask *task = data;
+ TrackerTask *task = data;
GFile *file = user_data;
GFile *task_file;
UpdateProcessingTaskContext *ctxt;
- task_file = tracker_processing_task_get_file (task);
- ctxt = tracker_processing_task_get_context (task);
+ ctxt = tracker_task_get_data (task);
+ task_file = tracker_task_get_file (task);
if (ctxt &&
ctxt->cancellable &&
@@ -4324,9 +4348,9 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
g_debug ("Removing directory");
/* Cancel all pending tasks on files inside the path given by file */
- tracker_processing_pool_foreach (priv->processing_pool,
- processing_pool_cancel_foreach,
- file);
+ tracker_task_pool_foreach (priv->task_pool,
+ task_pool_cancel_foreach,
+ file);
g_debug (" Cancelled processing pool tasks at %f\n", g_timer_elapsed (timer, NULL));
@@ -4630,16 +4654,14 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
GFile *file,
const GError *error)
{
- TrackerProcessingTask *task;
+ TrackerTask *task;
g_return_if_fail (TRACKER_IS_MINER_FS (fs));
g_return_if_fail (G_IS_FILE (file));
fs->priv->total_files_notified++;
- task = tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- FALSE);
+ task = tracker_task_pool_find (fs->priv->task_pool, file);
if (!task) {
gchar *uri;
@@ -4742,15 +4764,13 @@ G_CONST_RETURN gchar *
tracker_miner_fs_get_urn (TrackerMinerFS *fs,
GFile *file)
{
- TrackerProcessingTask *task;
+ TrackerTask *task;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
/* Check if found in currently processed data */
- task = tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- FALSE);
+ task = tracker_task_pool_find (fs->priv->task_pool, file);
if (!task) {
gchar *uri;
@@ -4766,7 +4786,8 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
UpdateProcessingTaskContext *ctxt;
/* We are only storing the URN in the created/updated tasks */
- ctxt = tracker_processing_task_get_context (task);
+ ctxt = tracker_task_get_data (task);
+
if (!ctxt) {
gchar *uri;
@@ -4834,15 +4855,13 @@ G_CONST_RETURN gchar *
tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
GFile *file)
{
- TrackerProcessingTask *task;
+ TrackerTask *task;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
/* Check if found in currently processed data */
- task = tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- FALSE);
+ task = tracker_task_pool_find (fs->priv->task_pool, file);
if (!task) {
gchar *uri;
@@ -4858,7 +4877,8 @@ tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
UpdateProcessingTaskContext *ctxt;
/* We are only storing the URN in the created/updated tasks */
- ctxt = tracker_processing_task_get_context (task);
+ ctxt = tracker_task_get_data (task);
+
if (!ctxt) {
gchar *uri;
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]