[tracker/rss-enclosures] tracker-store: Execute requests in thread pool
- From: Roberto Guido <rguido src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/rss-enclosures] tracker-store: Execute requests in thread pool
- Date: Mon, 3 May 2010 00:38:01 +0000 (UTC)
commit accb46e670fecf794108d9064ee0a0e21925374d
Author: Jürg Billeter <j bitron ch>
Date: Thu Apr 15 17:31:27 2010 +0200
tracker-store: Execute requests in thread pool
Currently limited to one extra thread.
src/libtracker-data/tracker-data-update.c | 9 +-
src/libtracker-data/tracker-data-update.h | 1 +
src/tracker-store/tracker-store.c | 332 +++++++++++++++++++++--------
3 files changed, 248 insertions(+), 94 deletions(-)
---
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index 4ea037d..dd057e3 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -1991,6 +1991,12 @@ tracker_data_commit_db_transaction (void)
g_hash_table_remove_all (update_buffer.resources_by_id);
g_hash_table_remove_all (update_buffer.resource_cache);
+ in_journal_replay = FALSE;
+}
+
+void
+tracker_data_notify_db_transaction (void)
+{
if (commit_callbacks) {
guint n;
for (n = 0; n < commit_callbacks->len; n++) {
@@ -1999,10 +2005,9 @@ tracker_data_commit_db_transaction (void)
delegate->callback (delegate->user_data);
}
}
-
- in_journal_replay = FALSE;
}
+
static void
format_sql_value_as_string (GString *sql,
TrackerProperty *property)
diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h
index 2300076..11444f9 100644
--- a/src/libtracker-data/tracker-data-update.h
+++ b/src/libtracker-data/tracker-data-update.h
@@ -83,6 +83,7 @@ void tracker_data_insert_statement_with_string (const gchar *
void tracker_data_begin_db_transaction (void);
void tracker_data_begin_db_transaction_for_replay (time_t time);
void tracker_data_commit_db_transaction (void);
+void tracker_data_notify_db_transaction (void);
void tracker_data_begin_transaction (GError **error);
void tracker_data_commit_transaction (GError **error);
void tracker_data_rollback_transaction (void);
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 5455cf6..8f5c823 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -35,13 +35,18 @@
#include "tracker-store.h"
#define TRACKER_STORE_TRANSACTION_MAX 4000
+#define TRACKER_STORE_MAX_CONCURRENT_QUERIES 1
typedef struct {
- gboolean have_handler, have_sync_handler;
- gboolean batch_mode, start_log;
- guint batch_count;
- GQueue *queues[TRACKER_STORE_N_PRIORITIES];
- guint handler, sync_handler;
+ gboolean have_handler, have_sync_handler;
+ gboolean batch_mode, start_log;
+ guint batch_count;
+ GQueue *queues[TRACKER_STORE_N_PRIORITIES];
+ guint handler, sync_handler;
+ guint n_queries_running;
+ gboolean update_running;
+ GThreadPool *main_pool;
+ GThreadPool *global_pool;
} TrackerStorePrivate;
typedef enum {
@@ -57,17 +62,20 @@ typedef struct {
union {
struct {
gchar *query;
+ TrackerDBResultSet *result_set;
} query;
struct {
gchar *query;
gboolean batch;
gchar *client_id;
+ GPtrArray *blank_nodes;
} update;
struct {
gboolean in_progress;
gchar *path;
} turtle;
} data;
+ GError *error;
gpointer user_data;
GDestroyNotify destroy;
union {
@@ -81,6 +89,8 @@ typedef struct {
static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
+static void start_handler (TrackerStorePrivate *private);
+
static void
private_free (gpointer data)
{
@@ -186,47 +196,167 @@ end_batch (TrackerStorePrivate *private)
if (private->batch_mode) {
/* commit pending batch items */
tracker_data_commit_db_transaction ();
+ tracker_data_notify_db_transaction ();
private->batch_mode = FALSE;
private->batch_count = 0;
}
}
-
static gboolean
-queue_idle_handler (gpointer user_data)
+task_ready (TrackerStorePrivate *private)
{
- TrackerStorePrivate *private = user_data;
- GQueue *queue;
- TrackerStoreTask *task = NULL;
- gint i;
+ TrackerStoreTask *task;
+ gint i;
- for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
- queue = private->queues[i];
- task = g_queue_peek_head (queue);
+ /* return TRUE if at least one queue is not empty (to keep idle handler running) */
+
+ if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
+ /* maximum number of queries running already, cannot schedule anything else */
+ return FALSE;
+ } else if (private->update_running) {
+ /* update running already, cannot schedule anything else */
+ return FALSE;
}
- g_return_val_if_fail (task != NULL, FALSE);
- if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
- GError *error = NULL;
- TrackerDBResultSet *result_set;
+ for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+ /* check next task of highest priority */
+ task = g_queue_peek_head (private->queues[i]);
+ if (task != NULL) {
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ /* we know that the maximum number of concurrent queries has not been reached yet,
+ query can be scheduled */
+ return TRUE;
+ } else if (private->n_queries_running == 0) {
+ /* no queries running, updates can be scheduled */
+ return TRUE;
+ } else {
+ /* queries running, wait for them to finish before scheduling updates */
+ return FALSE;
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+static void
+check_handler (TrackerStorePrivate *private)
+{
+ if (task_ready (private)) {
+ /* handler should be running */
+ if (!private->have_handler) {
+ start_handler (private);
+ }
+ } else {
+ /* handler should not be running */
+ if (private->have_handler) {
+ g_source_remove (private->handler);
+ }
+ }
+}
+
+static gboolean
+task_finish_cb (gpointer data)
+{
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
- result_set = tracker_data_query_sparql (task->data.query.query, &error);
+ private = g_static_private_get (&private_key);
+ task = data;
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
if (task->callback.query_callback) {
- task->callback.query_callback (result_set, error, task->user_data);
+ task->callback.query_callback (task->data.query.result_set, task->error, task->user_data);
}
- if (result_set) {
- g_object_unref (result_set);
+ if (task->data.query.result_set) {
+ g_object_unref (task->data.query.result_set);
+ task->data.query.result_set = NULL;
}
- if (error) {
- g_clear_error (&error);
+ if (task->error) {
+ g_clear_error (&task->error);
}
+
+ private->n_queries_running--;
} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
- GError *error = NULL;
+ if (!task->data.update.batch && !task->error) {
+ tracker_data_notify_db_transaction ();
+ }
+
+ if (task->callback.update_callback) {
+ task->callback.update_callback (task->error, task->user_data);
+ }
+
+ if (task->error) {
+ g_clear_error (&task->error);
+ }
+
+ private->update_running = FALSE;
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+ if (!task->data.update.batch && !task->error) {
+ tracker_data_notify_db_transaction ();
+ }
+
+ if (task->callback.update_blank_callback) {
+ if (!task->data.update.blank_nodes) {
+ /* Create empty GPtrArray for dbus-glib to be happy */
+ task->data.update.blank_nodes = g_ptr_array_new ();
+ }
+
+ task->callback.update_blank_callback (task->data.update.blank_nodes, task->error, task->user_data);
+ }
+
+ if (task->data.update.blank_nodes) {
+ gint i;
+
+ for (i = 0; i < task->data.update.blank_nodes->len; i++) {
+ g_ptr_array_foreach (task->data.update.blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
+ g_ptr_array_free (task->data.update.blank_nodes->pdata[i], TRUE);
+ }
+ g_ptr_array_free (task->data.update.blank_nodes, TRUE);
+ }
+
+ if (task->error) {
+ g_clear_error (&task->error);
+ }
+
+ private->update_running = FALSE;
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+ tracker_data_notify_db_transaction ();
+
+ if (task->callback.commit_callback) {
+ task->callback.commit_callback (task->user_data);
+ }
+
+ private->update_running = FALSE;
+ }
+
+ if (task->destroy) {
+ task->destroy (task->user_data);
+ }
+
+ store_task_free (task);
+
+ check_handler (private);
+
+ return FALSE;
+}
+
+static void
+pool_dispatch_cb (gpointer data,
+ gpointer user_data)
+{
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
+ private = user_data;
+ task = data;
+
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ task->data.query.result_set = tracker_data_query_sparql (task->data.query.query, &task->error);
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
if (task->data.update.batch) {
begin_batch (private);
} else {
@@ -234,10 +364,10 @@ queue_idle_handler (gpointer user_data)
tracker_data_begin_db_transaction ();
}
- tracker_data_update_sparql (task->data.update.query, &error);
+ tracker_data_update_sparql (task->data.update.query, &task->error);
if (task->data.update.batch) {
- if (!error) {
+ if (!task->error) {
private->batch_count++;
if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
end_batch (private);
@@ -246,18 +376,7 @@ queue_idle_handler (gpointer user_data)
} else {
tracker_data_commit_db_transaction ();
}
-
- if (task->callback.update_callback) {
- task->callback.update_callback (error, task->user_data);
- }
-
- if (error) {
- g_clear_error (&error);
- }
} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
- GError *error = NULL;
- GPtrArray *blank_nodes;
-
if (task->data.update.batch) {
begin_batch (private);
} else {
@@ -265,10 +384,10 @@ queue_idle_handler (gpointer user_data)
tracker_data_begin_db_transaction ();
}
- blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &error);
+ task->data.update.blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &task->error);
if (task->data.update.batch) {
- if (!error) {
+ if (!task->error) {
private->batch_count++;
if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
end_batch (private);
@@ -278,34 +397,55 @@ queue_idle_handler (gpointer user_data)
tracker_data_commit_db_transaction ();
}
- if (task->callback.update_blank_callback) {
- if (!blank_nodes) {
- /* Create empty GPtrArray for dbus-glib to be happy */
- blank_nodes = g_ptr_array_new ();
- }
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+ end_batch (private);
+ }
- task->callback.update_blank_callback (blank_nodes, error, task->user_data);
- }
+ g_idle_add (task_finish_cb, task);
+}
- if (blank_nodes) {
- gint i;
+static void
+task_run_async (TrackerStorePrivate *private,
+ TrackerStoreTask *task)
+{
+ if (private->n_queries_running > 1) {
+ /* use global pool if main pool might already be occupied */
+ g_thread_pool_push (private->global_pool, task, NULL);
+ } else {
+ /* use main pool for updates and non-parallel queries */
+ g_thread_pool_push (private->main_pool, task, NULL);
+ }
+}
- for (i = 0; i < blank_nodes->len; i++) {
- g_ptr_array_foreach (blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
- g_ptr_array_free (blank_nodes->pdata[i], TRUE);
- }
- g_ptr_array_free (blank_nodes, TRUE);
- }
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+ TrackerStorePrivate *private = user_data;
+ GQueue *queue;
+ TrackerStoreTask *task = NULL;
+ gint i;
- if (error) {
- g_clear_error (&error);
- }
- } else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
- end_batch (private);
+ for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
+ queue = private->queues[i];
+ task = g_queue_peek_head (queue);
+ }
+ g_return_val_if_fail (task != NULL, FALSE);
- if (task->callback.commit_callback) {
- task->callback.commit_callback (task->user_data);
- }
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ /* pop task now, otherwise further queries won't be scheduled */
+ g_queue_pop_head (queue);
+
+ private->n_queries_running++;
+
+ task_run_async (private, task);
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE ||
+ task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK ||
+ task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+ g_queue_pop_head (queue);
+
+ private->update_running = TRUE;
+
+ task_run_async (private, task);
} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
GError *error = NULL;
static TrackerTurtleReader *turtle_reader = NULL;
@@ -320,6 +460,14 @@ queue_idle_handler (gpointer user_data)
turtle_reader = NULL;
g_clear_error (&error);
+ g_queue_pop_head (queue);
+
+ if (task->destroy) {
+ task->destroy (task->user_data);
+ }
+
+ store_task_free (task);
+
goto out;
}
task->data.turtle.in_progress = TRUE;
@@ -352,25 +500,19 @@ queue_idle_handler (gpointer user_data)
if (error) {
g_clear_error (&error);
}
- }
- }
-
- out:
- g_queue_pop_head (queue);
- if (task->destroy) {
- task->destroy (task->user_data);
- }
+ g_queue_pop_head (queue);
- store_task_free (task);
+ if (task->destroy) {
+ task->destroy (task->user_data);
+ }
- /* return TRUE if at least one queue is not empty (to keep idle handler running) */
- for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- if (!g_queue_is_empty (private->queues[i])) {
- return TRUE;
+ store_task_free (task);
}
}
- return FALSE;
+
+out:
+ return task_ready (private);
}
static void
@@ -394,6 +536,19 @@ tracker_store_init (void)
private->queues[i] = g_queue_new ();
}
+ private->main_pool = g_thread_pool_new (pool_dispatch_cb,
+ private, 1,
+ TRUE, NULL);
+ private->global_pool = g_thread_pool_new (pool_dispatch_cb,
+ private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
+ FALSE, NULL);
+
+ /* as the following settings are global for unknown reasons,
+ let's use the same settings as gio, otherwise the used settings
+ are rather random */
+ g_thread_pool_set_max_idle_time (15 * 1000);
+ g_thread_pool_set_max_unused_threads (2);
+
g_static_private_set (&private_key,
private,
private_free);
@@ -408,6 +563,9 @@ tracker_store_shutdown (void)
private = g_static_private_get (&private_key);
g_return_if_fail (private != NULL);
+ g_thread_pool_free (private->global_pool, FALSE, TRUE);
+ g_thread_pool_free (private->main_pool, FALSE, TRUE);
+
if (private->have_handler) {
g_source_remove (private->handler);
private->have_handler = FALSE;
@@ -454,9 +612,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -485,9 +641,7 @@ tracker_store_sparql_query (const gchar *sparql,
g_queue_push_tail (private->queues[priority], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -518,9 +672,7 @@ tracker_store_sparql_update (const gchar *sparql,
g_queue_push_tail (private->queues[priority], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -549,9 +701,7 @@ tracker_store_sparql_update_blank (const gchar *sparql,
g_queue_push_tail (private->queues[priority], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -577,9 +727,7 @@ tracker_store_queue_turtle_import (GFile *file,
g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
guint
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]