[tracker/miner-fs-limit-requests: 1/3] libtracker-miner: New limit for the number of requests sent to the store
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-fs-limit-requests: 1/3] libtracker-miner: New limit for the number of requests sent to the store
- Date: Tue, 15 Mar 2011 13:44:11 +0000 (UTC)
commit ec892d61be972921ea71d7f963713e43f803bbb3
Author: Aleksander Morgado <aleksander lanedo com>
Date: Tue Mar 15 12:28:07 2011 +0100
libtracker-miner: New limit for the number of requests sent to the store
.../tracker-miner-fs-processing-pool.c | 98 ++++++++++++++++++--
.../tracker-miner-fs-processing-pool.h | 7 +-
src/libtracker-miner/tracker-miner-fs.c | 21 ++++-
3 files changed, 114 insertions(+), 12 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 9eabaa4..5ddec99 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -183,8 +183,13 @@ struct _TrackerProcessingPool {
/* The tasks currently in WAIT or PROCESS status */
GQueue *tasks[TRACKER_PROCESSING_TASK_STATUS_LAST];
+ /* The current number of requests sent to the store */
+ guint n_requests;
+
/* The processing pool limits */
guint limit[TRACKER_PROCESSING_TASK_STATUS_LAST];
+ /* The limit for number of requests sent to the store */
+ guint limit_n_requests;
/* SPARQL buffer to pile up several UPDATEs */
GPtrArray *sparql_buffer;
@@ -199,6 +204,7 @@ struct _TrackerProcessingPool {
};
typedef struct {
+ TrackerProcessingPool *pool;
GPtrArray *tasks;
GArray *error_map;
guint n_bulk_operations;
@@ -415,7 +421,8 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
TrackerProcessingPool *
tracker_processing_pool_new (TrackerMinerFS *miner,
guint limit_wait,
- guint limit_ready)
+ guint limit_ready,
+ guint limit_n_requests)
{
TrackerProcessingPool *pool;
@@ -426,16 +433,19 @@ tracker_processing_pool_new (TrackerMinerFS *miner,
pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit_ready;
/* convenience limit, not really used currently */
pool->limit[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = G_MAXUINT;
+ pool->limit_n_requests = limit_n_requests;
pool->tasks[TRACKER_PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY] = g_queue_new ();
pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = g_queue_new ();
g_debug ("Processing pool created with a limit of "
- "%u tasks in WAIT status and "
- "%u tasks in READY status",
+ "%u tasks in WAIT status, "
+ "%u tasks in READY status and "
+ "%u requests",
limit_wait,
- limit_ready);
+ limit_ready,
+ limit_n_requests);
#ifdef PROCESSING_POOL_ENABLE_TRACE
pool->timeout_id = g_timeout_add_seconds (POOL_STATUS_TRACE_TIMEOUT_SECS,
@@ -464,6 +474,15 @@ tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit;
}
+void
+tracker_processing_pool_set_n_requests_limit (TrackerProcessingPool *pool,
+ guint limit)
+{
+ g_message ("Processing pool limit for number of requests set to %u",
+ limit);
+ pool->limit_n_requests = limit;
+}
+
guint
tracker_processing_pool_get_wait_limit (TrackerProcessingPool *pool)
{
@@ -476,6 +495,12 @@ tracker_processing_pool_get_ready_limit (TrackerProcessingPool *pool)
return pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY];
}
+guint
+tracker_processing_pool_get_n_requests_limit (TrackerProcessingPool *pool)
+{
+ return pool->limit_n_requests;
+}
+
gboolean
tracker_processing_pool_wait_limit_reached (TrackerProcessingPool *pool)
{
@@ -492,6 +517,12 @@ tracker_processing_pool_ready_limit_reached (TrackerProcessingPool *pool)
TRUE : FALSE);
}
+static gboolean
+tracker_processing_pool_n_requests_limit_reached (TrackerProcessingPool *pool)
+{
+ return (pool->n_requests >= pool->limit_n_requests ? TRUE : FALSE);
+}
+
TrackerProcessingTask *
tracker_processing_pool_find_task (TrackerProcessingPool *pool,
GFile *file,
@@ -558,17 +589,26 @@ tracker_processing_pool_sparql_update_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
+ TrackerProcessingPool *pool;
TrackerProcessingTask *task;
GError *error = NULL;
-
- tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+ gboolean flush_next;
task = user_data;
-
+ pool = task->pool;
trace ("(Processing Pool) Finished update of task %p for file '%s'",
task,
task->file_uri);
+ /* If we had reached the limit of requests, flush next as this request is
+ * just finished */
+ flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+ /* Request finished */
+ pool->n_requests--;
+
+ tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+
/* Before calling user-provided callback, REMOVE the task from the pool;
* as the user-provided callback may actually modify the pool again */
tracker_processing_pool_remove_task (task->pool, task);
@@ -579,6 +619,13 @@ tracker_processing_pool_sparql_update_cb (GObject *object,
/* Deallocate unneeded stuff */
tracker_processing_task_free (task);
g_clear_error (&error);
+
+ /* Flush if needed */
+ if (flush_next) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Pool request limit was reached and "
+ "request just finished");
+ }
}
static void
@@ -586,17 +633,26 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
+ TrackerProcessingPool *pool;
GError *global_error = NULL;
GPtrArray *sparql_array_errors;
UpdateArrayData *update_data;
+ gboolean flush_next;
guint i;
/* Get arrays of errors and queries */
update_data = user_data;
-
+ pool = update_data->pool;
trace ("(Processing Pool) Finished array-update of tasks %p",
update_data->tasks);
+ /* If we had reached the limit of requests, flush next as this request is
+ * just finished */
+ flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+ /* Request finished */
+ pool->n_requests--;
+
sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
result,
&global_error);
@@ -653,6 +709,13 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
if (global_error) {
g_error_free (global_error);
}
+
+ /* Flush if needed */
+ if (flush_next) {
+ tracker_processing_pool_buffer_flush (pool,
+ "Pool request limit was reached and "
+ "UpdateArrayrequest just finished");
+ }
}
static void
@@ -764,9 +827,14 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
UpdateArrayData *update_data;
guint i, j;
+ /* If nothing to flush, return */
if (!pool->sparql_buffer)
return;
+ /* If we cannot flush right now as we reached the limit of requests, return */
+ if (tracker_processing_pool_n_requests_limit_reached (pool))
+ return;
+
/* Loop buffer and construct array of strings */
sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
@@ -841,10 +909,14 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
reason ? reason : "Unknown reason");
update_data = g_slice_new0 (UpdateArrayData);
+ update_data->pool = pool;
update_data->tasks = pool->sparql_buffer;
update_data->n_bulk_operations = bulk_ops->len;
update_data->error_map = error_map;
+ /* New Request */
+ pool->n_requests++;
+
tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
(gchar **) sparql_array->data,
sparql_array->len,
@@ -898,8 +970,11 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
task->finished_user_data = user_data;
/* If buffering not requested, OR the limit of READY tasks is actually 1,
- * flush previous buffer (if any) and then the new update */
- if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
+ * flush previous buffer (if any) and then the new update (only if n_requests limit
+ * not reached, otherwise buffer it) */
+ if (!tracker_processing_pool_n_requests_limit_reached (pool) &&
+ (!buffer ||
+ pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1)) {
BulkOperationMerge *operation = NULL;
const gchar *sparql = NULL;
@@ -936,6 +1011,9 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
}
if (sparql) {
+ /* New Request */
+ pool->n_requests++;
+
tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
sparql,
G_PRIORITY_DEFAULT,
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 1d73794..f695718 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -61,14 +61,18 @@ void tracker_processing_task_set_bulk_operation (TrackerProces
TrackerProcessingPool *tracker_processing_pool_new (TrackerMinerFS *miner,
guint limit_wait,
- guint limit_process);
+ guint limit_process,
+ guint limit_n_requests);
void tracker_processing_pool_free (TrackerProcessingPool *pool);
void tracker_processing_pool_set_wait_limit (TrackerProcessingPool *pool,
guint limit);
void tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
guint limit);
+void tracker_processing_pool_set_n_requests_limit (TrackerProcessingPool *pool,
+ guint limit);
guint tracker_processing_pool_get_wait_limit (TrackerProcessingPool *pool);
guint tracker_processing_pool_get_ready_limit (TrackerProcessingPool *pool);
+guint tracker_processing_pool_get_n_requests_limit (TrackerProcessingPool *pool);
TrackerProcessingTask *tracker_processing_pool_find_task (TrackerProcessingPool *pool,
GFile *file,
gboolean path_search);
@@ -82,6 +86,7 @@ gboolean tracker_processing_pool_push_ready_task (TrackerPro
gboolean buffer,
TrackerProcessingPoolTaskFinishedCallback finished_handler,
gpointer user_data);
+guint tracker_processing_pool_get_n_requests (TrackerProcessingPool *pool);
guint tracker_processing_pool_get_wait_task_count (TrackerProcessingPool *pool);
guint tracker_processing_pool_get_total_task_count (TrackerProcessingPool *pool);
TrackerProcessingTask *tracker_processing_pool_get_last_wait (TrackerProcessingPool *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 33453df..5e929e6 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -97,6 +97,7 @@ static gboolean miner_fs_queues_status_trace_timeout_cb (gpointer data);
/* Default processing pool limits to be set */
#define DEFAULT_WAIT_POOL_LIMIT 1
#define DEFAULT_READY_POOL_LIMIT 1
+#define DEFAULT_N_REQUESTS_POOL_LIMIT 10
/**
* SECTION:tracker-miner-fs
@@ -262,6 +263,7 @@ enum {
PROP_THROTTLE,
PROP_WAIT_POOL_LIMIT,
PROP_READY_POOL_LIMIT,
+ PROP_N_REQUESTS_POOL_LIMIT,
PROP_MTIME_CHECKING,
PROP_INITIAL_CRAWLING
};
@@ -402,6 +404,14 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (object_class,
+ PROP_N_REQUESTS_POOL_LIMIT,
+ g_param_spec_uint ("processing-pool-requests-limit",
+ "Processing pool limit for number of requests",
+ "Maximum number of SPARQL requests that can be sent "
+ "to the store in parallel.",
+ 1, G_MAXUINT, DEFAULT_N_REQUESTS_POOL_LIMIT,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+ g_object_class_install_property (object_class,
PROP_MTIME_CHECKING,
g_param_spec_boolean ("mtime-checking",
"Mtime checking",
@@ -666,7 +676,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
/* Create processing pool */
priv->processing_pool = tracker_processing_pool_new (object,
DEFAULT_WAIT_POOL_LIMIT,
- DEFAULT_READY_POOL_LIMIT);
+ DEFAULT_READY_POOL_LIMIT,
+ DEFAULT_N_REQUESTS_POOL_LIMIT);
/* Set up the crawlers now we have config and hal */
priv->crawler = tracker_crawler_new ();
@@ -825,6 +836,10 @@ fs_set_property (GObject *object,
tracker_processing_pool_set_ready_limit (fs->private->processing_pool,
g_value_get_uint (value));
break;
+ case PROP_N_REQUESTS_POOL_LIMIT:
+ tracker_processing_pool_set_n_requests_limit (fs->private->processing_pool,
+ g_value_get_uint (value));
+ break;
case PROP_MTIME_CHECKING:
fs->private->mtime_checking = g_value_get_boolean (value);
break;
@@ -859,6 +874,10 @@ fs_get_property (GObject *object,
g_value_set_uint (value,
tracker_processing_pool_get_ready_limit (fs->private->processing_pool));
break;
+ case PROP_N_REQUESTS_POOL_LIMIT:
+ g_value_set_uint (value,
+ tracker_processing_pool_get_n_requests_limit (fs->private->processing_pool));
+ break;
case PROP_MTIME_CHECKING:
g_value_set_boolean (value, fs->private->mtime_checking);
break;
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]