[tracker/async-queries: 2/2] tracker-store: Use async queries
- From: Jürg Billeter <juergbi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/async-queries: 2/2] tracker-store: Use async queries
- Date: Mon, 12 Apr 2010 10:29:16 +0000 (UTC)
commit 93388d4461e0e46e3c15d08b9048923244918d06
Author: Jürg Billeter <j bitron ch>
Date: Mon Apr 12 12:12:41 2010 +0200
tracker-store: Use async queries
This does not yet execute multiple queries at the same time.
src/tracker-store/tracker-store.c | 168 +++++++++++++++++++++++++++----------
1 files changed, 124 insertions(+), 44 deletions(-)
---
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 63f7364..2cddde4 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -35,6 +35,7 @@
#include "tracker-store.h"
#define TRACKER_STORE_TRANSACTION_MAX 4000
+#define TRACKER_STORE_MAX_CONCURRENT_QUERIES 1
typedef struct {
gboolean have_handler, have_sync_handler;
@@ -42,6 +43,7 @@ typedef struct {
guint batch_count;
GQueue *queues[TRACKER_STORE_N_PRIORITIES];
guint handler, sync_handler;
+ guint n_queries_running;
} TrackerStorePrivate;
typedef enum {
@@ -54,6 +56,7 @@ typedef enum {
typedef struct {
TrackerStoreTaskType type;
+ TrackerStorePriority priority;
union {
struct {
gchar *query;
@@ -81,6 +84,8 @@ typedef struct {
static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
+static void start_handler (TrackerStorePrivate *private);
+
static void
private_free (gpointer data)
{
@@ -192,6 +197,106 @@ end_batch (TrackerStorePrivate *private)
}
}
+static gboolean
+task_ready (TrackerStorePrivate *private)
+{
+ TrackerStoreTask *task;
+ gint i;
+
+ /* return TRUE if the next pending task can be scheduled right now (to keep idle handler running) */
+
+ if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
+ /* maximum number of queries running already, cannot schedule anything else */
+ return FALSE;
+ }
+
+ for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+ /* check next task of highest priority */
+ task = g_queue_peek_head (private->queues[i]);
+ if (task != NULL) {
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ /* we know that the maximum number of concurrent queries has not been reached yet,
+ query can be scheduled */
+ return TRUE;
+ } else if (private->n_queries_running == 0) {
+ /* no queries running, updates can be scheduled */
+ return TRUE;
+ } else {
+ /* queries running, wait for them to finish before scheduling updates */
+ return FALSE;
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+static void
+check_handler (TrackerStorePrivate *private)
+{
+ if (task_ready (private)) {
+ /* handler should be running */
+ if (!private->have_handler) {
+ start_handler (private);
+ }
+ } else {
+ /* handler should not be running */
+ if (private->have_handler) {
+ g_source_remove (private->handler);
+ }
+ }
+}
+
+static gboolean
+queue_idle_handler_finish (TrackerStorePrivate *private,
+ TrackerStoreTask *task)
+{
+ g_queue_remove (private->queues[task->priority], task);
+
+ if (task->destroy) {
+ task->destroy (task->user_data);
+ }
+
+ store_task_free (task);
+
+ return task_ready (private);
+}
+
+static void
+query_ready (GObject *source,
+ GAsyncResult *res,
+ void *user_data)
+{
+ GError *error = NULL;
+ TrackerDBResultSet *result_set;
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL);
+
+ task = user_data;
+
+ result_set = tracker_sparql_query_execute_finish (TRACKER_SPARQL_QUERY (source), res, &error);
+
+ private->n_queries_running--;
+
+ if (task->callback.query_callback) {
+ task->callback.query_callback (result_set, error, task->user_data);
+ }
+
+ if (result_set) {
+ g_object_unref (result_set);
+ }
+
+ if (error) {
+ g_clear_error (&error);
+ }
+
+ queue_idle_handler_finish (private, task);
+
+ check_handler (private);
+}
static gboolean
queue_idle_handler (gpointer user_data)
@@ -208,22 +313,16 @@ queue_idle_handler (gpointer user_data)
g_return_val_if_fail (task != NULL, FALSE);
if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
- GError *error = NULL;
- TrackerDBResultSet *result_set;
+ TrackerSparqlQuery *sparql_query;
- result_set = tracker_data_query_sparql (task->data.query.query, &error);
+ private->n_queries_running++;
- if (task->callback.query_callback) {
- task->callback.query_callback (result_set, error, task->user_data);
- }
+ sparql_query = tracker_sparql_query_new (task->data.query.query);
+ tracker_sparql_query_execute_async (sparql_query, query_ready, task);
+ g_object_unref (sparql_query);
- if (result_set) {
- g_object_unref (result_set);
- }
-
- if (error) {
- g_clear_error (&error);
- }
+ // suspend idle handler until the above query has finished
+ return FALSE;
} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
GError *error = NULL;
@@ -355,22 +454,8 @@ queue_idle_handler (gpointer user_data)
}
}
- out:
- g_queue_pop_head (queue);
-
- if (task->destroy) {
- task->destroy (task->user_data);
- }
-
- store_task_free (task);
-
- /* return TRUE if at least one queue is not empty (to keep idle handler running) */
- for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- if (!g_queue_is_empty (private->queues[i])) {
- return TRUE;
- }
- }
- return FALSE;
+out:
+ return queue_idle_handler_finish (private, task);
}
static void
@@ -446,6 +531,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
task = g_slice_new0 (TrackerStoreTask);
task->type = TRACKER_STORE_TASK_TYPE_COMMIT;
+ task->priority = TRACKER_STORE_PRIORITY_LOW;
task->user_data = user_data;
task->callback.commit_callback = callback;
task->destroy = destroy;
@@ -454,9 +540,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -477,6 +561,7 @@ tracker_store_sparql_query (const gchar *sparql,
task = g_slice_new0 (TrackerStoreTask);
task->type = TRACKER_STORE_TASK_TYPE_QUERY;
+ task->priority = priority;
task->data.update.query = g_strdup (sparql);
task->user_data = user_data;
task->callback.query_callback = callback;
@@ -485,9 +570,7 @@ tracker_store_sparql_query (const gchar *sparql,
g_queue_push_tail (private->queues[priority], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -509,6 +592,7 @@ tracker_store_sparql_update (const gchar *sparql,
task = g_slice_new0 (TrackerStoreTask);
task->type = TRACKER_STORE_TASK_TYPE_UPDATE;
+ task->priority = priority;
task->data.update.query = g_strdup (sparql);
task->data.update.batch = batch;
task->user_data = user_data;
@@ -518,9 +602,7 @@ tracker_store_sparql_update (const gchar *sparql,
g_queue_push_tail (private->queues[priority], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -541,6 +623,7 @@ tracker_store_sparql_update_blank (const gchar *sparql,
task = g_slice_new0 (TrackerStoreTask);
task->type = TRACKER_STORE_TASK_TYPE_UPDATE_BLANK;
+ task->priority = priority;
task->data.update.query = g_strdup (sparql);
task->user_data = user_data;
task->callback.update_blank_callback = callback;
@@ -549,9 +632,7 @@ tracker_store_sparql_update_blank (const gchar *sparql,
g_queue_push_tail (private->queues[priority], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
void
@@ -570,6 +651,7 @@ tracker_store_queue_turtle_import (GFile *file,
task = g_slice_new0 (TrackerStoreTask);
task->type = TRACKER_STORE_TASK_TYPE_TURTLE;
+ task->priority = TRACKER_STORE_PRIORITY_LOW;
task->data.turtle.path = g_file_get_path (file);
task->user_data = user_data;
task->callback.update_callback = callback;
@@ -577,9 +659,7 @@ tracker_store_queue_turtle_import (GFile *file,
g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
- if (!private->have_handler) {
- start_handler (private);
- }
+ check_handler (private);
}
guint
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]