[tracker/rss-enclosures] tracker-store: Execute requests in thread pool



commit accb46e670fecf794108d9064ee0a0e21925374d
Author: Jürg Billeter <j bitron ch>
Date:   Thu Apr 15 17:31:27 2010 +0200

    tracker-store: Execute requests in thread pool
    
    Currently limited to one extra thread.

 src/libtracker-data/tracker-data-update.c |    9 +-
 src/libtracker-data/tracker-data-update.h |    1 +
 src/tracker-store/tracker-store.c         |  332 +++++++++++++++++++++--------
 3 files changed, 248 insertions(+), 94 deletions(-)
---
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index 4ea037d..dd057e3 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -1991,6 +1991,12 @@ tracker_data_commit_db_transaction (void)
 	g_hash_table_remove_all (update_buffer.resources_by_id);
 	g_hash_table_remove_all (update_buffer.resource_cache);
 
+	in_journal_replay = FALSE;
+}
+
+void
+tracker_data_notify_db_transaction (void)
+{
 	if (commit_callbacks) {
 		guint n;
 		for (n = 0; n < commit_callbacks->len; n++) {
@@ -1999,10 +2005,9 @@ tracker_data_commit_db_transaction (void)
 			delegate->callback (delegate->user_data);
 		}
 	}
-
-	in_journal_replay = FALSE;
 }
 
+
 static void
 format_sql_value_as_string (GString         *sql,
                             TrackerProperty *property)
diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h
index 2300076..11444f9 100644
--- a/src/libtracker-data/tracker-data-update.h
+++ b/src/libtracker-data/tracker-data-update.h
@@ -83,6 +83,7 @@ void     tracker_data_insert_statement_with_string  (const gchar               *
 void     tracker_data_begin_db_transaction          (void);
 void     tracker_data_begin_db_transaction_for_replay (time_t                   time);
 void     tracker_data_commit_db_transaction         (void);
+void     tracker_data_notify_db_transaction         (void);
 void     tracker_data_begin_transaction             (GError                   **error);
 void     tracker_data_commit_transaction            (GError                   **error);
 void     tracker_data_rollback_transaction          (void);
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 5455cf6..8f5c823 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -35,13 +35,18 @@
 #include "tracker-store.h"
 
 #define TRACKER_STORE_TRANSACTION_MAX                   4000
+#define TRACKER_STORE_MAX_CONCURRENT_QUERIES               1
 
 typedef struct {
-	gboolean  have_handler, have_sync_handler;
-	gboolean  batch_mode, start_log;
-	guint     batch_count;
-	GQueue   *queues[TRACKER_STORE_N_PRIORITIES];
-	guint     handler, sync_handler;
+	gboolean     have_handler, have_sync_handler;
+	gboolean     batch_mode, start_log;
+	guint        batch_count;
+	GQueue      *queues[TRACKER_STORE_N_PRIORITIES];
+	guint        handler, sync_handler;
+	guint        n_queries_running;
+	gboolean     update_running;
+	GThreadPool *main_pool;
+	GThreadPool *global_pool;
 } TrackerStorePrivate;
 
 typedef enum {
@@ -57,17 +62,20 @@ typedef struct {
 	union {
 		struct {
 			gchar                   *query;
+			TrackerDBResultSet      *result_set;
 		} query;
 		struct {
 			gchar                   *query;
 			gboolean                 batch;
 			gchar                   *client_id;
+			GPtrArray               *blank_nodes;
 		} update;
 		struct {
 			gboolean           in_progress;
 			gchar             *path;
 		} turtle;
 	} data;
+	GError                    *error;
 	gpointer                   user_data;
 	GDestroyNotify             destroy;
 	union {
@@ -81,6 +89,8 @@ typedef struct {
 
 static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
 
+static void start_handler (TrackerStorePrivate *private);
+
 static void
 private_free (gpointer data)
 {
@@ -186,47 +196,167 @@ end_batch (TrackerStorePrivate *private)
 	if (private->batch_mode) {
 		/* commit pending batch items */
 		tracker_data_commit_db_transaction ();
+		tracker_data_notify_db_transaction ();
 
 		private->batch_mode = FALSE;
 		private->batch_count = 0;
 	}
 }
 
-
 static gboolean
-queue_idle_handler (gpointer user_data)
+task_ready (TrackerStorePrivate *private)
 {
-	TrackerStorePrivate *private = user_data;
-	GQueue              *queue;
-	TrackerStoreTask    *task = NULL;
-	gint                 i;
+	TrackerStoreTask *task;
+	gint i;
 
-	for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
-		queue = private->queues[i];
-		task = g_queue_peek_head (queue);
+	/* return TRUE if at least one queue is not empty (to keep idle handler running) */
+
+	if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
+		/* maximum number of queries running already, cannot schedule anything else */
+		return FALSE;
+	} else if (private->update_running) {
+		/* update running already, cannot schedule anything else */
+		return FALSE;
 	}
-	g_return_val_if_fail (task != NULL, FALSE);
 
-	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
-		GError *error = NULL;
-		TrackerDBResultSet *result_set;
+	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+		/* check next task of highest priority */
+		task = g_queue_peek_head (private->queues[i]);
+		if (task != NULL) {
+			if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+				/* we know that the maximum number of concurrent queries has not been reached yet,
+				   query can be scheduled */
+				return TRUE;
+			} else if (private->n_queries_running == 0) {
+				/* no queries running, updates can be scheduled */
+				return TRUE;
+			} else {
+				/* queries running, wait for them to finish before scheduling updates */
+				return FALSE;
+			}
+		}
+	}
+
+	return FALSE;
+}
+
+static void
+check_handler (TrackerStorePrivate *private)
+{
+	if (task_ready (private)) {
+		/* handler should be running */
+		if (!private->have_handler) {
+			start_handler (private);
+		}
+	} else {
+		/* handler should not be running */
+		if (private->have_handler) {
+			g_source_remove (private->handler);
+		}
+	}
+}
+
+static gboolean
+task_finish_cb (gpointer data)
+{
+	TrackerStorePrivate *private;
+	TrackerStoreTask *task;
 
-		result_set = tracker_data_query_sparql (task->data.query.query, &error);
+	private = g_static_private_get (&private_key);
+	task = data;
 
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
 		if (task->callback.query_callback) {
-			task->callback.query_callback (result_set, error, task->user_data);
+			task->callback.query_callback (task->data.query.result_set, task->error, task->user_data);
 		}
 
-		if (result_set) {
-			g_object_unref (result_set);
+		if (task->data.query.result_set) {
+			g_object_unref (task->data.query.result_set);
+			task->data.query.result_set = NULL;
 		}
 
-		if (error) {
-			g_clear_error (&error);
+		if (task->error) {
+			g_clear_error (&task->error);
 		}
+
+		private->n_queries_running--;
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
-		GError *error = NULL;
+		if (!task->data.update.batch && !task->error) {
+			tracker_data_notify_db_transaction ();
+		}
+
+		if (task->callback.update_callback) {
+			task->callback.update_callback (task->error, task->user_data);
+		}
+
+		if (task->error) {
+			g_clear_error (&task->error);
+		}
+
+		private->update_running = FALSE;
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+		if (!task->data.update.batch && !task->error) {
+			tracker_data_notify_db_transaction ();
+		}
+
+		if (task->callback.update_blank_callback) {
+			if (!task->data.update.blank_nodes) {
+				/* Create empty GPtrArray for dbus-glib to be happy */
+				task->data.update.blank_nodes = g_ptr_array_new ();
+			}
+
+			task->callback.update_blank_callback (task->data.update.blank_nodes, task->error, task->user_data);
+		}
+
+		if (task->data.update.blank_nodes) {
+			gint i;
+
+			for (i = 0; i < task->data.update.blank_nodes->len; i++) {
+				g_ptr_array_foreach (task->data.update.blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
+				g_ptr_array_free (task->data.update.blank_nodes->pdata[i], TRUE);
+			}
+			g_ptr_array_free (task->data.update.blank_nodes, TRUE);
+		}
+
+		if (task->error) {
+			g_clear_error (&task->error);
+		}
+
+		private->update_running = FALSE;
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		tracker_data_notify_db_transaction ();
+
+		if (task->callback.commit_callback) {
+			task->callback.commit_callback (task->user_data);
+		}
+
+		private->update_running = FALSE;
+	}
+
+	if (task->destroy) {
+		task->destroy (task->user_data);
+	}
+
+	store_task_free (task);
+
+	check_handler (private);
+
+	return FALSE;
+}
+
+static void
+pool_dispatch_cb (gpointer data,
+                  gpointer user_data)
+{
+	TrackerStorePrivate *private;
+	TrackerStoreTask *task;
 
+	private = user_data;
+	task = data;
+
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+		task->data.query.result_set = tracker_data_query_sparql (task->data.query.query, &task->error);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
 		if (task->data.update.batch) {
 			begin_batch (private);
 		} else {
@@ -234,10 +364,10 @@ queue_idle_handler (gpointer user_data)
 			tracker_data_begin_db_transaction ();
 		}
 
-		tracker_data_update_sparql (task->data.update.query, &error);
+		tracker_data_update_sparql (task->data.update.query, &task->error);
 
 		if (task->data.update.batch) {
-			if (!error) {
+			if (!task->error) {
 				private->batch_count++;
 				if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
 					end_batch (private);
@@ -246,18 +376,7 @@ queue_idle_handler (gpointer user_data)
 		} else {
 			tracker_data_commit_db_transaction ();
 		}
-
-		if (task->callback.update_callback) {
-			task->callback.update_callback (error, task->user_data);
-		}
-
-		if (error) {
-			g_clear_error (&error);
-		}
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
-		GError *error = NULL;
-		GPtrArray *blank_nodes;
-
 		if (task->data.update.batch) {
 			begin_batch (private);
 		} else {
@@ -265,10 +384,10 @@ queue_idle_handler (gpointer user_data)
 			tracker_data_begin_db_transaction ();
 		}
 
-		blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &error);
+		task->data.update.blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &task->error);
 
 		if (task->data.update.batch) {
-			if (!error) {
+			if (!task->error) {
 				private->batch_count++;
 				if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
 					end_batch (private);
@@ -278,34 +397,55 @@ queue_idle_handler (gpointer user_data)
 			tracker_data_commit_db_transaction ();
 		}
 
-		if (task->callback.update_blank_callback) {
-			if (!blank_nodes) {
-				/* Create empty GPtrArray for dbus-glib to be happy */
-				blank_nodes = g_ptr_array_new ();
-			}
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		end_batch (private);
+	}
 
-			task->callback.update_blank_callback (blank_nodes, error, task->user_data);
-		}
+	g_idle_add (task_finish_cb, task);
+}
 
-		if (blank_nodes) {
-			gint i;
+static void
+task_run_async (TrackerStorePrivate *private,
+                TrackerStoreTask    *task)
+{
+	if (private->n_queries_running > 1) {
+		/* use global pool if main pool might already be occupied */
+		g_thread_pool_push (private->global_pool, task, NULL);
+	} else {
+		/* use main pool for updates and non-parallel queries */
+		g_thread_pool_push (private->main_pool, task, NULL);
+	}
+}
 
-			for (i = 0; i < blank_nodes->len; i++) {
-				g_ptr_array_foreach (blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
-				g_ptr_array_free (blank_nodes->pdata[i], TRUE);
-			}
-			g_ptr_array_free (blank_nodes, TRUE);
-		}
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+	TrackerStorePrivate *private = user_data;
+	GQueue              *queue;
+	TrackerStoreTask    *task = NULL;
+	gint                 i;
 
-		if (error) {
-			g_clear_error (&error);
-		}
-	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
-		end_batch (private);
+	for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
+		queue = private->queues[i];
+		task = g_queue_peek_head (queue);
+	}
+	g_return_val_if_fail (task != NULL, FALSE);
 
-		if (task->callback.commit_callback) {
-			task->callback.commit_callback (task->user_data);
-		}
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+		/* pop task now, otherwise further queries won't be scheduled */
+		g_queue_pop_head (queue);
+
+		private->n_queries_running++;
+
+		task_run_async (private, task);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE ||
+	           task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK ||
+	           task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		g_queue_pop_head (queue);
+
+		private->update_running = TRUE;
+
+		task_run_async (private, task);
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
 		GError *error = NULL;
 		static TrackerTurtleReader *turtle_reader = NULL;
@@ -320,6 +460,14 @@ queue_idle_handler (gpointer user_data)
 				turtle_reader = NULL;
 				g_clear_error (&error);
 
+				g_queue_pop_head (queue);
+
+				if (task->destroy) {
+					task->destroy (task->user_data);
+				}
+
+				store_task_free (task);
+
 				goto out;
 			}
 			task->data.turtle.in_progress = TRUE;
@@ -352,25 +500,19 @@ queue_idle_handler (gpointer user_data)
 			if (error) {
 				g_clear_error (&error);
 			}
-		}
-	}
-
- out:
-	g_queue_pop_head (queue);
 
-	if (task->destroy) {
-		task->destroy (task->user_data);
-	}
+			g_queue_pop_head (queue);
 
-	store_task_free (task);
+			if (task->destroy) {
+				task->destroy (task->user_data);
+			}
 
-	/* return TRUE if at least one queue is not empty (to keep idle handler running) */
-	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
-		if (!g_queue_is_empty (private->queues[i])) {
-			return TRUE;
+			store_task_free (task);
 		}
 	}
-	return FALSE;
+
+out:
+	return task_ready (private);
 }
 
 static void
@@ -394,6 +536,19 @@ tracker_store_init (void)
 		private->queues[i] = g_queue_new ();
 	}
 
+	private->main_pool = g_thread_pool_new (pool_dispatch_cb,
+	                                        private, 1,
+	                                        TRUE, NULL);
+	private->global_pool = g_thread_pool_new (pool_dispatch_cb,
+	                                          private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
+	                                          FALSE, NULL);
+
+	/* as the following settings are global for unknown reasons,
+	   let's use the same settings as gio, otherwise the used settings
+	   are rather random */
+	g_thread_pool_set_max_idle_time (15 * 1000);
+	g_thread_pool_set_max_unused_threads (2);
+
 	g_static_private_set (&private_key,
 	                      private,
 	                      private_free);
@@ -408,6 +563,9 @@ tracker_store_shutdown (void)
 	private = g_static_private_get (&private_key);
 	g_return_if_fail (private != NULL);
 
+	g_thread_pool_free (private->global_pool, FALSE, TRUE);
+	g_thread_pool_free (private->main_pool, FALSE, TRUE);
+
 	if (private->have_handler) {
 		g_source_remove (private->handler);
 		private->have_handler = FALSE;
@@ -454,9 +612,7 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
 
 	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -485,9 +641,7 @@ tracker_store_sparql_query (const gchar *sparql,
 
 	g_queue_push_tail (private->queues[priority], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -518,9 +672,7 @@ tracker_store_sparql_update (const gchar *sparql,
 
 	g_queue_push_tail (private->queues[priority], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -549,9 +701,7 @@ tracker_store_sparql_update_blank (const gchar *sparql,
 
 	g_queue_push_tail (private->queues[priority], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 void
@@ -577,9 +727,7 @@ tracker_store_queue_turtle_import (GFile                      *file,
 
 	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
 
-	if (!private->have_handler) {
-		start_handler (private);
-	}
+	check_handler (private);
 }
 
 guint



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]