[tracker/miner-fs-refactor] libtracker-miner: Use different limits for WAIT and PROCESS tasks
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-refactor] libtracker-miner: Use different limits for WAIT and PROCESS tasks
- Date: Thu, 14 Oct 2010 13:59:17 +0000 (UTC)
commit aa69ae0667c99de5c404c4f6021de79f7d980be6
Author: Aleksander Morgado <aleksander lanedo com>
Date: Thu Oct 14 12:01:53 2010 +0200
libtracker-miner: Use different limits for WAIT and PROCESS tasks
.../tracker-miner-fs-processing-pool.c | 182 +++++++++++++-------
.../tracker-miner-fs-processing-pool.h | 53 +++---
src/libtracker-miner/tracker-miner-fs.c | 55 ++++--
src/miners/fs/tracker-miner-files.c | 5 +-
4 files changed, 191 insertions(+), 104 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index a78599b..3a2543c 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -24,8 +24,9 @@
typedef enum {
PROCESSING_TASK_STATUS_NO_POOL,
PROCESSING_TASK_STATUS_WAIT,
PROCESSING_TASK_STATUS_PROCESS,
+ PROCESSING_TASK_STATUS_LAST /* Size of per-status arrays; NO_POOL keeps value 0 (distinct from WAIT), so its slot is unused */
} ProcessingTaskStatus;
struct _ProcessingTask {
@@ -117,10 +118,10 @@ struct _ProcessingPool {
/* Connection to the Store */
TrackerSparqlConnection *connection;
- /* The tasks currently being processed */
- GQueue *tasks;
- /* The processing pool limit */
- guint limit;
+ /* Task queues, one per status, indexed by ProcessingTaskStatus */
+ GQueue *tasks[PROCESSING_TASK_STATUS_LAST];
+ /* Per-status limits, indexed by ProcessingTaskStatus */
+ guint limit[PROCESSING_TASK_STATUS_LAST];
};
static void
@@ -133,15 +134,21 @@ pool_queue_free_foreach (gpointer data,
void
processing_pool_free (ProcessingPool *pool)
{
+ guint i;
+
if (!pool)
return;
- /* Free any pending task here... shouldn't really
- * be any */
+ /* Free any task still pending in the WAIT or PROCESS
+ * queues; normally there shouldn't be any */
- g_queue_foreach (pool->tasks,
- pool_queue_free_foreach,
- NULL);
- g_queue_free (pool->tasks);
+ for (i = PROCESSING_TASK_STATUS_WAIT;
+ i < PROCESSING_TASK_STATUS_LAST;
+ i++) {
+ g_queue_foreach (pool->tasks[i],
+ pool_queue_free_foreach,
+ NULL);
+ g_queue_free (pool->tasks[i]);
+ }
g_object_unref (pool->connection);
g_free (pool);
@@ -149,33 +156,71 @@ processing_pool_free (ProcessingPool *pool)
ProcessingPool *
processing_pool_new (TrackerSparqlConnection *connection,
- guint limit)
+ guint limit_wait,
+ guint limit_process)
{
ProcessingPool *pool;
pool = g_new0 (ProcessingPool, 1);
pool->connection = g_object_ref (connection);
- pool->limit = limit;
- pool->tasks = g_queue_new ();
+ pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit_wait;
+ pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit_process;
+
+ pool->tasks[PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
+ pool->tasks[PROCESSING_TASK_STATUS_PROCESS] = g_queue_new ();
- g_debug ("Processing pool created with a limit of %u tasks", limit);
+ g_debug ("Processing pool created with a limit of "
+ "%u tasks in WAIT status and "
+ "%u tasks in PROCESS status",
+ limit_wait,
+ limit_process);
return pool;
}
void
-processing_pool_set_limit (ProcessingPool *pool,
- guint limit)
+processing_pool_set_wait_limit (ProcessingPool *pool,
+ guint limit)
{
- g_message ("Processing pool limit is set to %u", limit);
- pool->limit = limit;
+ g_message ("Processing pool limit for WAIT tasks set to %u", limit);
+ pool->limit[PROCESSING_TASK_STATUS_WAIT] = limit;
+}
+/* Sets the PROCESS-status limit checked by processing_pool_process_limit_reached(). */
+void
+processing_pool_set_process_limit (ProcessingPool *pool,
+ guint limit)
+{
+ g_message ("Processing pool limit for PROCESS tasks set to %u", limit);
+ pool->limit[PROCESSING_TASK_STATUS_PROCESS] = limit;
}
guint
-processing_pool_get_limit (ProcessingPool *pool)
+processing_pool_get_wait_limit (ProcessingPool *pool)
{
- return pool->limit;
+ return pool->limit[PROCESSING_TASK_STATUS_WAIT];
+}
+/* Returns the PROCESS-status limit. */
+guint
+processing_pool_get_process_limit (ProcessingPool *pool)
+{
+ return pool->limit[PROCESSING_TASK_STATUS_PROCESS];
+}
+/* TRUE once the WAIT queue holds at least limit[WAIT] tasks. */
+gboolean
+processing_pool_wait_limit_reached (ProcessingPool *pool)
+{
+ return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_WAIT]) >=
+ pool->limit[PROCESSING_TASK_STATUS_WAIT]) ?
+ TRUE : FALSE);
+}
+/* TRUE once the PROCESS queue holds at least limit[PROCESS] tasks. */
+gboolean
+processing_pool_process_limit_reached (ProcessingPool *pool)
+{
+ return ((g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]) >=
+ pool->limit[PROCESSING_TASK_STATUS_PROCESS]) ?
+ TRUE : FALSE);
}
ProcessingTask *
@@ -183,28 +228,34 @@ processing_pool_find_task (ProcessingPool *pool,
GFile *file,
gboolean path_search)
{
- GList *l;
-
- for (l = pool->tasks->head; l; l = g_list_next (l)) {
- ProcessingTask *task = l->data;
-
- if (!path_search) {
- /* Different operations for the same file URI could be
- * piled up here, each being a different GFile object.
- * Miner implementations should really notify on the
- * same GFile object that's being passed, so we check for
- * pointer equality here, rather than doing path comparisons
- */
- if(task->file == file)
- return task;
- } else {
- /* Note that if there are different GFiles being
- * processed for the same file path, we are actually
- * returning the first one found, If you want exactly
- * the same GFile as the one as input, use the
- * process_data_find() method instead */
- if (g_file_equal (task->file, file))
- return task;
+ guint i;
+
+ for (i = PROCESSING_TASK_STATUS_WAIT;
+ i < PROCESSING_TASK_STATUS_LAST; /* search all queues: WAIT and PROCESS */
+ i++) {
+ GList *l;
+
+ for (l = pool->tasks[i]->head; l; l = g_list_next (l)) {
+ ProcessingTask *task = l->data;
+
+ if (!path_search) {
+ /* Different operations for the same file URI could be
+ * piled up here, each being a different GFile object.
+ * Miner implementations should really notify on the
+ * same GFile object that's being passed, so we check for
+ * pointer equality here, rather than doing path comparisons
+ */
+ if(task->file == file)
+ return task;
+ } else {
+ /* Note that if there are different GFiles being
+ * processed for the same file path, we are actually
+ * returning the first one found. If you want exactly
+ * the same GFile as the one as input, pass
+ * path_search = FALSE instead */
+ if (g_file_equal (task->file, file))
+ return task;
+ }
}
}
@@ -213,13 +264,6 @@ processing_pool_find_task (ProcessingPool *pool,
}
gboolean
-processing_pool_limit_reached (ProcessingPool *pool)
-{
- return (g_queue_get_length (pool->tasks) >= pool->limit ?
- TRUE : FALSE);
-}
-
-gboolean
processing_pool_wait_task (ProcessingPool *pool,
ProcessingTask *task)
{
@@ -230,10 +274,10 @@ processing_pool_wait_task (ProcessingPool *pool,
- /* Push a new task in WAIT status (so just add it to the tasks queue,
- * and don't process it. */
+ /* Push a new task in WAIT status (so just add it to the WAIT queue,
+ * and don't process it). */
- g_queue_push_head (pool->tasks, task);
+ g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
task->pool = pool;
- return (!processing_pool_limit_reached (pool));
+ return (!processing_pool_wait_limit_reached (pool));
}
static void
@@ -276,14 +320,17 @@ processing_pool_process_task (ProcessingPool *pool,
g_assert (task->sparql != NULL);
/* First, check if the task was already added as being WAITING */
- previous = g_queue_find (pool->tasks, task);
+ previous = g_queue_find (pool->tasks[PROCESSING_TASK_STATUS_WAIT], task);
if (!previous) {
- /* Add it to the queue */
- g_queue_push_head (pool->tasks, task);
+ /* Add it to the PROCESS queue */
+ g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
task->pool = pool;
} else {
/* Make sure it was a WAIT task */
g_assert (task->status == PROCESSING_TASK_STATUS_WAIT);
+ /* Move task from WAIT queue to PROCESS queue */
+ g_queue_delete_link (pool->tasks[PROCESSING_TASK_STATUS_WAIT], previous);
+ g_queue_push_head (pool->tasks[PROCESSING_TASK_STATUS_PROCESS], task);
}
/* Set status of the task as PROCESS */
@@ -300,7 +347,7 @@ processing_pool_process_task (ProcessingPool *pool,
sparql_update_cb,
task);
- return (!processing_pool_limit_reached (pool));
+ return (!processing_pool_process_limit_reached (pool));
}
void
@@ -313,25 +360,32 @@ processing_pool_remove_task (ProcessingPool *pool,
g_assert (pool == task->pool);
/* Make sure the task was in the pool */
- in_pool = g_queue_find (pool->tasks, task);
+ in_pool = g_queue_find (pool->tasks[task->status], task);
g_assert (in_pool != NULL);
- g_queue_delete_link (pool->tasks, in_pool);
+ g_queue_delete_link (pool->tasks[task->status], in_pool);
task->pool = NULL;
+ task->status = PROCESSING_TASK_STATUS_NO_POOL;
+}
+/* Number of tasks currently in WAIT status. */
+guint
+processing_pool_get_wait_task_count (ProcessingPool *pool)
+{
+ return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_WAIT]);
}
guint
-processing_pool_get_task_count (ProcessingPool *pool)
+processing_pool_get_process_task_count (ProcessingPool *pool)
{
- return g_queue_get_length (pool->tasks);
+ return g_queue_get_length (pool->tasks[PROCESSING_TASK_STATUS_PROCESS]);
}
ProcessingTask *
-processing_pool_get_last_wait (ProcessingPool *pool)
+processing_pool_get_last_wait (ProcessingPool *pool)
{
GList *li;
- for (li = pool->tasks->tail; li; li = g_list_previous (li)) {
+ for (li = pool->tasks[PROCESSING_TASK_STATUS_WAIT]->tail; li; li = g_list_previous (li)) {
ProcessingTask *task = li->data;
if (task->status == PROCESSING_TASK_STATUS_WAIT) {
@@ -346,5 +400,11 @@ processing_pool_foreach (ProcessingPool *pool,
GFunc func,
gpointer user_data)
{
- g_queue_foreach (pool->tasks, func, user_data);
+ guint i;
+
+ for (i = PROCESSING_TASK_STATUS_WAIT;
+ i < PROCESSING_TASK_STATUS_LAST; /* visit all queues: WAIT and PROCESS */
+ i++) {
+ g_queue_foreach (pool->tasks[i], func, user_data);
+ }
}
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index ab2275a..e2274ae 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -47,31 +47,38 @@ void processing_task_set_sparql (ProcessingTask *task,
gchar *sparql);
-ProcessingPool *processing_pool_new (TrackerSparqlConnection *connection,
- guint limit);
-void processing_pool_free (ProcessingPool *pool);
-void processing_pool_set_limit (ProcessingPool *pool,
- guint limit);
-guint processing_pool_get_limit (ProcessingPool *pool);
-ProcessingTask *processing_pool_find_task (ProcessingPool *pool,
- GFile *file,
- gboolean path_search);
-gboolean processing_pool_limit_reached (ProcessingPool *pool);
-void processing_pool_remove_task (ProcessingPool *pool,
- ProcessingTask *task);
-gboolean processing_pool_wait_task (ProcessingPool *pool,
- ProcessingTask *task);
-gboolean processing_pool_process_task (ProcessingPool *pool,
- ProcessingTask *task,
- ProcessingPoolTaskFinishedCallback finished_handler,
- gpointer user_data);
+ProcessingPool *processing_pool_new (TrackerSparqlConnection *connection,
+ guint limit_wait,
+ guint limit_process);
+void processing_pool_free (ProcessingPool *pool);
+/* Limits are tracked per task status: WAIT tasks are only queued,
+ * PROCESS tasks have had their SPARQL update issued. */
+void processing_pool_set_wait_limit (ProcessingPool *pool,
+ guint limit);
+void processing_pool_set_process_limit (ProcessingPool *pool,
+ guint limit);
+guint processing_pool_get_wait_limit (ProcessingPool *pool);
+guint processing_pool_get_process_limit (ProcessingPool *pool);
+ProcessingTask *processing_pool_find_task (ProcessingPool *pool,
+ GFile *file,
+ gboolean path_search);
+gboolean processing_pool_wait_limit_reached (ProcessingPool *pool);
+gboolean processing_pool_process_limit_reached (ProcessingPool *pool);
-guint processing_pool_get_task_count (ProcessingPool *pool);
-ProcessingTask *processing_pool_get_last_wait (ProcessingPool *pool);
-
-void processing_pool_foreach (ProcessingPool *pool,
- GFunc func,
- gpointer user_data);
+void processing_pool_remove_task (ProcessingPool *pool,
+ ProcessingTask *task);
+gboolean processing_pool_wait_task (ProcessingPool *pool,
+ ProcessingTask *task);
+gboolean processing_pool_process_task (ProcessingPool *pool,
+ ProcessingTask *task,
+ ProcessingPoolTaskFinishedCallback finished_handler,
+ gpointer user_data);
+guint processing_pool_get_wait_task_count (ProcessingPool *pool);
+guint processing_pool_get_process_task_count (ProcessingPool *pool);
+ProcessingTask *processing_pool_get_last_wait (ProcessingPool *pool);
+void processing_pool_foreach (ProcessingPool *pool,
+ GFunc func,
+ gpointer user_data);
G_END_DECLS
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 687b5a9..f6c46e6 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -39,8 +39,9 @@
/* If defined will print contents of populated mtime cache while running */
#undef PRINT_MTIME_CACHE_CONTENTS
-/* Default processing pool limit to be set */
-#define DEFAULT_POOL_LIMIT 1
+/* Default processing pool limits to be set */
+#define DEFAULT_WAIT_POOL_LIMIT 1
+#define DEFAULT_PROCESS_POOL_LIMIT 1
/**
* SECTION:tracker-miner-fs
@@ -195,7 +196,8 @@ enum {
enum {
PROP_0,
PROP_THROTTLE,
- PROP_POOL_LIMIT,
+ PROP_WAIT_POOL_LIMIT,
+ PROP_PROCESS_POOL_LIMIT,
PROP_MTIME_CHECKING,
PROP_INITIAL_CRAWLING
};
@@ -320,11 +322,20 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
0, 1, 0,
G_PARAM_READWRITE));
g_object_class_install_property (object_class,
- PROP_POOL_LIMIT,
+ PROP_WAIT_POOL_LIMIT,
+ g_param_spec_uint ("wait-pool-limit",
+ "Processing pool limit for WAIT tasks",
+ "Maximum number of files that can be concurrently "
+ "processed by the upper layer",
+ 1, G_MAXUINT, DEFAULT_WAIT_POOL_LIMIT,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+ g_object_class_install_property (object_class,
+ PROP_PROCESS_POOL_LIMIT,
g_param_spec_uint ("process-pool-limit",
- "Processing pool limit",
- "Number of files that can be concurrently processed",
- 1, G_MAXUINT, DEFAULT_POOL_LIMIT,
+ "Processing pool limit for PROCESS tasks",
+ "Maximum number of SPARQL updates that can be merged "
+ "in a single connection to the store",
+ 1, G_MAXUINT, DEFAULT_PROCESS_POOL_LIMIT,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (object_class,
PROP_MTIME_CHECKING,
@@ -567,7 +578,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
/* Create processing pool */
priv->processing_pool = processing_pool_new (tracker_miner_get_connection (TRACKER_MINER (object)),
- DEFAULT_POOL_LIMIT);
+ DEFAULT_WAIT_POOL_LIMIT,
+ DEFAULT_PROCESS_POOL_LIMIT);
/* Set up the crawlers now we have config and hal */
priv->crawler = tracker_crawler_new ();
@@ -712,9 +724,13 @@ fs_set_property (GObject *object,
tracker_miner_fs_set_throttle (TRACKER_MINER_FS (object),
g_value_get_double (value));
break;
- case PROP_POOL_LIMIT:
- processing_pool_set_limit (fs->private->processing_pool,
- g_value_get_uint (value));
+ case PROP_WAIT_POOL_LIMIT:
+ processing_pool_set_wait_limit (fs->private->processing_pool,
+ g_value_get_uint (value));
+ break;
+ case PROP_PROCESS_POOL_LIMIT:
+ processing_pool_set_process_limit (fs->private->processing_pool,
+ g_value_get_uint (value));
break;
case PROP_MTIME_CHECKING:
fs->private->mtime_checking = g_value_get_boolean (value);
@@ -742,9 +758,13 @@ fs_get_property (GObject *object,
case PROP_THROTTLE:
g_value_set_double (value, fs->private->throttle);
break;
- case PROP_POOL_LIMIT:
+ case PROP_WAIT_POOL_LIMIT:
+ g_value_set_uint (value,
+ processing_pool_get_wait_limit (fs->private->processing_pool));
+ break;
+ case PROP_PROCESS_POOL_LIMIT:
g_value_set_uint (value,
- processing_pool_get_limit (fs->private->processing_pool));
+ processing_pool_get_process_limit (fs->private->processing_pool));
break;
case PROP_MTIME_CHECKING:
g_value_set_boolean (value, fs->private->mtime_checking);
@@ -1644,7 +1664,7 @@ item_add_or_update (TrackerMinerFS *fs,
if (do_process_file (fs, task)) {
fs->private->total_files_processed++;
- if (processing_pool_limit_reached (priv->processing_pool)) {
+ if (processing_pool_wait_limit_reached (priv->processing_pool)) {
retval = FALSE;
}
}
@@ -2191,7 +2211,7 @@ item_queue_get_next_file (TrackerMinerFS *fs,
* info is inserted to the store before the children are
* inspected.
*/
- if (processing_pool_get_task_count (fs->private->processing_pool) > 0) {
+ if (processing_pool_get_wait_task_count (fs->private->processing_pool) > 0) {
/* Items still being processed */
*file = NULL;
*source_file = NULL;
@@ -2422,7 +2442,8 @@ item_queue_handlers_cb (gpointer user_data)
case QUEUE_NONE:
/* Print stats and signal finished */
if (!fs->private->is_crawling &&
- processing_pool_get_task_count (fs->private->processing_pool) == 0) {
+ processing_pool_get_wait_task_count (fs->private->processing_pool) == 0 &&
+ processing_pool_get_process_task_count (fs->private->processing_pool) == 0) {
process_stop (fs);
}
@@ -2492,7 +2513,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
return;
}
- if (processing_pool_limit_reached (fs->private->processing_pool)) {
+ if (processing_pool_wait_limit_reached (fs->private->processing_pool)) {
/* There is no room in the pool for more files */
return;
}
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index c2a596e..e93cea3 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -1866,7 +1866,7 @@ extractor_create_proxy (DBusGConnection *connection)
* Assuming that the files which need more time to get extracted are PDFs
* using libpoppler, we already have a limit in the PDF extractor not to
* spend more than 5s extraction contents. And, assuming the default
- * value of 10 in process-pool-limit, it means we may end up queueing up
+ * value of 10 in wait-pool-limit, it means we may end up queueing up
* to 10 PDF files which may need 5s each, so in order not to have dbus
* timeouts in this case, any value greater than 5*10 would be good.
*/
@@ -2460,7 +2460,8 @@ tracker_miner_files_new (TrackerConfig *config)
return g_object_new (TRACKER_TYPE_MINER_FILES,
"name", "Files",
"config", config,
- "process-pool-limit", 10,
+ "wait-pool-limit", 10,
+ "process-pool-limit", 100,
"mtime-checking", should_check_mtime (config),
NULL);
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]