[tracker/miner-fs-limit-requests: 3/7] libtracker-miner: Always try to buffer requests and use UpdateArray
- From: Martyn James Russell <mr src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-limit-requests: 3/7] libtracker-miner: Always try to buffer requests and use UpdateArray
- Date: Tue, 19 Apr 2011 10:21:42 +0000 (UTC)
commit aa13f68052d30751adb586d7409bdb024654d2e1
Author: Aleksander Morgado <aleksander lanedo com>
Date: Tue Mar 15 17:00:31 2011 +0100
libtracker-miner: Always try to buffer requests and use UpdateArray
.../tracker-miner-fs-processing-pool.c | 213 +++++---------------
.../tracker-miner-fs-processing-pool.h | 1 -
src/libtracker-miner/tracker-miner-fs.c | 4 -
3 files changed, 52 insertions(+), 166 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index bf22c43..34e6d81 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -604,7 +604,6 @@ tracker_processing_pool_push_wait_task (TrackerProcessingPool *pool,
/* Set status of the task as WAIT */
task->status = TRACKER_PROCESSING_TASK_STATUS_WAIT;
-
trace ("(Processing Pool %s) Pushed WAIT task %p for file '%s'",
G_OBJECT_TYPE_NAME (pool->miner),
task,
@@ -617,53 +616,6 @@ tracker_processing_pool_push_wait_task (TrackerProcessingPool *pool,
}
static void
-tracker_processing_pool_sparql_update_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
-{
- TrackerProcessingPool *pool;
- TrackerProcessingTask *task;
- GError *error = NULL;
- gboolean flush_next;
-
- task = user_data;
- pool = task->pool;
-
- /* If we had reached the limit of requests, flush next as this request is
- * just finished */
- flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
-
- /* Request finished */
- pool->n_requests--;
-
- trace ("(Processing Pool) Finished update of task %p for file '%s' "
- "(%u requests pending)",
- task,
- task->file_uri,
- pool->n_requests);
-
- tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
-
- /* Before calling user-provided callback, REMOVE the task from the pool;
- * as the user-provided callback may actually modify the pool again */
- tracker_processing_pool_remove_task (task->pool, task);
-
- /* Call finished handler with the error, if any */
- task->finished_handler (task, task->finished_user_data, error);
-
- /* Deallocate unneeded stuff */
- tracker_processing_task_free (task);
- g_clear_error (&error);
-
- /* Flush if needed */
- if (flush_next) {
- tracker_processing_pool_buffer_flush (pool,
- "Pool request limit was reached and "
- "request just finished");
- }
-}
-
-static void
tracker_processing_pool_sparql_update_array_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
@@ -686,9 +638,10 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
/* Request finished */
pool->n_requests--;
- trace ("(Processing Pool) Finished array-update of tasks %p"
+ trace ("(Processing Pool) Finished array-update %p with %u tasks "
"(%u requests pending)",
update_data->tasks,
+ update_data->tasks->len,
pool->n_requests);
sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
@@ -1037,10 +990,11 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
gboolean
tracker_processing_pool_push_ready_task (TrackerProcessingPool *pool,
TrackerProcessingTask *task,
- gboolean buffer,
TrackerProcessingPoolTaskFinishedCallback finished_handler,
gpointer user_data)
{
+ GFile *parent;
+ gboolean flushed = FALSE;
GList *previous;
/* The task MUST have a proper content here */
@@ -1061,125 +1015,62 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
task->finished_handler = finished_handler;
task->finished_user_data = user_data;
- /* If buffering not requested, OR the limit of READY tasks is actually 1,
- * flush previous buffer (if any) and then the new update (only if n_requests limit
- * not reached, otherwise buffer it) */
- if (!tracker_processing_pool_n_requests_limit_reached (pool) &&
- (!buffer ||
- pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1)) {
- BulkOperationMerge *operation = NULL;
- const gchar *sparql = NULL;
-
- trace ("(Processing Pool %s) Pushed READY/PROCESSING task %p for file '%s'",
- G_OBJECT_TYPE_NAME (pool->miner),
- task,
- task->file_uri);
-
- /* Flush previous */
- tracker_processing_pool_buffer_flush (pool,
- "Before unbuffered task");
-
- /* Set status of the task as PROCESSING (No READY status here!) */
- task->status = TRACKER_PROCESSING_TASK_STATUS_PROCESSING;
- g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING], task);
-
- trace ("(Processing Pool %s) Flushing single task %p",
- G_OBJECT_TYPE_NAME (pool->miner),
- task);
-
- /* And update the new one */
- if (task->content == CONTENT_SPARQL_STRING) {
- sparql = task->data.string;
- } else if (task->content == CONTENT_SPARQL_BUILDER) {
- sparql = tracker_sparql_builder_get_result (task->data.builder);
- } else if (task->content == CONTENT_BULK_OPERATION) {
- operation = bulk_operation_merge_new (task->data.bulk.bulk_operation);
- operation->tasks = g_list_prepend (NULL, task);
- bulk_operation_merge_finish (operation);
-
- if (operation->sparql) {
- sparql = operation->sparql;
- }
- }
+ /* Set status of the task as READY */
+ task->status = TRACKER_PROCESSING_TASK_STATUS_READY;
+ g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY], task);
- if (sparql) {
- /* New Request */
- pool->n_requests++;
-
- tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
- sparql,
- G_PRIORITY_DEFAULT,
- NULL,
- tracker_processing_pool_sparql_update_cb,
- task);
- }
-
- if (operation) {
- bulk_operation_merge_free (operation);
- }
-
- return TRUE;
- } else {
- GFile *parent;
- gboolean flushed = FALSE;
+ /* Get parent of this file we're updating/creating */
+ parent = g_file_get_parent (task->file);
- /* Set status of the task as READY */
- task->status = TRACKER_PROCESSING_TASK_STATUS_READY;
- g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY], task);
+ /* Start buffer if not already done */
+ if (!pool->sparql_buffer) {
+ pool->sparql_buffer =
+ g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_processing_task_free);
+ pool->sparql_buffer_start_time = time (NULL);
+ }
- /* Get parent of this file we're updating/creating */
- parent = g_file_get_parent (task->file);
+ /* Set current parent if not set already */
+ if (!pool->sparql_buffer_current_parent && parent) {
+ pool->sparql_buffer_current_parent = g_object_ref (parent);
+ }
- /* Start buffer if not already done */
- if (!pool->sparql_buffer) {
- pool->sparql_buffer =
- g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_processing_task_free);
- pool->sparql_buffer_start_time = time (NULL);
- }
+ trace ("(Processing Pool %s) Pushed READY task %p for file '%s' into array %p",
+ G_OBJECT_TYPE_NAME (pool->miner),
+ task,
+ task->file_uri,
+ pool->sparql_buffer);
- /* Set current parent if not set already */
- if (!pool->sparql_buffer_current_parent && parent) {
- pool->sparql_buffer_current_parent = g_object_ref (parent);
- }
+ /* Add task to array */
+ g_ptr_array_add (pool->sparql_buffer, task);
- trace ("(Processing Pool %s) Pushed READY task %p for file '%s' into array %p",
- G_OBJECT_TYPE_NAME (pool->miner),
- task,
- task->file_uri,
- pool->sparql_buffer);
-
- /* Add task to array */
- g_ptr_array_add (pool->sparql_buffer, task);
-
- /* Flush buffer if:
- * - Last item has no parent
- * - Parent change was detected
- * - Maximum number of READY items reached
- * - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
- */
- if (!parent) {
- tracker_processing_pool_buffer_flush (pool,
- "File with no parent");
- flushed = TRUE;
- } else if (!g_file_equal (parent, pool->sparql_buffer_current_parent)) {
- tracker_processing_pool_buffer_flush (pool,
- "Different parent");
- flushed = TRUE;
- } else if (tracker_processing_pool_ready_limit_reached (pool)) {
- tracker_processing_pool_buffer_flush (pool,
- "Ready limit reached");
- flushed = TRUE;
- } else if (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME) {
- tracker_processing_pool_buffer_flush (pool,
- "Buffer time reached");
- flushed = TRUE;
- }
+ /* Flush buffer if:
+ * - Last item has no parent
+ * - Parent change was detected
+ * - Maximum number of READY items reached
+ * - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+ */
+ if (!parent) {
+ tracker_processing_pool_buffer_flush (pool,
+ "File with no parent");
+ flushed = TRUE;
+ } else if (!g_file_equal (parent, pool->sparql_buffer_current_parent)) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Different parent");
+ flushed = TRUE;
+ } else if (tracker_processing_pool_ready_limit_reached (pool)) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Ready limit reached");
+ flushed = TRUE;
+ } else if (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Buffer time reached");
+ flushed = TRUE;
+ }
- if (parent)
- g_object_unref (parent);
+ if (parent)
+ g_object_unref (parent);
- return flushed;
- }
+ return flushed;
}
void
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index f695718..956bf2c 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -83,7 +83,6 @@ void tracker_processing_pool_push_wait_task (TrackerPro
TrackerProcessingTask *task);
gboolean tracker_processing_pool_push_ready_task (TrackerProcessingPool *pool,
TrackerProcessingTask *task,
- gboolean buffer,
TrackerProcessingPoolTaskFinishedCallback finished_handler,
gpointer user_data);
guint tracker_processing_pool_get_n_requests (TrackerProcessingPool *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 60d521e..972be0c 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1726,7 +1726,6 @@ item_add_or_update_cb (TrackerMinerFS *fs,
* and in this case we need to setup queue handlers again */
if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
task,
- TRUE, /* buffer! */
processing_pool_task_finished_cb,
fs)) {
item_queue_handlers_set_up (fs);
@@ -1867,7 +1866,6 @@ item_remove (TrackerMinerFS *fs,
* and in this case we need to setup queue handlers again */
if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
task,
- TRUE,
processing_pool_task_finished_cb,
fs)) {
item_queue_handlers_set_up (fs);
@@ -1891,7 +1889,6 @@ item_remove (TrackerMinerFS *fs,
* and in this case we need to setup queue handlers again */
if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
task,
- TRUE,
processing_pool_task_finished_cb,
fs)) {
item_queue_handlers_set_up (fs);
@@ -2239,7 +2236,6 @@ item_move (TrackerMinerFS *fs,
* and in this case we need to setup queue handlers again */
if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
task,
- FALSE,
processing_pool_task_finished_cb,
fs)) {
item_queue_handlers_set_up (fs);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]