[tracker/direct-access: 4/121] tracker-store: Allow running queries while update is running



commit 6e754e8f487f4d68573aa327ac4308806ea7d2a7
Author: Jürg Billeter <j bitron ch>
Date:   Wed Jun 30 16:25:57 2010 +0200

    tracker-store: Allow running queries while update is running
    
    This will break order guarantees. Check with client applications and
    libraries before merging this commit.

 src/tracker-store/tracker-store.c |  391 +++++++++++++------------------------
 1 files changed, 140 insertions(+), 251 deletions(-)
---
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 5504008..ae96dcc 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -41,20 +41,17 @@
 
 #define TRACKER_STORE_MAX_CONCURRENT_QUERIES               2
 
-#define TRACKER_STORE_N_TURTLE_STATEMENTS                 50
-
 #define TRACKER_STORE_QUERY_WATCHDOG_TIMEOUT 10
 #define TRACKER_STORE_MAX_TASK_TIME          30
 
 typedef struct {
-	gboolean     have_handler, have_sync_handler;
 	gboolean     start_log;
-	GQueue      *queues[TRACKER_STORE_N_PRIORITIES];
-	guint        handler, sync_handler;
+	GQueue      *query_queues[TRACKER_STORE_N_PRIORITIES];
+	GQueue      *update_queues[TRACKER_STORE_N_PRIORITIES];
 	guint        n_queries_running;
 	gboolean     update_running;
-	GThreadPool *main_pool;
-	GThreadPool *global_pool;
+	GThreadPool *update_pool;
+	GThreadPool *query_pool;
 	GSList	    *running_tasks;
 	guint	     watchdog_id;
 	guint        max_task_time;
@@ -81,8 +78,6 @@ typedef struct {
 			GPtrArray    *blank_nodes;
 		} update;
 		struct {
-			TrackerTurtleReader *reader;
-			gboolean             in_progress;
 			gchar               *path;
 		} turtle;
 	} data;
@@ -108,8 +103,6 @@ static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
 static int main_cpu;
 #endif /* __USE_GNU */
 
-static void start_handler (TrackerStorePrivate *private);
-
 static void
 private_free (gpointer data)
 {
@@ -117,7 +110,8 @@ private_free (gpointer data)
 	gint i;
 
 	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		g_queue_free (private->queues[i]);
+		g_queue_free (private->query_queues[i]);
+		g_queue_free (private->update_queues[i]);
 	}
 	g_free (private);
 }
@@ -126,7 +120,6 @@ static void
 store_task_free (TrackerStoreTask *task)
 {
 	if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
-		g_object_unref (task->data.turtle.reader);
 		g_free (task->data.turtle.path);
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
 		g_free (task->data.query.query);
@@ -144,26 +137,15 @@ store_task_free (TrackerStoreTask *task)
 	g_slice_free (TrackerStoreTask, task);
 }
 
-static gboolean
-process_turtle_file_part (TrackerTurtleReader *reader, GError **error)
+static void
+process_turtle_file (TrackerTurtleReader *reader, GError **error)
 {
-	int i;
 	GError *new_error = NULL;
 
-	/* process 10 statements at once before returning to main loop */
-
-	i = 0;
-
-	/* There is no logical structure in turtle files, so we have no choice
-	 * but fallback to fixed number of statements per transaction to avoid
-	 * blocking tracker-store.
-	 * Real applications should all use SPARQL update instead of turtle
-	 * import to avoid this issue.
-	 */
 	tracker_data_begin_transaction (&new_error);
 	if (new_error) {
 		g_propagate_error (error, new_error);
-		return FALSE;
+		return;
 	}
 
 	while (new_error == NULL && tracker_turtle_reader_next (reader, &new_error)) {
@@ -183,86 +165,19 @@ process_turtle_file_part (TrackerTurtleReader *reader, GError **error)
 			                                           tracker_turtle_reader_get_object (reader),
 			                                           &new_error);
 		}
-
-		i++;
-		if (!new_error && i >= TRACKER_STORE_N_TURTLE_STATEMENTS) {
-			tracker_data_commit_transaction (&new_error);
-			if (new_error) {
-				tracker_data_rollback_transaction ();
-				g_propagate_error (error, new_error);
-				return FALSE;
-			}
-			/* return to main loop */
-			return TRUE;
-		}
 	}
 
 	if (new_error) {
 		tracker_data_rollback_transaction ();
 		g_propagate_error (error, new_error);
-		return FALSE;
+		return;
 	}
 
 	tracker_data_commit_transaction (&new_error);
 	if (new_error) {
 		tracker_data_rollback_transaction ();
 		g_propagate_error (error, new_error);
-		return FALSE;
-	}
-
-	return FALSE;
-}
-
-static gboolean
-task_ready (TrackerStorePrivate *private)
-{
-	TrackerStoreTask *task;
-	gint i;
-
-	/* return TRUE if at least one queue is not empty (to keep idle handler running) */
-
-	if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
-		/* maximum number of queries running already, cannot schedule anything else */
-		return FALSE;
-	} else if (private->update_running) {
-		/* update running already, cannot schedule anything else */
-		return FALSE;
-	}
-
-	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		/* check next task of highest priority */
-		task = g_queue_peek_head (private->queues[i]);
-		if (task != NULL) {
-			if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
-				/* we know that the maximum number of concurrent queries has not been reached yet,
-				   query can be scheduled */
-				return TRUE;
-			} else if (private->n_queries_running == 0) {
-				/* no queries running, updates can be scheduled */
-				return TRUE;
-			} else {
-				/* queries running, wait for them to finish before scheduling updates */
-				return FALSE;
-			}
-		}
-	}
-
-	return FALSE;
-}
-
-static void
-check_handler (TrackerStorePrivate *private)
-{
-	if (task_ready (private)) {
-		/* handler should be running */
-		if (!private->have_handler) {
-			start_handler (private);
-		}
-	} else {
-		/* handler should not be running */
-		if (private->have_handler) {
-			g_source_remove (private->handler);
-		}
+		return;
 	}
 }
 
@@ -316,6 +231,51 @@ check_running_tasks_watchdog (TrackerStorePrivate *private)
 	}
 }
 
+static void
+sched (TrackerStorePrivate *private)
+{
+	GQueue              *queue;
+	TrackerStoreTask    *task;
+	gint                 i;
+	/* Dispatch queued work: queries up to the concurrency limit, then at most one update. */
+	while (private->n_queries_running < TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
+		for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+			queue = private->query_queues[i];
+			task = g_queue_pop_head (queue);
+			if (task != NULL) {
+				break;
+			}
+		}
+		if (task == NULL) {
+			/* no pending query in any of the query queues (index 0 is checked first) */
+			break;
+		}
+		/* Account for the query as running before handing it to the query pool. */
+		private->running_tasks = g_slist_prepend (private->running_tasks, task);
+		ensure_running_tasks_watchdog (private);
+		private->n_queries_running++;
+
+		task->data.query.timer = g_timer_new ();
+
+		g_thread_pool_push (private->query_pool, task, NULL);
+	}
+	/* Updates are gated only by update_running, not by the number of running queries. */
+	if (!private->update_running) {
+		for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+			queue = private->update_queues[i];
+			task = g_queue_pop_head (queue);
+			if (task != NULL) {
+				break;
+			}
+		}
+		if (task != NULL) {
+			private->update_running = TRUE;
+			/* task_finish_cb () resets update_running once the task has completed. */
+			g_thread_pool_push (private->update_pool, task, NULL);
+		}
+	}
+}
+
 static gboolean
 task_finish_cb (gpointer data)
 {
@@ -386,24 +346,19 @@ task_finish_cb (gpointer data)
 
 		private->update_running = FALSE;
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
-		private->update_running = FALSE;
-
-		if (task->data.turtle.in_progress) {
-			/* Task still in progress */
-			check_handler (private);
-			return FALSE;
-		} else {
-			if (task->callback.turtle_callback) {
-				task->callback.turtle_callback (task->error, task->user_data);
-			}
+		if (!task->error) {
+			tracker_data_notify_transaction ();
+		}
 
-			if (task->error) {
-				g_clear_error (&task->error);
-			}
+		if (task->callback.turtle_callback) {
+			task->callback.turtle_callback (task->error, task->user_data);
+		}
 
-			/* Remove the task now that we're done with it */
-			g_queue_pop_head (private->queues[TRACKER_STORE_PRIORITY_TURTLE]);
+		if (task->error) {
+			g_clear_error (&task->error);
 		}
+
+		private->update_running = FALSE;
 	}
 
 	if (task->destroy) {
@@ -412,7 +367,7 @@ task_finish_cb (gpointer data)
 
 	store_task_free (task);
 
-	check_handler (private);
+	sched (private);
 
 	return FALSE;
 }
@@ -455,89 +410,16 @@ pool_dispatch_cb (gpointer data,
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
 		task->data.update.blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &task->error);
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
-		if (!task->data.turtle.in_progress) {
-			task->data.turtle.reader = tracker_turtle_reader_new (task->data.turtle.path, &task->error);
-
-			if (task->error) {
-				g_idle_add (task_finish_cb, task);
-				return;
-			}
+		TrackerTurtleReader *reader;
 
-			task->data.turtle.in_progress = TRUE;
-		}
-
-		if (process_turtle_file_part (task->data.turtle.reader, &task->error)) {
-			/* import still in progress */
-		} else {
-			/* import finished */
-			task->data.turtle.in_progress = FALSE;
-		}
+		reader = tracker_turtle_reader_new (task->data.turtle.path, &task->error);
+		process_turtle_file (reader, &task->error);
+		g_object_unref (reader);
 	}
 
 	g_idle_add (task_finish_cb, task);
 }
 
-static void
-task_run_async (TrackerStorePrivate *private,
-                TrackerStoreTask    *task)
-{
-	if (private->n_queries_running > 1) {
-		/* use global pool if main pool might already be occupied */
-		g_thread_pool_push (private->global_pool, task, NULL);
-	} else {
-		/* use main pool for updates and non-parallel queries */
-		g_thread_pool_push (private->main_pool, task, NULL);
-	}
-}
-
-static gboolean
-queue_idle_handler (gpointer user_data)
-{
-	TrackerStorePrivate *private = user_data;
-	GQueue              *queue;
-	TrackerStoreTask    *task = NULL;
-	gint                 i;
-
-	for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
-		queue = private->queues[i];
-		task = g_queue_peek_head (queue);
-	}
-	g_return_val_if_fail (task != NULL, FALSE);
-
-	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
-		/* pop task now, otherwise further queries won't be scheduled */
-		g_queue_pop_head (queue);
-
-		private->running_tasks = g_slist_prepend (private->running_tasks, task);
-		ensure_running_tasks_watchdog (private);
-		private->n_queries_running++;
-
-		task->data.query.timer = g_timer_new ();
-
-		task_run_async (private, task);
-	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE ||
-	           task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
-		g_queue_pop_head (queue);
-
-		private->update_running = TRUE;
-
-		task_run_async (private, task);
-	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
-		private->update_running = TRUE;
-		task_run_async (private, task);
-	}
-
-	return task_ready (private);
-}
-
-static void
-queue_idle_destroy (gpointer user_data)
-{
-	TrackerStorePrivate *private = user_data;
-
-	private->have_handler = FALSE;
-}
-
 void
 tracker_store_init (void)
 {
@@ -557,15 +439,16 @@ tracker_store_init (void)
 	}
 
 	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		private->queues[i] = g_queue_new ();
+		private->query_queues[i] = g_queue_new ();
+		private->update_queues[i] = g_queue_new ();
 	}
 
-	private->main_pool = g_thread_pool_new (pool_dispatch_cb,
-	                                        private, 1,
-	                                        TRUE, NULL);
-	private->global_pool = g_thread_pool_new (pool_dispatch_cb,
-	                                          private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
-	                                          FALSE, NULL);
+	private->update_pool = g_thread_pool_new (pool_dispatch_cb,
+	                                          private, 1,
+	                                          TRUE, NULL);
+	private->query_pool = g_thread_pool_new (pool_dispatch_cb,
+	                                         private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
+	                                         FALSE, NULL);
 
 	/* as the following settings are global for unknown reasons,
 	   let's use the same settings as gio, otherwise the used settings
@@ -582,7 +465,7 @@ tracker_store_init (void)
 	pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
 	/* lock main update/query thread to same cpu to improve overall performance
 	   main loop thread is essentially idle during query execution */
-	g_thread_pool_push (private->main_pool, GINT_TO_POINTER (1), NULL);
+	g_thread_pool_push (private->update_pool, GINT_TO_POINTER (1), NULL);
 #endif /* __USE_GNU */
 
 	g_static_private_set (&private_key,
@@ -598,33 +481,12 @@ tracker_store_shutdown (void)
 	private = g_static_private_get (&private_key);
 	g_return_if_fail (private != NULL);
 
-	g_thread_pool_free (private->global_pool, FALSE, TRUE);
-	g_thread_pool_free (private->main_pool, FALSE, TRUE);
-
-	if (private->have_handler) {
-		g_source_remove (private->handler);
-		private->have_handler = FALSE;
-	}
-
-	if (private->have_sync_handler) {
-		g_source_remove (private->sync_handler);
-		private->have_sync_handler = FALSE;
-	}
+	g_thread_pool_free (private->query_pool, FALSE, TRUE);
+	g_thread_pool_free (private->update_pool, FALSE, TRUE);
 
 	g_static_private_set (&private_key, NULL, NULL);
 }
 
-static void
-start_handler (TrackerStorePrivate *private)
-{
-	private->have_handler = TRUE;
-
-	private->handler = g_idle_add_full (G_PRIORITY_LOW,
-	                                    queue_idle_handler,
-	                                    private,
-	                                    queue_idle_destroy);
-}
-
 void
 tracker_store_sparql_query (const gchar *sparql,
                             TrackerStorePriority priority,
@@ -652,9 +514,9 @@ tracker_store_sparql_query (const gchar *sparql,
 	task->destroy = destroy;
 	task->client_id = g_strdup (client_id);
 
-	g_queue_push_tail (private->queues[priority], task);
+	g_queue_push_tail (private->query_queues[priority], task);
 
-	check_handler (private);
+	sched (private);
 }
 
 void
@@ -681,9 +543,9 @@ tracker_store_sparql_update (const gchar *sparql,
 	task->destroy = destroy;
 	task->client_id = g_strdup (client_id);
 
-	g_queue_push_tail (private->queues[priority], task);
+	g_queue_push_tail (private->update_queues[priority], task);
 
-	check_handler (private);
+	sched (private);
 }
 
 void
@@ -710,9 +572,9 @@ tracker_store_sparql_update_blank (const gchar *sparql,
 	task->destroy = destroy;
 	task->client_id = g_strdup (client_id);
 
-	g_queue_push_tail (private->queues[priority], task);
+	g_queue_push_tail (private->update_queues[priority], task);
 
-	check_handler (private);
+	sched (private);
 }
 
 void
@@ -736,9 +598,9 @@ tracker_store_queue_turtle_import (GFile                      *file,
 	task->callback.update_callback = callback;
 	task->destroy = destroy;
 
-	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_TURTLE], task);
+	g_queue_push_tail (private->update_queues[TRACKER_STORE_PRIORITY_TURTLE], task);
 
-	check_handler (private);
+	sched (private);
 }
 
 guint
@@ -752,11 +614,30 @@ tracker_store_get_queue_size (void)
 	g_return_val_if_fail (private != NULL, 0);
 
 	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		result += g_queue_get_length (private->queues[i]);
+		result += g_queue_get_length (private->query_queues[i]);
+		result += g_queue_get_length (private->update_queues[i]);
 	}
 	return result;
 }
 
+static void
+unreg_task (TrackerStoreTask *task,
+            GError           *error)
+{	/* Report 'error' via the task's type-specific callback, run destroy, free the task; 'error' is not freed here. */
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+		task->callback.query.query_callback (NULL, error, task->user_data);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+		task->callback.update_callback (error, task->user_data);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+		task->callback.update_blank_callback (NULL, error, task->user_data);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+		task->callback.turtle_callback (error, task->user_data);
+	}
+	task->destroy (task->user_data);
+	/* NOTE(review): callback and destroy are called without the NULL checks task_finish_cb () does - confirm callers always set them. */
+	store_task_free (task);
+}
+
 void
 tracker_store_unreg_batches (const gchar *client_id)
 {
@@ -782,10 +663,29 @@ tracker_store_unreg_batches (const gchar *client_id)
 	}
 
 	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		queue = private->queues[i];
-
+		queue = private->query_queues[i];
 		list = queue->head;
+		while (list) {
+			TrackerStoreTask *task;
+
+			cur = list;
+			list = list->next;
+			task = cur->data;
+
+			if (task && g_strcmp0 (task->client_id, client_id) == 0) {
+				g_queue_delete_link (queue, cur);
+
+				if (!error) {
+					g_set_error (&error, TRACKER_DBUS_ERROR, 0,
+						     "Client disappeared");
+				}
+
+				unreg_task (task, error);
+			}
+		}
 
+		queue = private->update_queues[i];
+		list = queue->head;
 		while (list) {
 			TrackerStoreTask *task;
 
@@ -793,26 +693,15 @@ tracker_store_unreg_batches (const gchar *client_id)
 			list = list->next;
 			task = cur->data;
 
-			if (task && task->type != TRACKER_STORE_TASK_TYPE_TURTLE) {
-				if (g_strcmp0 (task->client_id, client_id) == 0) {
-					if (!error) {
-						g_set_error (&error, TRACKER_DBUS_ERROR, 0,
-							     "Client disappeared");
-					}
-
-					if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
-						task->callback.query.query_callback (NULL, error, task->user_data);
-					} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
-						task->callback.update_callback (error, task->user_data);
-					} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
-						task->callback.update_blank_callback (NULL, error, task->user_data);
-					}
-					task->destroy (task->user_data);
-
-					g_queue_delete_link (queue, cur);
-
-					store_task_free (task);
+			if (task && g_strcmp0 (task->client_id, client_id) == 0) {
+				g_queue_delete_link (queue, cur);
+
+				if (!error) {
+					g_set_error (&error, TRACKER_DBUS_ERROR, 0,
+						     "Client disappeared");
 				}
+
+				unreg_task (task, error);
 			}
 		}
 	}
@@ -821,5 +710,5 @@ tracker_store_unreg_batches (const gchar *client_id)
 		g_clear_error (&error);
 	}
 
-	check_handler (private);
+	sched (private);
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]