[tracker/async-queries-wip: 22/23] threading changes



commit f8bd8d5021e8b7d36522c89c0514a58f1cfe3031
Author: Jürg Billeter <j bitron ch>
Date:   Thu Apr 15 13:55:31 2010 +0200

    threading changes

 src/libtracker-data/tracker-data-update.c       |   15 ++-
 src/libtracker-data/tracker-data-update.h       |    2 +
 src/libtracker-db/tracker-db-interface-sqlite.c |   12 +-
 src/libtracker-db/tracker-db-manager.c          |   24 ++-
 src/tracker-store/tracker-events.c              |    7 +
 src/tracker-store/tracker-store.c               |  261 ++++++++++++++---------
 src/tracker-store/tracker-writeback.c           |    7 +
 7 files changed, 224 insertions(+), 104 deletions(-)
---
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index 5f7f708..06bf196 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -1962,7 +1962,7 @@ tracker_data_begin_db_transaction_for_replay (time_t time)
 }
 
 void
-tracker_data_commit_db_transaction (void)
+tracker_data_commit_db_transaction_no_notify (void)
 {
 	TrackerDBInterface *iface;
 
@@ -1991,6 +1991,12 @@ tracker_data_commit_db_transaction (void)
 	g_hash_table_remove_all (update_buffer.resources_by_id);
 	g_hash_table_remove_all (update_buffer.resource_cache);
 
+	in_journal_replay = FALSE;
+}
+
+void
+tracker_data_commit_db_transaction_only_notify (void)
+{
 	if (commit_callbacks) {
 		guint n;
 		for (n = 0; n < commit_callbacks->len; n++) {
@@ -1999,8 +2005,13 @@ tracker_data_commit_db_transaction (void)
 			delegate->callback (delegate->user_data);
 		}
 	}
+}
 
-	in_journal_replay = FALSE;
+void
+tracker_data_commit_db_transaction (void)
+{
+	tracker_data_commit_db_transaction_no_notify ();
+	tracker_data_commit_db_transaction_only_notify ();
 }
 
 static void
diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h
index 2300076..2c45801 100644
--- a/src/libtracker-data/tracker-data-update.h
+++ b/src/libtracker-data/tracker-data-update.h
@@ -83,6 +83,8 @@ void     tracker_data_insert_statement_with_string  (const gchar               *
 void     tracker_data_begin_db_transaction          (void);
 void     tracker_data_begin_db_transaction_for_replay (time_t                   time);
 void     tracker_data_commit_db_transaction         (void);
+void     tracker_data_commit_db_transaction_no_notify (void);
+void     tracker_data_commit_db_transaction_only_notify (void);
 void     tracker_data_begin_transaction             (GError                   **error);
 void     tracker_data_commit_transaction            (GError                   **error);
 void     tracker_data_rollback_transaction          (void);
diff --git a/src/libtracker-db/tracker-db-interface-sqlite.c b/src/libtracker-db/tracker-db-interface-sqlite.c
index 6e056da..6996695 100644
--- a/src/libtracker-db/tracker-db-interface-sqlite.c
+++ b/src/libtracker-db/tracker-db-interface-sqlite.c
@@ -153,7 +153,7 @@ static int
 my_lock (sqlite3_file* file, int locktype)
 {
 	if (locktype >= SQLITE_LOCK_PENDING) {
-		g_warning ("%p locked %d", file, locktype);
+		//g_warning ("%p locked %d", file, locktype);
 		db_locked = TRUE;
 	}
 	int rc = DEFAULT_METHODS(file)->xLock (file, locktype);
@@ -180,7 +180,7 @@ static int
 my_open (sqlite3_vfs *vfs, const char *zName, sqlite3_file* file, int flags, int *pOutFlags)
 {
 	int rc = default_vfs->xOpen (vfs, zName, file, flags, pOutFlags);
-	g_warning ("%p %s", file, zName);
+	//g_warning ("%p %s", file, zName);
 	if ((flags & SQLITE_OPEN_MAIN_DB) && file->pMethods) {
 		sqlite3_io_methods *my_methods = g_memdup (file->pMethods, sizeof (sqlite3_io_methods));
 		my_methods->xLock = my_lock;
@@ -202,9 +202,17 @@ tracker_db_manager_pending_lock (void)
 	}
 }
 
+static gboolean initialized = FALSE;
+
 void
 tracker_db_interface_sqlite_enable_shared_cache (void)
 {
+	if (initialized) {
+		return;
+	}
+
+	initialized = TRUE;
+
 	default_vfs = sqlite3_vfs_find (NULL);
 	sqlite3_vfs *wrapper = g_memdup (default_vfs, sizeof (sqlite3_vfs));
 	wrapper->szOsFile = ALIGN_STRUCT(default_vfs->szOsFile) + sizeof (gpointer);
diff --git a/src/libtracker-db/tracker-db-manager.c b/src/libtracker-db/tracker-db-manager.c
index 6af6d5b..1daf55e 100644
--- a/src/libtracker-db/tracker-db-manager.c
+++ b/src/libtracker-db/tracker-db-manager.c
@@ -177,6 +177,7 @@ static gpointer              db_type_enum_class_pointer;
 static TrackerDBInterface   *resources_iface;
 static TrackerDBManagerFlags old_flags = 0;
 
+static GStaticPrivate interface_data_key = G_STATIC_PRIVATE_INIT;
 static TrackerDBInterfacePool *interface_pool = NULL;
 
 #define TRACKER_DB_INTERFACE_POOL_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DB_INTERFACE_POOL, TrackerDBInterfacePoolPrivate))
@@ -931,6 +932,8 @@ tracker_db_manager_shutdown (void)
 		return;
 	}
 
+	g_static_private_free (&interface_data_key);
+
 	for (i = 1; i < G_N_ELEMENTS (dbs); i++) {
 		if (dbs[i].abs_filename) {
 			g_free (dbs[i].abs_filename);
@@ -1209,9 +1212,28 @@ tracker_db_manager_get_db_interfaces_ro (gint num, ...)
 TrackerDBInterface *
 tracker_db_manager_get_db_interface (void)
 {
+	TrackerDBInterface *interface;
+
 	g_return_val_if_fail (initialized != FALSE, NULL);
 
-	return resources_iface;
+	interface = g_static_private_get (&interface_data_key);
+
+	/* Ensure the interface is there */
+	if (!interface) {
+		interface = tracker_db_manager_get_db_interfaces (3,
+		                                                  TRACKER_DB_METADATA,
+		                                                  TRACKER_DB_FULLTEXT,
+		                                                  TRACKER_DB_CONTENTS);
+
+		// FIXME: this should probably always be FALSE, except possibly on the main thread, where we might not want to initialize FTS implicitly at all
+		tracker_db_interface_sqlite_fts_init (TRACKER_DB_INTERFACE_SQLITE (interface), TRUE);
+
+		g_static_private_set (&interface_data_key,
+		                      interface,
+		                      (GDestroyNotify) g_object_unref);
+	}
+
+	return interface;
 }
 
 /**
diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c
index beda952..99cc630 100644
--- a/src/tracker-store/tracker-events.c
+++ b/src/tracker-store/tracker-events.c
@@ -26,6 +26,13 @@
 
 #include "tracker-events.h"
 
+#define GStaticPrivate gpointer
+#undef G_STATIC_PRIVATE_INIT
+#define G_STATIC_PRIVATE_INIT NULL
+#define g_static_private_get(x) (*x)
+#define g_static_private_get(x) (*x)
+#define g_static_private_set(x,y,z) (*x = y)
+
 
 typedef struct {
 	GHashTable *allowances;
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 75e04f5..ec22a3c 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -44,6 +44,9 @@ typedef struct {
 	GQueue   *queues[TRACKER_STORE_N_PRIORITIES];
 	guint     handler, sync_handler;
 	guint     n_queries_running;
+	gboolean  update_running;
+	GThreadPool *main_pool;
+	GThreadPool *global_pool;
 } TrackerStorePrivate;
 
 typedef enum {
@@ -60,17 +63,20 @@ typedef struct {
 	union {
 		struct {
 			gchar                   *query;
+			TrackerDBResultSet      *result_set;
 		} query;
 		struct {
 			gchar                   *query;
 			gboolean                 batch;
 			gchar                   *client_id;
+			GPtrArray               *blank_nodes;
 		} update;
 		struct {
 			gboolean           in_progress;
 			gchar             *path;
 		} turtle;
 	} data;
+	GError                    *error;
 	gpointer                   user_data;
 	GDestroyNotify             destroy;
 	union {
@@ -190,7 +196,8 @@ end_batch (TrackerStorePrivate *private)
 {
 	if (private->batch_mode) {
 		/* commit pending batch items */
-		tracker_data_commit_db_transaction ();
+		tracker_data_commit_db_transaction_no_notify ();
+		tracker_data_commit_db_transaction_only_notify ();
 
 		/* the above commit will trigger a lock again, reset it */
 		tracker_db_manager_pending_lock ();
@@ -211,6 +218,9 @@ task_ready (TrackerStorePrivate *private)
 	if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
 		/* maximum number of queries running already, cannot schedule anything else */
 		return FALSE;
+	} else if (private->update_running) {
+		/* update running already, cannot schedule anything else */
+		return FALSE;
 	}
 
 	for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
@@ -251,87 +261,106 @@ check_handler (TrackerStorePrivate *private)
 }
 
 static gboolean
-queue_idle_handler_finish (TrackerStorePrivate *private,
-                           TrackerStoreTask     *task)
+task_finish_cb (gpointer data)
 {
-	g_queue_remove (private->queues[task->priority], task);
+	TrackerStorePrivate *private;
+	TrackerStoreTask *task;
 
-	if (task->destroy) {
-		task->destroy (task->user_data);
-	}
+	private = g_static_private_get (&private_key);
+	task = data;
 
-	store_task_free (task);
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+		if (task->callback.query_callback) {
+			task->callback.query_callback (task->data.query.result_set, task->error, task->user_data);
+		}
 
-	return task_ready (private);
-}
+		if (task->data.query.result_set) {
+			g_object_unref (task->data.query.result_set);
+			task->data.query.result_set = NULL;
+		}
 
-static void
-query_ready (GObject      *source,
-             GAsyncResult *res,
-             void         *user_data)
-{
-	GError *error = NULL;
-	TrackerDBResultSet *result_set;
-	TrackerStorePrivate *private;
-	TrackerStoreTask    *task;
+		if (task->error) {
+			g_clear_error (&task->error);
+		}
 
-	private = g_static_private_get (&private_key);
-	g_return_if_fail (private != NULL);
+		private->n_queries_running--;
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+		if (!task->data.update.batch && !task->error) {
+			tracker_data_commit_db_transaction_only_notify ();
+		}
 
-	task = user_data;
+		if (task->callback.update_callback) {
+			task->callback.update_callback (task->error, task->user_data);
+		}
 
-	result_set = tracker_sparql_query_execute_finish (TRACKER_SPARQL_QUERY (source), res, &error);
+		if (task->error) {
+			g_clear_error (&task->error);
+		}
 
-	private->n_queries_running--;
+		private->update_running = FALSE;
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+		if (!task->data.update.batch && !task->error) {
+			tracker_data_commit_db_transaction_only_notify ();
+		}
 
-	if (task->callback.query_callback) {
-		task->callback.query_callback (result_set, error, task->user_data);
-	}
+		if (task->callback.update_blank_callback) {
+			if (!task->data.update.blank_nodes) {
+				/* Create empty GPtrArray for dbus-glib to be happy */
+				task->data.update.blank_nodes = g_ptr_array_new ();
+			}
+
+			task->callback.update_blank_callback (task->data.update.blank_nodes, task->error, task->user_data);
+		}
+
+		if (task->data.update.blank_nodes) {
+			gint i;
 
-	if (result_set) {
-		g_object_unref (result_set);
+			for (i = 0; i < task->data.update.blank_nodes->len; i++) {
+				g_ptr_array_foreach (task->data.update.blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
+				g_ptr_array_free (task->data.update.blank_nodes->pdata[i], TRUE);
+			}
+			g_ptr_array_free (task->data.update.blank_nodes, TRUE);
+		}
+
+		if (task->error) {
+			g_clear_error (&task->error);
+		}
+
+		private->update_running = FALSE;
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		tracker_data_commit_db_transaction_only_notify ();
+
+		if (task->callback.commit_callback) {
+			task->callback.commit_callback (task->user_data);
+		}
+
+		private->update_running = FALSE;
 	}
 
-	if (error) {
-		g_clear_error (&error);
+	if (task->destroy) {
+		task->destroy (task->user_data);
 	}
 
-	queue_idle_handler_finish (private, task);
+	store_task_free (task);
 
 	check_handler (private);
+
+	return FALSE;
 }
 
-static gboolean
-queue_idle_handler (gpointer user_data)
+static void
+pool_dispatch_cb (gpointer data,
+                  gpointer user_data)
 {
-	TrackerStorePrivate *private = user_data;
-	GQueue              *queue;
-	TrackerStoreTask    *task = NULL;
-	gint                 i;
+	TrackerStorePrivate *private;
+	TrackerStoreTask *task;
 
-	for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
-		queue = private->queues[i];
-		task = g_queue_peek_head (queue);
-	}
-	g_return_val_if_fail (task != NULL, FALSE);
+	private = user_data;
+	task = data;
 
 	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
-		TrackerSparqlQuery *sparql_query;
-
-		private->n_queries_running++;
-
-		/* pop task now, otherwise further queries won't be scheduled */
-		g_queue_pop_head (queue);
-
-		sparql_query = tracker_sparql_query_new (task->data.query.query);
-		tracker_sparql_query_execute_async (sparql_query, NULL, query_ready, task);
-		g_object_unref (sparql_query);
-
-		/* suspend idle handler until above query finished */
-		return FALSE;
+		task->data.query.result_set = tracker_data_query_sparql (task->data.query.query, &task->error);
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
-		GError *error = NULL;
-
 		if (task->data.update.batch) {
 			begin_batch (private);
 		} else {
@@ -339,30 +368,19 @@ queue_idle_handler (gpointer user_data)
 			tracker_data_begin_db_transaction ();
 		}
 
-		tracker_data_update_sparql (task->data.update.query, &error);
+		tracker_data_update_sparql (task->data.update.query, &task->error);
 
 		if (task->data.update.batch) {
-			if (!error) {
+			if (!task->error) {
 				private->batch_count++;
 				if (tracker_db_manager_pending_lock ()) {
 					end_batch (private);
 				}
 			}
 		} else {
-			tracker_data_commit_db_transaction ();
-		}
-
-		if (task->callback.update_callback) {
-			task->callback.update_callback (error, task->user_data);
-		}
-
-		if (error) {
-			g_clear_error (&error);
+			tracker_data_commit_db_transaction_no_notify ();
 		}
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
-		GError *error = NULL;
-		GPtrArray *blank_nodes;
-
 		if (task->data.update.batch) {
 			begin_batch (private);
 		} else {
@@ -370,47 +388,68 @@ queue_idle_handler (gpointer user_data)
 			tracker_data_begin_db_transaction ();
 		}
 
-		blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &error);
+		task->data.update.blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &task->error);
 
 		if (task->data.update.batch) {
-			if (!error) {
+			if (!task->error) {
 				private->batch_count++;
 				if (tracker_db_manager_pending_lock ()) {
 					end_batch (private);
 				}
 			}
 		} else {
-			tracker_data_commit_db_transaction ();
+			tracker_data_commit_db_transaction_no_notify ();
 		}
 
-		if (task->callback.update_blank_callback) {
-			if (!blank_nodes) {
-				/* Create empty GPtrArray for dbus-glib to be happy */
-				blank_nodes = g_ptr_array_new ();
-			}
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		end_batch (private);
+	}
 
-			task->callback.update_blank_callback (blank_nodes, error, task->user_data);
-		}
+	g_idle_add (task_finish_cb, task);
+}
 
-		if (blank_nodes) {
-			gint i;
+static void
+task_run_async (TrackerStorePrivate *private,
+                TrackerStoreTask    *task)
+{
+	if (private->n_queries_running > 1) {
+		/* use global pool if main pool might already be occupied */
+		g_thread_pool_push (private->global_pool, task, NULL);
+	} else {
+		/* use main pool for updates and non-parallel queries */
+		g_thread_pool_push (private->main_pool, task, NULL);
+	}
+}
 
-			for (i = 0; i < blank_nodes->len; i++) {
-				g_ptr_array_foreach (blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
-				g_ptr_array_free (blank_nodes->pdata[i], TRUE);
-			}
-			g_ptr_array_free (blank_nodes, TRUE);
-		}
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+	TrackerStorePrivate *private = user_data;
+	GQueue              *queue;
+	TrackerStoreTask    *task = NULL;
+	gint                 i;
 
-		if (error) {
-			g_clear_error (&error);
-		}
-	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
-		end_batch (private);
+	for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
+		queue = private->queues[i];
+		task = g_queue_peek_head (queue);
+	}
+	g_return_val_if_fail (task != NULL, FALSE);
 
-		if (task->callback.commit_callback) {
-			task->callback.commit_callback (task->user_data);
-		}
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+		/* pop task now, otherwise further queries won't be scheduled */
+		g_queue_pop_head (queue);
+
+		private->n_queries_running++;
+
+		task_run_async (private, task);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE ||
+	           task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK ||
+	           task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+		g_queue_pop_head (queue);
+
+		private->update_running = TRUE;
+
+		task_run_async (private, task);
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
 		GError *error = NULL;
 		static TrackerTurtleReader *turtle_reader = NULL;
@@ -457,11 +496,19 @@ queue_idle_handler (gpointer user_data)
 			if (error) {
 				g_clear_error (&error);
 			}
+
+			g_queue_remove (private->queues[task->priority], task);
+
+			if (task->destroy) {
+				task->destroy (task->user_data);
+			}
+
+			store_task_free (task);
 		}
 	}
 
 out:
-	return queue_idle_handler_finish (private, task);
+	return task_ready (private);
 }
 
 static void
@@ -485,6 +532,19 @@ tracker_store_init (void)
 		private->queues[i] = g_queue_new ();
 	}
 
+	private->main_pool = g_thread_pool_new (pool_dispatch_cb,
+	                                        private, 1,
+	                                        TRUE, NULL);
+	private->global_pool = g_thread_pool_new (pool_dispatch_cb,
+	                                          private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
+	                                          FALSE, NULL);
+
+	/* The following settings are global (for unknown reasons), so
+	   use the same values as GIO does; otherwise the effective
+	   settings would be rather random */
+	g_thread_pool_set_max_idle_time (15 * 1000);
+	g_thread_pool_set_max_unused_threads (2);
+
 	g_static_private_set (&private_key,
 	                      private,
 	                      private_free);
@@ -499,6 +559,9 @@ tracker_store_shutdown (void)
 	private = g_static_private_get (&private_key);
 	g_return_if_fail (private != NULL);
 
+	g_thread_pool_free (private->global_pool, FALSE, TRUE);
+	g_thread_pool_free (private->main_pool, FALSE, TRUE);
+
 	if (private->have_handler) {
 		g_source_remove (private->handler);
 		private->have_handler = FALSE;
diff --git a/src/tracker-store/tracker-writeback.c b/src/tracker-store/tracker-writeback.c
index 629e79a..b7e7490 100644
--- a/src/tracker-store/tracker-writeback.c
+++ b/src/tracker-store/tracker-writeback.c
@@ -31,6 +31,13 @@ typedef struct {
 	GHashTable *events;
 } WritebackPrivate;
 
+#define GStaticPrivate gpointer
+#undef G_STATIC_PRIVATE_INIT
+#define G_STATIC_PRIVATE_INIT NULL
+#define g_static_private_get(x) (*x)
+#define g_static_private_get(x) (*x)
+#define g_static_private_set(x,y,z) (*x = y)
+
 static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
 
 static GStrv



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]