[tracker/async-queries: 2/2] tracker-store: Use async queries



commit 93388d4461e0e46e3c15d08b9048923244918d06
Author: Jürg Billeter <j bitron ch>
Date:   Mon Apr 12 12:12:41 2010 +0200

    tracker-store: Use async queries
    
    This does not yet execute multiple queries at the same time.

 src/tracker-store/tracker-store.c |  168 +++++++++++++++++++++++++++----------
 1 files changed, 124 insertions(+), 44 deletions(-)
---
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 63f7364..2cddde4 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -35,6 +35,7 @@
 #include "tracker-store.h"
 
 #define TRACKER_STORE_TRANSACTION_MAX                   4000
+#define TRACKER_STORE_MAX_CONCURRENT_QUERIES               1
 
 typedef struct {
 	gboolean  have_handler, have_sync_handler;
@@ -42,6 +43,7 @@ typedef struct {
 	guint     batch_count;
 	GQueue   *queues[TRACKER_STORE_N_PRIORITIES];
 	guint     handler, sync_handler;
+	guint     n_queries_running;
 } TrackerStorePrivate;
 
 typedef enum {
@@ -54,6 +56,7 @@ typedef enum {
 
 typedef struct {
 	TrackerStoreTaskType  type;
+	TrackerStorePriority  priority;
 	union {
 		struct {
 			gchar                   *query;
@@ -81,6 +84,8 @@ typedef struct {
 
 static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
 
+static void start_handler (TrackerStorePrivate *private);
+
 static void
 private_free (gpointer data)
 {
@@ -192,6 +197,106 @@ end_batch (TrackerStorePrivate *private)
 	}
 }
 
+static gboolean
+task_ready (TrackerStorePrivate *private)
+{
+	TrackerStoreTask *task;
+	gint i;
+
+	/* return TRUE only if the head task of the first non-empty queue can be scheduled right now (i.e. the idle handler should run) */
+
+	if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
+		/* maximum number of queries running already, cannot schedule anything else */
+		return FALSE;
+	}
+
+	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+		/* queues are scanned from index 0 upwards; the first non-empty queue alone decides the result */
+		task = g_queue_peek_head (private->queues[i]);
+		if (task != NULL) {
+			if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+				/* we know that the maximum number of concurrent queries has not been reached yet,
+				   query can be scheduled */
+				return TRUE;
+			} else if (private->n_queries_running == 0) {
+				/* no queries running, any non-query task (update etc.) can be scheduled */
+				return TRUE;
+			} else {
+				/* queries running, wait for them to finish before scheduling a non-query task */
+				return FALSE;
+			}
+		}
+	}
+
+	return FALSE;
+}
+
+static void
+check_handler (TrackerStorePrivate *private)
+{
+	if (task_ready (private)) {
+		/* a task can be scheduled now: make sure the handler is started */
+		if (!private->have_handler) {
+			start_handler (private);
+		}
+	} else {
+		/* nothing can be scheduled now: remove the handler source. NOTE(review): have_handler is not cleared here, presumably done elsewhere - confirm */
+		if (private->have_handler) {
+			g_source_remove (private->handler);
+		}
+	}
+}
+
+static gboolean
+queue_idle_handler_finish (TrackerStorePrivate *private,
+                           TrackerStoreTask     *task)
+{
+	g_queue_remove (private->queues[task->priority], task);
+	/* run the caller-supplied destroy notify on user_data (if any) before the task itself is freed */
+	if (task->destroy) {
+		task->destroy (task->user_data);
+	}
+
+	store_task_free (task);
+	/* report whether another task can be scheduled right now */
+	return task_ready (private);
+}
+
+static void
+query_ready (GObject      *source,
+             GAsyncResult *res,
+             void         *user_data)
+{
+	GError *error = NULL;
+	TrackerDBResultSet *result_set;
+	TrackerStorePrivate *private;
+	TrackerStoreTask    *task;
+
+	private = g_static_private_get (&private_key);
+	g_return_if_fail (private != NULL);
+	/* user_data is the TrackerStoreTask that was handed to tracker_sparql_query_execute_async () */
+	task = user_data;
+
+	result_set = tracker_sparql_query_execute_finish (TRACKER_SPARQL_QUERY (source), res, &error);
+	/* the query has finished whatever its outcome, so it no longer counts as running */
+	private->n_queries_running--;
+
+	if (task->callback.query_callback) {
+		task->callback.query_callback (result_set, error, task->user_data);
+	}
+
+	if (result_set) {
+		g_object_unref (result_set);
+	}
+
+	if (error) {
+		g_clear_error (&error);
+	}
+
+	queue_idle_handler_finish (private, task);
+	/* the queue and the running-query count changed: re-evaluate whether the handler should run */
+	check_handler (private);
+}
 
 static gboolean
 queue_idle_handler (gpointer user_data)
@@ -208,22 +313,16 @@ queue_idle_handler (gpointer user_data)
 	g_return_val_if_fail (task != NULL, FALSE);
 
 	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
-		GError *error = NULL;
-		TrackerDBResultSet *result_set;
+		TrackerSparqlQuery *sparql_query;
 
-		result_set = tracker_data_query_sparql (task->data.query.query, &error);
+		private->n_queries_running++;
 
-		if (task->callback.query_callback) {
-			task->callback.query_callback (result_set, error, task->user_data);
-		}
+		sparql_query = tracker_sparql_query_new (task->data.query.query);
+		tracker_sparql_query_execute_async (sparql_query, query_ready, task);
+		g_object_unref (sparql_query);
 
-		if (result_set) {
-			g_object_unref (result_set);
-		}
-
-		if (error) {
-			g_clear_error (&error);
-		}
+		// suspend idle handler until the above query has finished
+		return FALSE;
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
 		GError *error = NULL;
 
@@ -355,22 +454,8 @@ queue_idle_handler (gpointer user_data)
 		}
 	}
 
- out:
-	g_queue_pop_head (queue);
-
-	if (task->destroy) {
-		task->destroy (task->user_data);
-	}
-
-	store_task_free (task);
-
-	/* return TRUE if at least one queue is not empty (to keep idle handler running) */
-	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		if (!g_queue_is_empty (private->queues[i])) {
-			return TRUE;
-		}
-	}
-	return FALSE;
+out:
+	return queue_idle_handler_finish (private, task);
 }
 
 static void
@@ -446,6 +531,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
 
 	task = g_slice_new0 (TrackerStoreTask);
 	task->type = TRACKER_STORE_TASK_TYPE_COMMIT;
+	task->priority = TRACKER_STORE_PRIORITY_LOW;
 	task->user_data = user_data;
 	task->callback.commit_callback = callback;
 	task->destroy = destroy;
@@ -454,9 +540,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
 
 	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -477,6 +561,7 @@ tracker_store_sparql_query (const gchar *sparql,
 
 	task = g_slice_new0 (TrackerStoreTask);
 	task->type = TRACKER_STORE_TASK_TYPE_QUERY;
+	task->priority = priority;
 	task->data.update.query = g_strdup (sparql);
 	task->user_data = user_data;
 	task->callback.query_callback = callback;
@@ -485,9 +570,7 @@ tracker_store_sparql_query (const gchar *sparql,
 
 	g_queue_push_tail (private->queues[priority], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -509,6 +592,7 @@ tracker_store_sparql_update (const gchar *sparql,
 
 	task = g_slice_new0 (TrackerStoreTask);
 	task->type = TRACKER_STORE_TASK_TYPE_UPDATE;
+	task->priority = priority;
 	task->data.update.query = g_strdup (sparql);
 	task->data.update.batch = batch;
 	task->user_data = user_data;
@@ -518,9 +602,7 @@ tracker_store_sparql_update (const gchar *sparql,
 
 	g_queue_push_tail (private->queues[priority], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -541,6 +623,7 @@ tracker_store_sparql_update_blank (const gchar *sparql,
 
 	task = g_slice_new0 (TrackerStoreTask);
 	task->type = TRACKER_STORE_TASK_TYPE_UPDATE_BLANK;
+	task->priority = priority;
 	task->data.update.query = g_strdup (sparql);
 	task->user_data = user_data;
 	task->callback.update_blank_callback = callback;
@@ -549,9 +632,7 @@ tracker_store_sparql_update_blank (const gchar *sparql,
 
 	g_queue_push_tail (private->queues[priority], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -570,6 +651,7 @@ tracker_store_queue_turtle_import (GFile                      *file,
 
 	task = g_slice_new0 (TrackerStoreTask);
 	task->type = TRACKER_STORE_TASK_TYPE_TURTLE;
+	task->priority = TRACKER_STORE_PRIORITY_LOW;
 	task->data.turtle.path = g_file_get_path (file);
 	task->user_data = user_data;
 	task->callback.update_callback = callback;
@@ -577,9 +659,7 @@ tracker_store_queue_turtle_import (GFile                      *file,
 
 	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 guint



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]