[tracker/miner-fs-limit-requests: 1/4] libtracker-miner: New limit for the number of requests sent to the store



commit 0c6a89c770c8074c2706b976575fa84a07f236cc
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Tue Mar 15 12:28:07 2011 +0100

    libtracker-miner: New limit for the number of requests sent to the store

 .../tracker-miner-fs-processing-pool.c             |   98 ++++++++++++++++++--
 .../tracker-miner-fs-processing-pool.h             |    7 +-
 src/libtracker-miner/tracker-miner-fs.c            |   21 ++++-
 3 files changed, 114 insertions(+), 12 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 9eabaa4..5ddec99 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -183,8 +183,13 @@ struct _TrackerProcessingPool {
 
 	/* The tasks currently in WAIT or PROCESS status */
 	GQueue *tasks[TRACKER_PROCESSING_TASK_STATUS_LAST];
+	/* The current number of requests sent to the store */
+	guint n_requests;
+
 	/* The processing pool limits */
 	guint limit[TRACKER_PROCESSING_TASK_STATUS_LAST];
+	/* The limit for number of requests sent to the store */
+	guint limit_n_requests;
 
 	/* SPARQL buffer to pile up several UPDATEs */
 	GPtrArray *sparql_buffer;
@@ -199,6 +204,7 @@ struct _TrackerProcessingPool {
 };
 
 typedef struct {
+	TrackerProcessingPool *pool;
 	GPtrArray *tasks;
 	GArray *error_map;
 	guint n_bulk_operations;
@@ -415,7 +421,8 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
 TrackerProcessingPool *
 tracker_processing_pool_new (TrackerMinerFS *miner,
                              guint           limit_wait,
-                             guint           limit_ready)
+                             guint           limit_ready,
+                             guint           limit_n_requests)
 {
 	TrackerProcessingPool *pool;
 
@@ -426,16 +433,19 @@ tracker_processing_pool_new (TrackerMinerFS *miner,
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit_ready;
 	/* convenience limit, not really used currently */
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = G_MAXUINT;
+	pool->limit_n_requests = limit_n_requests;
 
 	pool->tasks[TRACKER_PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
 	pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY] = g_queue_new ();
 	pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = g_queue_new ();
 
 	g_debug ("Processing pool created with a limit of "
-	         "%u tasks in WAIT status and "
-	         "%u tasks in READY status",
+	         "%u tasks in WAIT status, "
+	         "%u tasks in READY status and "
+	         "%u requests",
 	         limit_wait,
-	         limit_ready);
+	         limit_ready,
+	         limit_n_requests);
 
 #ifdef PROCESSING_POOL_ENABLE_TRACE
 	pool->timeout_id = g_timeout_add_seconds (POOL_STATUS_TRACE_TIMEOUT_SECS,
@@ -464,6 +474,15 @@ tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit;
 }
 
+void
+tracker_processing_pool_set_n_requests_limit (TrackerProcessingPool *pool,
+                                              guint                  limit)
+{
+	g_message ("Processing pool limit for number of requests set to %u",
+	           limit);
+	pool->limit_n_requests = limit;
+}
+
 guint
 tracker_processing_pool_get_wait_limit (TrackerProcessingPool *pool)
 {
@@ -476,6 +495,12 @@ tracker_processing_pool_get_ready_limit (TrackerProcessingPool *pool)
 	return pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY];
 }
 
+guint
+tracker_processing_pool_get_n_requests_limit (TrackerProcessingPool *pool)
+{
+	return pool->limit_n_requests;
+}
+
 gboolean
 tracker_processing_pool_wait_limit_reached (TrackerProcessingPool *pool)
 {
@@ -492,6 +517,12 @@ tracker_processing_pool_ready_limit_reached (TrackerProcessingPool *pool)
 	        TRUE : FALSE);
 }
 
+static gboolean
+tracker_processing_pool_n_requests_limit_reached (TrackerProcessingPool *pool)
+{
+	return (pool->n_requests >= pool->limit_n_requests ? TRUE : FALSE);
+}
+
 TrackerProcessingTask *
 tracker_processing_pool_find_task (TrackerProcessingPool *pool,
                                    GFile                 *file,
@@ -558,17 +589,26 @@ tracker_processing_pool_sparql_update_cb (GObject      *object,
                                           GAsyncResult *result,
                                           gpointer      user_data)
 {
+	TrackerProcessingPool *pool;
 	TrackerProcessingTask *task;
 	GError *error = NULL;
-
-	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+	gboolean flush_next;
 
 	task = user_data;
-
+	pool = task->pool;
 	trace ("(Processing Pool) Finished update of task %p for file '%s'",
 	       task,
 	       task->file_uri);
 
+	/* If we had reached the limit of requests, flush next, as this request has
+	 * just finished */
+	flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+	/* Request finished */
+	pool->n_requests--;
+
+	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+
 	/* Before calling user-provided callback, REMOVE the task from the pool;
 	 * as the user-provided callback may actually modify the pool again */
 	tracker_processing_pool_remove_task (task->pool, task);
@@ -579,6 +619,13 @@ tracker_processing_pool_sparql_update_cb (GObject      *object,
 	/* Deallocate unneeded stuff */
 	tracker_processing_task_free (task);
 	g_clear_error (&error);
+
+	/* Flush if needed */
+	if (flush_next) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Pool request limit was reached and "
+		                                      "request just finished");
+	}
 }
 
 static void
@@ -586,17 +633,26 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
                                                 GAsyncResult *result,
                                                 gpointer      user_data)
 {
+	TrackerProcessingPool *pool;
 	GError *global_error = NULL;
 	GPtrArray *sparql_array_errors;
 	UpdateArrayData *update_data;
+	gboolean flush_next;
 	guint i;
 
 	/* Get arrays of errors and queries */
 	update_data = user_data;
-
+	pool = update_data->pool;
 	trace ("(Processing Pool) Finished array-update of tasks %p",
 	       update_data->tasks);
 
+	/* If we had reached the limit of requests, flush next, as this request has
+	 * just finished */
+	flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+	/* Request finished */
+	pool->n_requests--;
+
 	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
 	                                                                     result,
 	                                                                     &global_error);
@@ -653,6 +709,13 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 	if (global_error) {
 		g_error_free (global_error);
 	}
+
+	/* Flush if needed */
+	if (flush_next) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Pool request limit was reached and "
+		                                      "UpdateArrayrequest just finished");
+	}
 }
 
 static void
@@ -764,9 +827,14 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	UpdateArrayData *update_data;
 	guint i, j;
 
+	/* If nothing to flush, return */
 	if (!pool->sparql_buffer)
 		return;
 
+	/* If we cannot flush right now as we reached the limit of requests, return */
+	if (tracker_processing_pool_n_requests_limit_reached (pool))
+		return;
+
 	/* Loop buffer and construct array of strings */
 	sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
 	bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
@@ -841,10 +909,14 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	       reason ? reason : "Unknown reason");
 
 	update_data = g_slice_new0 (UpdateArrayData);
+	update_data->pool = pool;
 	update_data->tasks = pool->sparql_buffer;
 	update_data->n_bulk_operations = bulk_ops->len;
 	update_data->error_map = error_map;
 
+	/* New Request */
+	pool->n_requests++;
+
 	tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
 	                                              (gchar **) sparql_array->data,
 	                                              sparql_array->len,
@@ -898,8 +970,11 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 	task->finished_user_data = user_data;
 
 	/* If buffering not requested, OR the limit of READY tasks is actually 1,
-	 * flush previous buffer (if any) and then the new update */
-	if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
+	 * flush previous buffer (if any) and then the new update (only if n_requests limit
+	 * not reached, otherwise buffer it) */
+	if (!tracker_processing_pool_n_requests_limit_reached (pool) &&
+	    (!buffer ||
+	     pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1)) {
 		BulkOperationMerge *operation = NULL;
 		const gchar *sparql = NULL;
 
@@ -936,6 +1011,9 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 		}
 
 		if (sparql) {
+			/* New Request */
+			pool->n_requests++;
+
 			tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
 			                                        sparql,
 			                                        G_PRIORITY_DEFAULT,
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 1d73794..f695718 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -61,14 +61,18 @@ void                   tracker_processing_task_set_bulk_operation (TrackerProces
 
 TrackerProcessingPool *tracker_processing_pool_new                   (TrackerMinerFS          *miner,
                                                                       guint                    limit_wait,
-                                                                      guint                    limit_process);
+                                                                      guint                    limit_process,
+                                                                      guint                    limit_n_requests);
 void                   tracker_processing_pool_free                  (TrackerProcessingPool   *pool);
 void                   tracker_processing_pool_set_wait_limit        (TrackerProcessingPool   *pool,
                                                                       guint                    limit);
 void                   tracker_processing_pool_set_ready_limit       (TrackerProcessingPool   *pool,
                                                                       guint                    limit);
+void                   tracker_processing_pool_set_n_requests_limit  (TrackerProcessingPool   *pool,
+                                                                      guint                    limit);
 guint                  tracker_processing_pool_get_wait_limit        (TrackerProcessingPool   *pool);
 guint                  tracker_processing_pool_get_ready_limit       (TrackerProcessingPool   *pool);
+guint                  tracker_processing_pool_get_n_requests_limit  (TrackerProcessingPool   *pool);
 TrackerProcessingTask *tracker_processing_pool_find_task             (TrackerProcessingPool   *pool,
                                                                       GFile                   *file,
                                                                       gboolean                 path_search);
@@ -82,6 +86,7 @@ gboolean               tracker_processing_pool_push_ready_task       (TrackerPro
                                                                       gboolean                 buffer,
                                                                       TrackerProcessingPoolTaskFinishedCallback finished_handler,
                                                                       gpointer                 user_data);
+guint                  tracker_processing_pool_get_n_requests        (TrackerProcessingPool   *pool);
 guint                  tracker_processing_pool_get_wait_task_count   (TrackerProcessingPool   *pool);
 guint                  tracker_processing_pool_get_total_task_count  (TrackerProcessingPool   *pool);
 TrackerProcessingTask *tracker_processing_pool_get_last_wait         (TrackerProcessingPool   *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 33453df..5e929e6 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -97,6 +97,7 @@ static gboolean miner_fs_queues_status_trace_timeout_cb (gpointer data);
 /* Default processing pool limits to be set */
 #define DEFAULT_WAIT_POOL_LIMIT 1
 #define DEFAULT_READY_POOL_LIMIT 1
+#define DEFAULT_N_REQUESTS_POOL_LIMIT 10
 
 /**
  * SECTION:tracker-miner-fs
@@ -262,6 +263,7 @@ enum {
 	PROP_THROTTLE,
 	PROP_WAIT_POOL_LIMIT,
 	PROP_READY_POOL_LIMIT,
+	PROP_N_REQUESTS_POOL_LIMIT,
 	PROP_MTIME_CHECKING,
 	PROP_INITIAL_CRAWLING
 };
@@ -402,6 +404,14 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
 	                                                    1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
 	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	g_object_class_install_property (object_class,
+	                                 PROP_N_REQUESTS_POOL_LIMIT,
+	                                 g_param_spec_uint ("processing-pool-requests-limit",
+	                                                    "Processing pool limit for number of requests",
+	                                                    "Maximum number of SPARQL requests that can be sent "
+	                                                    "to the store in parallel.",
+	                                                    1, G_MAXUINT, DEFAULT_N_REQUESTS_POOL_LIMIT,
+	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+	g_object_class_install_property (object_class,
 	                                 PROP_MTIME_CHECKING,
 	                                 g_param_spec_boolean ("mtime-checking",
 	                                                       "Mtime checking",
@@ -666,7 +676,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	/* Create processing pool */
 	priv->processing_pool = tracker_processing_pool_new (object,
 	                                                     DEFAULT_WAIT_POOL_LIMIT,
-	                                                     DEFAULT_READY_POOL_LIMIT);
+	                                                     DEFAULT_READY_POOL_LIMIT,
+	                                                     DEFAULT_N_REQUESTS_POOL_LIMIT);
 
 	/* Set up the crawlers now we have config and hal */
 	priv->crawler = tracker_crawler_new ();
@@ -825,6 +836,10 @@ fs_set_property (GObject      *object,
 		tracker_processing_pool_set_ready_limit (fs->private->processing_pool,
 		                                         g_value_get_uint (value));
 		break;
+	case PROP_N_REQUESTS_POOL_LIMIT:
+		tracker_processing_pool_set_n_requests_limit (fs->private->processing_pool,
+		                                              g_value_get_uint (value));
+		break;
 	case PROP_MTIME_CHECKING:
 		fs->private->mtime_checking = g_value_get_boolean (value);
 		break;
@@ -859,6 +874,10 @@ fs_get_property (GObject    *object,
 		g_value_set_uint (value,
 		                  tracker_processing_pool_get_ready_limit (fs->private->processing_pool));
 		break;
+	case PROP_N_REQUESTS_POOL_LIMIT:
+		g_value_set_uint (value,
+		                  tracker_processing_pool_get_n_requests_limit (fs->private->processing_pool));
+		break;
 	case PROP_MTIME_CHECKING:
 		g_value_set_boolean (value, fs->private->mtime_checking);
 		break;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]