[tracker] libtracker-miner: New limit for the number of requests sent to the store
- From: Martyn James Russell <mr src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker] libtracker-miner: New limit for the number of requests sent to the store
- Date: Tue, 19 Apr 2011 10:31:31 +0000 (UTC)
commit 08a18cf899d3cf231c5fa2f16b649b6725d083ab
Author: Aleksander Morgado <aleksander lanedo com>
Date: Tue Mar 15 12:28:07 2011 +0100
libtracker-miner: New limit for the number of requests sent to the store
.../tracker-miner-fs-processing-pool.c | 246 +++++++++++++++++---
.../tracker-miner-fs-processing-pool.h | 7 +-
src/libtracker-miner/tracker-miner-fs.c | 21 ++-
3 files changed, 234 insertions(+), 40 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 9eabaa4..bf22c43 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -186,6 +186,14 @@ struct _TrackerProcessingPool {
/* The processing pool limits */
guint limit[TRACKER_PROCESSING_TASK_STATUS_LAST];
+ /* The current number of requests sent to the store */
+ guint n_requests;
+ /* The limit for number of requests sent to the store */
+ guint limit_n_requests;
+ /* The list of UpdateArrayData items pending to be flushed, blocked
+ * because the maximum number of requests was reached */
+ GQueue *pending_requests;
+
/* SPARQL buffer to pile up several UPDATEs */
GPtrArray *sparql_buffer;
GFile *sparql_buffer_current_parent;
@@ -199,7 +207,9 @@ struct _TrackerProcessingPool {
};
typedef struct {
+ TrackerProcessingPool *pool;
GPtrArray *tasks;
+ GArray *sparql_array;
GArray *error_map;
guint n_bulk_operations;
} UpdateArrayData;
@@ -360,6 +370,10 @@ pool_status_trace_timeout_cb (gpointer data)
l = g_list_next (l);
}
}
+ trace ("(Processing Pool %s) Requests being currently processed: %u (max: %u)",
+ G_OBJECT_TYPE_NAME (pool->miner),
+ pool->n_requests,
+ pool->limit_n_requests);
return TRUE;
}
#endif /* PROCESSING_POOL_ENABLE_TRACE */
@@ -377,6 +391,23 @@ pool_queue_free_foreach (gpointer data,
}
}
+static void
+update_array_data_free (UpdateArrayData *update_data)
+{
+ if (!update_data)
+ return;
+
+ if (update_data->sparql_array) {
+ /* The array contains pointers to strings in the tasks, so no need to
+ * deallocate its pointed contents, just the array itself. */
+ g_array_free (update_data->sparql_array, TRUE);
+ }
+
+ g_ptr_array_free (update_data->tasks, TRUE);
+ g_array_free (update_data->error_map, TRUE);
+ g_slice_free (UpdateArrayData, update_data);
+}
+
void
tracker_processing_pool_free (TrackerProcessingPool *pool)
{
@@ -390,6 +421,11 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
g_source_remove (pool->timeout_id);
#endif /* PROCESSING_POOL_ENABLE_TRACE */
+ g_queue_foreach (pool->pending_requests,
+ (GFunc)update_array_data_free,
+ NULL);
+ g_queue_free (pool->pending_requests);
+
/* Free any pending task here... shouldn't really
* be any */
for (i = TRACKER_PROCESSING_TASK_STATUS_WAIT;
@@ -410,12 +446,13 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
}
g_free (pool);
-}
+ }
TrackerProcessingPool *
tracker_processing_pool_new (TrackerMinerFS *miner,
guint limit_wait,
- guint limit_ready)
+ guint limit_ready,
+ guint limit_n_requests)
{
TrackerProcessingPool *pool;
@@ -426,16 +463,21 @@ tracker_processing_pool_new (TrackerMinerFS *miner,
pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit_ready;
/* convenience limit, not really used currently */
pool->limit[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = G_MAXUINT;
+ pool->limit_n_requests = limit_n_requests;
pool->tasks[TRACKER_PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY] = g_queue_new ();
pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = g_queue_new ();
+ pool->pending_requests = g_queue_new ();
+
g_debug ("Processing pool created with a limit of "
- "%u tasks in WAIT status and "
- "%u tasks in READY status",
+ "%u tasks in WAIT status, "
+ "%u tasks in READY status and "
+ "%u requests",
limit_wait,
- limit_ready);
+ limit_ready,
+ limit_n_requests);
#ifdef PROCESSING_POOL_ENABLE_TRACE
pool->timeout_id = g_timeout_add_seconds (POOL_STATUS_TRACE_TIMEOUT_SECS,
@@ -464,6 +506,15 @@ tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit;
}
+void
+tracker_processing_pool_set_n_requests_limit (TrackerProcessingPool *pool,
+ guint limit)
+{
+ g_message ("Processing pool limit for number of requests set to %u",
+ limit);
+ pool->limit_n_requests = limit;
+}
+
guint
tracker_processing_pool_get_wait_limit (TrackerProcessingPool *pool)
{
@@ -476,6 +527,12 @@ tracker_processing_pool_get_ready_limit (TrackerProcessingPool *pool)
return pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY];
}
+guint
+tracker_processing_pool_get_n_requests_limit (TrackerProcessingPool *pool)
+{
+ return pool->limit_n_requests;
+}
+
gboolean
tracker_processing_pool_wait_limit_reached (TrackerProcessingPool *pool)
{
@@ -492,6 +549,12 @@ tracker_processing_pool_ready_limit_reached (TrackerProcessingPool *pool)
TRUE : FALSE);
}
+static gboolean
+tracker_processing_pool_n_requests_limit_reached (TrackerProcessingPool *pool)
+{
+ return (pool->n_requests >= pool->limit_n_requests ? TRUE : FALSE);
+}
+
TrackerProcessingTask *
tracker_processing_pool_find_task (TrackerProcessingPool *pool,
GFile *file,
@@ -558,16 +621,28 @@ tracker_processing_pool_sparql_update_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
+ TrackerProcessingPool *pool;
TrackerProcessingTask *task;
GError *error = NULL;
-
- tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+ gboolean flush_next;
task = user_data;
+ pool = task->pool;
+
+ /* If we had reached the limit of requests, flush next as this request is
+ * just finished */
+ flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+ /* Request finished */
+ pool->n_requests--;
- trace ("(Processing Pool) Finished update of task %p for file '%s'",
+ trace ("(Processing Pool) Finished update of task %p for file '%s' "
+ "(%u requests pending)",
task,
- task->file_uri);
+ task->file_uri,
+ pool->n_requests);
+
+ tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
/* Before calling user-provided callback, REMOVE the task from the pool;
* as the user-provided callback may actually modify the pool again */
@@ -579,6 +654,13 @@ tracker_processing_pool_sparql_update_cb (GObject *object,
/* Deallocate unneeded stuff */
tracker_processing_task_free (task);
g_clear_error (&error);
+
+ /* Flush if needed */
+ if (flush_next) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Pool request limit was reached and "
+ "request just finished");
+ }
}
static void
@@ -586,16 +668,28 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
+ TrackerProcessingPool *pool;
GError *global_error = NULL;
GPtrArray *sparql_array_errors;
UpdateArrayData *update_data;
+ gboolean flush_next;
guint i;
/* Get arrays of errors and queries */
update_data = user_data;
+ pool = update_data->pool;
+
+ /* If we had reached the limit of requests, flush next as this request is
+ * just finished */
+ flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+ /* Request finished */
+ pool->n_requests--;
- trace ("(Processing Pool) Finished array-update of tasks %p",
- update_data->tasks);
+ trace ("(Processing Pool) Finished array-update of tasks %p"
+ "(%u requests pending)",
+ update_data->tasks,
+ pool->n_requests);
sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
result,
@@ -646,13 +740,18 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
g_ptr_array_unref (sparql_array_errors);
/* Note that tasks are actually deallocated here */
- g_ptr_array_free (update_data->tasks, TRUE);
- g_array_free (update_data->error_map, TRUE);
- g_slice_free (UpdateArrayData, update_data);
+ update_array_data_free (update_data);
if (global_error) {
g_error_free (global_error);
}
+
+ /* Flush if needed */
+ if (flush_next) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Pool request limit was reached and "
+ "UpdateArrayrequest just finished");
+ }
}
static void
@@ -755,6 +854,76 @@ bulk_operation_merge_free (BulkOperationMerge *operation)
g_slice_free (BulkOperationMerge, operation);
}
+static void
+processing_pool_update_array_flush (TrackerProcessingPool *pool,
+ UpdateArrayData *update_data,
+ const gchar *reason)
+{
+ /* This method will flush the UpdateArrayData passed as
+ * argument if:
+ * - The threshold of requests not reached.
+ * - There is no other pending request to flush.
+ *
+ * Otherwise, the passed UpdateArrayData will be queued (if any) and the
+ * first one in the pending queue will get flushed.
+ */
+ UpdateArrayData *to_flush;
+
+ /* If we cannot flush anything or existing pending requests to flush,
+ * just queue the UpdateArrayData if any */
+ if (tracker_processing_pool_n_requests_limit_reached (pool)) {
+ /* If we hit the threshold, there's nothing to flush */
+ to_flush = NULL;
+
+ if (update_data) {
+ trace ("(Processing Pool %s) Queueing array-update of tasks %p with %u items "
+ "(%s, threshold reached)",
+ G_OBJECT_TYPE_NAME (pool->miner),
+ update_data->tasks,
+ update_data->tasks->len,
+ reason ? reason : "Unknown reason");
+ g_queue_push_tail (pool->pending_requests, update_data);
+ }
+ } else if (pool->pending_requests->length > 0) {
+ /* There are other pending tasks to be flushed, we need to queue this one if any. */
+ to_flush = g_queue_pop_head (pool->pending_requests);
+
+ if (update_data) {
+ trace ("(Processing Pool %s) Queueing array-update of tasks %p with %u items "
+ "(%s, pending requests first)",
+ G_OBJECT_TYPE_NAME (pool->miner),
+ update_data->tasks,
+ update_data->tasks->len,
+ reason ? reason : "Unknown reason");
+ g_queue_push_tail (pool->pending_requests, update_data);
+ }
+ } else {
+ /* No pending requests, flush the received UpdateArrayData, if any */
+ to_flush = update_data;
+ }
+
+ /* If nothing to flush, return */
+ if (!to_flush)
+ return;
+
+ trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
+ G_OBJECT_TYPE_NAME (pool->miner),
+ to_flush->tasks,
+ to_flush->tasks->len,
+ reason ? reason : "Unknown reason");
+
+ /* New Request */
+ pool->n_requests++;
+
+ tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
+ (gchar **) to_flush->sparql_array->data,
+ to_flush->sparql_array->len,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ tracker_processing_pool_sparql_update_array_cb,
+ to_flush);
+}
+
void
tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
const gchar *reason)
@@ -764,11 +933,17 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
UpdateArrayData *update_data;
guint i, j;
- if (!pool->sparql_buffer)
+ /* If no sparql buffer, flush any pending request, if any;
+ * or just return otherwise */
+ if (!pool->sparql_buffer) {
+ processing_pool_update_array_flush (pool, NULL, reason);
return;
+ }
/* Loop buffer and construct array of strings */
sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
+
+ /* TODO: Avoid preallocating this, as we may not have any bulk operation */
bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
error_map = g_array_new (TRUE, TRUE, sizeof (gint));
@@ -834,24 +1009,21 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
}
}
- trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
- G_OBJECT_TYPE_NAME (pool->miner),
- pool->sparql_buffer,
- pool->sparql_buffer->len,
- reason ? reason : "Unknown reason");
-
+ /* Create new UpdateArrayData with the contents, which take ownership
+ * of the SPARQL buffer. */
update_data = g_slice_new0 (UpdateArrayData);
+ update_data->pool = pool;
update_data->tasks = pool->sparql_buffer;
update_data->n_bulk_operations = bulk_ops->len;
update_data->error_map = error_map;
+ update_data->sparql_array = sparql_array;
- tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
- (gchar **) sparql_array->data,
- sparql_array->len,
- G_PRIORITY_DEFAULT,
- NULL,
- tracker_processing_pool_sparql_update_array_cb,
- update_data);
+ /* Reset buffer in the pool */
+ pool->sparql_buffer = NULL;
+ pool->sparql_buffer_start_time = 0;
+
+ /* Flush or queue... */
+ processing_pool_update_array_flush (pool, update_data, reason);
/* Clear current parent */
if (pool->sparql_buffer_current_parent) {
@@ -859,15 +1031,7 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
pool->sparql_buffer_current_parent = NULL;
}
- /* Clear temp buffer */
- g_array_free (sparql_array, TRUE);
-
g_ptr_array_free (bulk_ops, TRUE);
-
- pool->sparql_buffer_start_time = 0;
- /* Note the whole buffer is passed to the update_array callback,
- * so no need to free it. */
- pool->sparql_buffer = NULL;
}
gboolean
@@ -898,8 +1062,11 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
task->finished_user_data = user_data;
/* If buffering not requested, OR the limit of READY tasks is actually 1,
- * flush previous buffer (if any) and then the new update */
- if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
+ * flush previous buffer (if any) and then the new update (only if n_requests limit
+ * not reached, otherwise buffer it) */
+ if (!tracker_processing_pool_n_requests_limit_reached (pool) &&
+ (!buffer ||
+ pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1)) {
BulkOperationMerge *operation = NULL;
const gchar *sparql = NULL;
@@ -936,6 +1103,9 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
}
if (sparql) {
+ /* New Request */
+ pool->n_requests++;
+
tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
sparql,
G_PRIORITY_DEFAULT,
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 1d73794..f695718 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -61,14 +61,18 @@ void tracker_processing_task_set_bulk_operation (TrackerProces
TrackerProcessingPool *tracker_processing_pool_new (TrackerMinerFS *miner,
guint limit_wait,
- guint limit_process);
+ guint limit_process,
+ guint limit_n_requests);
void tracker_processing_pool_free (TrackerProcessingPool *pool);
void tracker_processing_pool_set_wait_limit (TrackerProcessingPool *pool,
guint limit);
void tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
guint limit);
+void tracker_processing_pool_set_n_requests_limit (TrackerProcessingPool *pool,
+ guint limit);
guint tracker_processing_pool_get_wait_limit (TrackerProcessingPool *pool);
guint tracker_processing_pool_get_ready_limit (TrackerProcessingPool *pool);
+guint tracker_processing_pool_get_n_requests_limit (TrackerProcessingPool *pool);
TrackerProcessingTask *tracker_processing_pool_find_task (TrackerProcessingPool *pool,
GFile *file,
gboolean path_search);
@@ -82,6 +86,7 @@ gboolean tracker_processing_pool_push_ready_task (TrackerPro
gboolean buffer,
TrackerProcessingPoolTaskFinishedCallback finished_handler,
gpointer user_data);
+guint tracker_processing_pool_get_n_requests (TrackerProcessingPool *pool);
guint tracker_processing_pool_get_wait_task_count (TrackerProcessingPool *pool);
guint tracker_processing_pool_get_total_task_count (TrackerProcessingPool *pool);
TrackerProcessingTask *tracker_processing_pool_get_last_wait (TrackerProcessingPool *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index d223e5b..60d521e 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -97,6 +97,7 @@ static gboolean miner_fs_queues_status_trace_timeout_cb (gpointer data);
/* Default processing pool limits to be set */
#define DEFAULT_WAIT_POOL_LIMIT 1
#define DEFAULT_READY_POOL_LIMIT 1
+#define DEFAULT_N_REQUESTS_POOL_LIMIT 10
/**
* SECTION:tracker-miner-fs
@@ -262,6 +263,7 @@ enum {
PROP_THROTTLE,
PROP_WAIT_POOL_LIMIT,
PROP_READY_POOL_LIMIT,
+ PROP_N_REQUESTS_POOL_LIMIT,
PROP_MTIME_CHECKING,
PROP_INITIAL_CRAWLING
};
@@ -402,6 +404,14 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (object_class,
+ PROP_N_REQUESTS_POOL_LIMIT,
+ g_param_spec_uint ("processing-pool-requests-limit",
+ "Processing pool limit for number of requests",
+ "Maximum number of SPARQL requests that can be sent "
+ "to the store in parallel.",
+ 1, G_MAXUINT, DEFAULT_N_REQUESTS_POOL_LIMIT,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+ g_object_class_install_property (object_class,
PROP_MTIME_CHECKING,
g_param_spec_boolean ("mtime-checking",
"Mtime checking",
@@ -666,7 +676,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
/* Create processing pool */
priv->processing_pool = tracker_processing_pool_new (object,
DEFAULT_WAIT_POOL_LIMIT,
- DEFAULT_READY_POOL_LIMIT);
+ DEFAULT_READY_POOL_LIMIT,
+ DEFAULT_N_REQUESTS_POOL_LIMIT);
/* Set up the crawlers now we have config and hal */
priv->crawler = tracker_crawler_new ();
@@ -825,6 +836,10 @@ fs_set_property (GObject *object,
tracker_processing_pool_set_ready_limit (fs->private->processing_pool,
g_value_get_uint (value));
break;
+ case PROP_N_REQUESTS_POOL_LIMIT:
+ tracker_processing_pool_set_n_requests_limit (fs->private->processing_pool,
+ g_value_get_uint (value));
+ break;
case PROP_MTIME_CHECKING:
fs->private->mtime_checking = g_value_get_boolean (value);
break;
@@ -859,6 +874,10 @@ fs_get_property (GObject *object,
g_value_set_uint (value,
tracker_processing_pool_get_ready_limit (fs->private->processing_pool));
break;
+ case PROP_N_REQUESTS_POOL_LIMIT:
+ g_value_set_uint (value,
+ tracker_processing_pool_get_n_requests_limit (fs->private->processing_pool));
+ break;
case PROP_MTIME_CHECKING:
g_value_set_boolean (value, fs->private->mtime_checking);
break;
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]