[tracker/queue: 1/3] tracker-store: Add second queue for higher priority requests



commit 945b5f1ab4b179a782ade54568922ec0b2d03cac
Author: Jürg Billeter <j bitron ch>
Date:   Wed Apr 7 14:23:24 2010 +0200

    tracker-store: Add second queue for higher priority requests

 src/tracker-store/tracker-store.c |   90 ++++++++++++++++++++++++------------
 1 files changed, 60 insertions(+), 30 deletions(-)
---
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 7c9a9d6..9bf4c09 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -36,11 +36,13 @@
 
 #define TRACKER_STORE_TRANSACTION_MAX                   4000
 
+#define N_QUEUES                                           2
+
 typedef struct {
 	gboolean  have_handler, have_sync_handler;
 	gboolean  batch_mode, start_log;
 	guint     batch_count;
-	GQueue   *queue;
+	GQueue   *queues[N_QUEUES];
 	guint     handler, sync_handler;
 } TrackerStorePrivate;
 
@@ -77,8 +79,11 @@ static void
 private_free (gpointer data)
 {
 	TrackerStorePrivate *private = data;
+	gint i;
 
-	g_queue_free (private->queue);
+	for (i = 0; i < N_QUEUES; i++) {
+		g_queue_free (private->queues[i]);
+	}
 	g_free (private);
 }
 
@@ -186,9 +191,14 @@ static gboolean
 queue_idle_handler (gpointer user_data)
 {
 	TrackerStorePrivate *private = user_data;
-	TrackerStoreTask    *task;
+	GQueue              *queue;
+	TrackerStoreTask    *task = NULL;
+	gint                 i;
 
-	task = g_queue_peek_head (private->queue);
+	for (i = 0; task == NULL && i < N_QUEUES; i++) {
+		queue = private->queues[i];
+		task = g_queue_peek_head (queue);
+	}
 	g_return_val_if_fail (task != NULL, FALSE);
 
 	if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
@@ -268,7 +278,7 @@ queue_idle_handler (gpointer user_data)
 	}
 
  out:
-	g_queue_pop_head (private->queue);
+	g_queue_pop_head (queue);
 
 	if (task->destroy) {
 		task->destroy (task->user_data);
@@ -276,7 +286,13 @@ queue_idle_handler (gpointer user_data)
 
 	store_task_free (task);
 
-	return !g_queue_is_empty (private->queue);
+	/* return TRUE if at least one queue is not empty (to keep idle handler running) */
+	for (i = 0; i < N_QUEUES; i++) {
+		if (!g_queue_is_empty (private->queues[i])) {
+			return TRUE;
+		}
+	}
+	return FALSE;
 }
 
 static void
@@ -292,10 +308,13 @@ void
 tracker_store_init (void)
 {
 	TrackerStorePrivate *private;
+	gint i;
 
 	private = g_new0 (TrackerStorePrivate, 1);
 
-	private->queue = g_queue_new ();
+	for (i = 0; i < N_QUEUES; i++) {
+		private->queues[i] = g_queue_new ();
+	}
 
 	g_static_private_set (&private_key,
 	                      private,
@@ -355,7 +374,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
 	task->data.update.client_id = g_strdup (client_id);
 	task->data.update.query = NULL;
 
-	g_queue_push_tail (private->queue, task);
+	g_queue_push_tail (private->queues[1], task);
 
 	if (!private->have_handler) {
 		start_handler (private);
@@ -386,7 +405,7 @@ tracker_store_queue_sparql_update (const gchar *sparql,
 	task->destroy = destroy;
 	task->data.update.client_id = g_strdup (client_id);
 
-	g_queue_push_tail (private->queue, task);
+	g_queue_push_tail (private->queues[1], task);
 
 	if (!private->have_handler) {
 		start_handler (private);
@@ -414,7 +433,7 @@ tracker_store_queue_turtle_import (GFile                      *file,
 	task->callback.update_callback = callback;
 	task->destroy = destroy;
 
-	g_queue_push_tail (private->queue, task);
+	g_queue_push_tail (private->queues[1], task);
 
 	if (!private->have_handler) {
 		start_handler (private);
@@ -482,11 +501,16 @@ guint
 tracker_store_get_queue_size (void)
 {
 	TrackerStorePrivate *private;
+	gint i;
+	guint result = 0;
 
 	private = g_static_private_get (&private_key);
 	g_return_val_if_fail (private != NULL, 0);
 
-	return g_queue_get_length (private->queue);
+	for (i = 0; i < N_QUEUES; i++) {
+		result += g_queue_get_length (private->queues[i]);
+	}
+	return result;
 }
 
 void
@@ -495,35 +519,41 @@ tracker_store_unreg_batches (const gchar *client_id)
 	TrackerStorePrivate *private;
 	static GError *error = NULL;
 	GList *list, *cur;
+	GQueue *queue;
+	gint i;
 
 	private = g_static_private_get (&private_key);
 	g_return_if_fail (private != NULL);
 
-	list = private->queue->head;
+	for (i = 0; i < N_QUEUES; i++) {
+		queue = private->queues[i];
 
-	while (list) {
-		TrackerStoreTask *task;
+		list = queue->head;
 
-		cur = list;
-		list = list->next;
-		task = cur->data;
+		while (list) {
+			TrackerStoreTask *task;
 
-		if (task && task->type != TRACKER_STORE_TASK_TYPE_TURTLE) {
-			if (g_strcmp0 (task->data.update.client_id, client_id) == 0) {
-				if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
-					if (!error) {
-						g_set_error (&error, TRACKER_DBUS_ERROR, 0,
-						             "Client disappeared");
+			cur = list;
+			list = list->next;
+			task = cur->data;
+
+			if (task && task->type != TRACKER_STORE_TASK_TYPE_TURTLE) {
+				if (g_strcmp0 (task->data.update.client_id, client_id) == 0) {
+					if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+						if (!error) {
+							g_set_error (&error, TRACKER_DBUS_ERROR, 0,
+								     "Client disappeared");
+						}
+						task->callback.update_callback (error, task->user_data);
+					} else {
+						task->callback.commit_callback (task->user_data);
 					}
-					task->callback.update_callback (error, task->user_data);
-				} else {
-					task->callback.commit_callback (task->user_data);
-				}
-				task->destroy (task->user_data);
+					task->destroy (task->user_data);
 
-				g_queue_delete_link (private->queue, cur);
+					g_queue_delete_link (queue, cur);
 
-				store_task_free (task);
+					store_task_free (task);
+				}
 			}
 		}
 	}



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]