[tracker/miner-fs-limit-requests: 3/7] libtracker-miner: Always try to buffer requests and use UpdateArray



commit 4444d8992f561c756e1012717698484321d12876
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Tue Mar 15 17:00:31 2011 +0100

    libtracker-miner: Always try to buffer requests and use UpdateArray

 .../tracker-miner-fs-processing-pool.c             |  213 +++++---------------
 .../tracker-miner-fs-processing-pool.h             |    1 -
 src/libtracker-miner/tracker-miner-fs.c            |    4 -
 3 files changed, 52 insertions(+), 166 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index bf22c43..34e6d81 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -604,7 +604,6 @@ tracker_processing_pool_push_wait_task (TrackerProcessingPool *pool,
 	/* Set status of the task as WAIT */
 	task->status = TRACKER_PROCESSING_TASK_STATUS_WAIT;
 
-
 	trace ("(Processing Pool %s) Pushed WAIT task %p for file '%s'",
 	       G_OBJECT_TYPE_NAME (pool->miner),
 	       task,
@@ -617,53 +616,6 @@ tracker_processing_pool_push_wait_task (TrackerProcessingPool *pool,
 }
 
 static void
-tracker_processing_pool_sparql_update_cb (GObject      *object,
-                                          GAsyncResult *result,
-                                          gpointer      user_data)
-{
-	TrackerProcessingPool *pool;
-	TrackerProcessingTask *task;
-	GError *error = NULL;
-	gboolean flush_next;
-
-	task = user_data;
-	pool = task->pool;
-
-	/* If we had reached the limit of requests, flush next as this request is
-	 * just finished */
-	flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
-
-	/* Request finished */
-	pool->n_requests--;
-
-	trace ("(Processing Pool) Finished update of task %p for file '%s' "
-	       "(%u requests pending)",
-	       task,
-	       task->file_uri,
-	       pool->n_requests);
-
-	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
-
-	/* Before calling user-provided callback, REMOVE the task from the pool;
-	 * as the user-provided callback may actually modify the pool again */
-	tracker_processing_pool_remove_task (task->pool, task);
-
-	/* Call finished handler with the error, if any */
-	task->finished_handler (task, task->finished_user_data, error);
-
-	/* Deallocate unneeded stuff */
-	tracker_processing_task_free (task);
-	g_clear_error (&error);
-
-	/* Flush if needed */
-	if (flush_next) {
-		tracker_processing_pool_buffer_flush (pool,
-		                                      "Pool request limit was reached and "
-		                                      "request just finished");
-	}
-}
-
-static void
 tracker_processing_pool_sparql_update_array_cb (GObject      *object,
                                                 GAsyncResult *result,
                                                 gpointer      user_data)
@@ -686,9 +638,10 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 	/* Request finished */
 	pool->n_requests--;
 
-	trace ("(Processing Pool) Finished array-update of tasks %p"
+	trace ("(Processing Pool) Finished array-update %p with %u tasks "
 	       "(%u requests pending)",
 	       update_data->tasks,
+	       update_data->tasks->len,
 	       pool->n_requests);
 
 	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
@@ -1037,10 +990,11 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 gboolean
 tracker_processing_pool_push_ready_task (TrackerProcessingPool                     *pool,
                                          TrackerProcessingTask                     *task,
-                                         gboolean                                   buffer,
                                          TrackerProcessingPoolTaskFinishedCallback  finished_handler,
                                          gpointer                                   user_data)
 {
+	GFile *parent;
+	gboolean flushed = FALSE;
 	GList *previous;
 
 	/* The task MUST have a proper content here */
@@ -1061,125 +1015,62 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 	task->finished_handler = finished_handler;
 	task->finished_user_data = user_data;
 
-	/* If buffering not requested, OR the limit of READY tasks is actually 1,
-	 * flush previous buffer (if any) and then the new update (only if n_requests limit
-	 * not reached, otherwise buffer it) */
-	if (!tracker_processing_pool_n_requests_limit_reached (pool) &&
-	    (!buffer ||
-	     pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1)) {
-		BulkOperationMerge *operation = NULL;
-		const gchar *sparql = NULL;
-
-		trace ("(Processing Pool %s) Pushed READY/PROCESSING task %p for file '%s'",
-		       G_OBJECT_TYPE_NAME (pool->miner),
-		       task,
-		       task->file_uri);
-
-		/* Flush previous */
-		tracker_processing_pool_buffer_flush (pool,
-		                                      "Before unbuffered task");
-
-		/* Set status of the task as PROCESSING (No READY status here!) */
-		task->status = TRACKER_PROCESSING_TASK_STATUS_PROCESSING;
-		g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING], task);
-
-		trace ("(Processing Pool %s) Flushing single task %p",
-		       G_OBJECT_TYPE_NAME (pool->miner),
-		       task);
-
-		/* And update the new one */
-		if (task->content == CONTENT_SPARQL_STRING) {
-			sparql = task->data.string;
-		} else if (task->content == CONTENT_SPARQL_BUILDER) {
-			sparql = tracker_sparql_builder_get_result (task->data.builder);
-		} else if (task->content == CONTENT_BULK_OPERATION) {
-			operation = bulk_operation_merge_new (task->data.bulk.bulk_operation);
-			operation->tasks = g_list_prepend (NULL, task);
-			bulk_operation_merge_finish (operation);
-
-			if (operation->sparql) {
-				sparql = operation->sparql;
-			}
-		}
+	/* Set status of the task as READY */
+	task->status = TRACKER_PROCESSING_TASK_STATUS_READY;
+	g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY], task);
 
-		if (sparql) {
-			/* New Request */
-			pool->n_requests++;
-
-			tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
-			                                        sparql,
-			                                        G_PRIORITY_DEFAULT,
-			                                        NULL,
-			                                        tracker_processing_pool_sparql_update_cb,
-			                                        task);
-		}
-
-		if (operation) {
-			bulk_operation_merge_free (operation);
-		}
-
-		return TRUE;
-	} else {
-		GFile *parent;
-		gboolean flushed = FALSE;
+	/* Get parent of this file we're updating/creating */
+	parent = g_file_get_parent (task->file);
 
-		/* Set status of the task as READY */
-		task->status = TRACKER_PROCESSING_TASK_STATUS_READY;
-		g_queue_push_head (pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY], task);
+	/* Start buffer if not already done */
+	if (!pool->sparql_buffer) {
+		pool->sparql_buffer =
+			g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_processing_task_free);
+		pool->sparql_buffer_start_time = time (NULL);
+	}
 
-		/* Get parent of this file we're updating/creating */
-		parent = g_file_get_parent (task->file);
+	/* Set current parent if not set already */
+	if (!pool->sparql_buffer_current_parent && parent) {
+		pool->sparql_buffer_current_parent = g_object_ref (parent);
+	}
 
-		/* Start buffer if not already done */
-		if (!pool->sparql_buffer) {
-			pool->sparql_buffer =
-				g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_processing_task_free);
-			pool->sparql_buffer_start_time = time (NULL);
-		}
+	trace ("(Processing Pool %s) Pushed READY task %p for file '%s' into array %p",
+	       G_OBJECT_TYPE_NAME (pool->miner),
+	       task,
+	       task->file_uri,
+	       pool->sparql_buffer);
 
-		/* Set current parent if not set already */
-		if (!pool->sparql_buffer_current_parent && parent) {
-			pool->sparql_buffer_current_parent = g_object_ref (parent);
-		}
+	/* Add task to array */
+	g_ptr_array_add (pool->sparql_buffer, task);
 
-		trace ("(Processing Pool %s) Pushed READY task %p for file '%s' into array %p",
-		       G_OBJECT_TYPE_NAME (pool->miner),
-		       task,
-		       task->file_uri,
-		       pool->sparql_buffer);
-
-		/* Add task to array */
-		g_ptr_array_add (pool->sparql_buffer, task);
-
-		/* Flush buffer if:
-		 *  - Last item has no parent
-		 *  - Parent change was detected
-		 *  - Maximum number of READY items reached
-		 *  - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
-		 */
-		if (!parent) {
-			tracker_processing_pool_buffer_flush (pool,
-			                                      "File with no parent");
-			flushed = TRUE;
-		} else if (!g_file_equal (parent, pool->sparql_buffer_current_parent)) {
-			tracker_processing_pool_buffer_flush (pool,
-			                                      "Different parent");
-			flushed = TRUE;
-		} else if (tracker_processing_pool_ready_limit_reached (pool)) {
-			tracker_processing_pool_buffer_flush (pool,
-			                                      "Ready limit reached");
-			flushed = TRUE;
-		} else if (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME) {
-			tracker_processing_pool_buffer_flush (pool,
-			                                      "Buffer time reached");
-			flushed = TRUE;
-		}
+	/* Flush buffer if:
+	 *  - Last item has no parent
+	 *  - Parent change was detected
+	 *  - Maximum number of READY items reached
+	 *  - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+	 */
+	if (!parent) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "File with no parent");
+		flushed = TRUE;
+	} else if (!g_file_equal (parent, pool->sparql_buffer_current_parent)) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Different parent");
+		flushed = TRUE;
+	} else if (tracker_processing_pool_ready_limit_reached (pool)) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Ready limit reached");
+		flushed = TRUE;
+	} else if (time (NULL) - pool->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Buffer time reached");
+		flushed = TRUE;
+	}
 
-		if (parent)
-			g_object_unref (parent);
+	if (parent)
+		g_object_unref (parent);
 
-		return flushed;
-	}
+	return flushed;
 }
 
 void
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index f695718..956bf2c 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -83,7 +83,6 @@ void                   tracker_processing_pool_push_wait_task        (TrackerPro
                                                                       TrackerProcessingTask   *task);
 gboolean               tracker_processing_pool_push_ready_task       (TrackerProcessingPool   *pool,
                                                                       TrackerProcessingTask   *task,
-                                                                      gboolean                 buffer,
                                                                       TrackerProcessingPoolTaskFinishedCallback finished_handler,
                                                                       gpointer                 user_data);
 guint                  tracker_processing_pool_get_n_requests        (TrackerProcessingPool   *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 5e929e6..082db3f 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1723,7 +1723,6 @@ item_add_or_update_cb (TrackerMinerFS        *fs,
 		 * and in this case we need to setup queue handlers again */
 		if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
 		                                              task,
-		                                              TRUE, /* buffer! */
 		                                              processing_pool_task_finished_cb,
 		                                              fs)) {
 			item_queue_handlers_set_up (fs);
@@ -1864,7 +1863,6 @@ item_remove (TrackerMinerFS *fs,
 	 * and in this case we need to setup queue handlers again */
 	if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
 	                                              task,
-	                                              TRUE,
 	                                              processing_pool_task_finished_cb,
 	                                              fs)) {
 		item_queue_handlers_set_up (fs);
@@ -1888,7 +1886,6 @@ item_remove (TrackerMinerFS *fs,
 	 * and in this case we need to setup queue handlers again */
 	if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
 	                                              task,
-	                                              TRUE,
 	                                              processing_pool_task_finished_cb,
 	                                              fs)) {
 		item_queue_handlers_set_up (fs);
@@ -2236,7 +2233,6 @@ item_move (TrackerMinerFS *fs,
 	 * and in this case we need to setup queue handlers again */
 	if (!tracker_processing_pool_push_ready_task (fs->private->processing_pool,
 	                                              task,
-	                                              FALSE,
 	                                              processing_pool_task_finished_cb,
 	                                              fs)) {
 		item_queue_handlers_set_up (fs);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]