[tracker/thread] libtracker-direct: Make singleton connection thread-safe



commit 9291fc5ee1a6abacde99620ca7f77fc6b35f5609
Author: Jürg Billeter <j bitron ch>
Date:   Wed Nov 24 17:23:10 2010 +0100

    libtracker-direct: Make singleton connection thread-safe

 src/libtracker-data/libtracker-data.vapi          |    5 ++-
 src/libtracker-data/tracker-data-query.c          |    2 +-
 src/libtracker-data/tracker-db-interface-sqlite.c |   33 +++++++++------
 src/libtracker-data/tracker-db-interface.h        |    1 +
 src/libtracker-data/tracker-db-manager.c          |   20 +++++++++
 src/libtracker-data/tracker-db-manager.h          |    4 ++
 src/libtracker-data/tracker-sparql-query.vala     |   18 ++++----
 src/libtracker-direct/tracker-direct.vala         |   44 ++++++++++++++++++--
 8 files changed, 98 insertions(+), 29 deletions(-)
---
diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi
index 63c2f6b..bb26b5c 100644
--- a/src/libtracker-data/libtracker-data.vapi
+++ b/src/libtracker-data/libtracker-data.vapi
@@ -61,6 +61,9 @@ namespace Tracker {
 	[CCode (cheader_filename = "libtracker-data/tracker-db-manager.h")]
 	namespace DBManager {
 		public unowned DBInterface get_db_interface ();
+		public void lock ();
+		public bool trylock ();
+		public void unlock ();
 	}
 
 	[CCode (cheader_filename = "libtracker-data/tracker-db-interface.h")]
@@ -79,7 +82,7 @@ namespace Tracker {
 		public abstract void bind_int (int index, int value);
 		public abstract void bind_text (int index, string value);
 		public abstract DBResultSet execute () throws DBInterfaceError;
-		public abstract DBCursor start_sparql_cursor (PropertyType[] types, string[] variable_names) throws DBInterfaceError;
+		public abstract DBCursor start_sparql_cursor (PropertyType[] types, string[] variable_names, bool threadsafe) throws DBInterfaceError;
 	}
 
 	[CCode (cheader_filename = "libtracker-data/tracker-class.h")]
diff --git a/src/libtracker-data/tracker-data-query.c b/src/libtracker-data/tracker-data-query.c
index 9e5347a..d2161a9 100644
--- a/src/libtracker-data/tracker-data-query.c
+++ b/src/libtracker-data/tracker-data-query.c
@@ -161,7 +161,7 @@ tracker_data_query_sparql_cursor (const gchar  *query,
 
 	sparql_query = tracker_sparql_query_new (query);
 
-	cursor = tracker_sparql_query_execute_cursor (sparql_query, error);
+	cursor = tracker_sparql_query_execute_cursor (sparql_query, FALSE, error);
 
 	g_object_unref (sparql_query);
 
diff --git a/src/libtracker-data/tracker-db-interface-sqlite.c b/src/libtracker-data/tracker-db-interface-sqlite.c
index de7089f..349fb3b 100644
--- a/src/libtracker-data/tracker-db-interface-sqlite.c
+++ b/src/libtracker-data/tracker-db-interface-sqlite.c
@@ -37,6 +37,7 @@
 #include "tracker-collation.h"
 
 #include "tracker-db-interface-sqlite.h"
+#include "tracker-db-manager.h"
 
 #define UNKNOWN_STATUS 0.5
 
@@ -65,8 +66,6 @@ struct TrackerDBInterface {
 #endif
 	GCancellable *cancellable;
 
-	/* async operation pending */
-	gboolean pending;
 	GAsyncReadyCallback outstanding_callback;
 
 	TrackerDBStatementLru select_stmt_lru;
@@ -90,6 +89,8 @@ struct TrackerDBCursor {
 	gint n_types;
 	gchar **variable_names;
 	gint n_variable_names;
+
+	gboolean threadsafe;
 };
 
 struct TrackerDBCursorClass {
@@ -117,7 +118,8 @@ static TrackerDBCursor    *tracker_db_cursor_sqlite_new      (sqlite3_stmt
                                                               TrackerPropertyType  *types,
                                                               gint                  n_types,
                                                               const gchar         **variable_names,
-                                                              gint                  n_variable_names);
+                                                              gint                  n_variable_names,
+                                                              gboolean              threadsafe);
 static gboolean            tracker_db_cursor_get_boolean     (TrackerSparqlCursor  *cursor,
                                                               guint                 column);
 static gboolean            db_cursor_iter_next               (TrackerDBCursor      *cursor,
@@ -895,7 +897,6 @@ tracker_db_interface_create_statement (TrackerDBInterface           *db_interfac
 	gchar *full_query;
 
 	g_return_val_if_fail (TRACKER_IS_DB_INTERFACE (db_interface), NULL);
-	g_return_val_if_fail (!db_interface->pending, NULL);
 
 	va_start (args, query);
 	full_query = g_strdup_vprintf (query, args);
@@ -1310,7 +1311,6 @@ async_ready_callback_wrapper (GObject      *source_object,
 	TrackerDBCursor *cursor = TRACKER_DB_CURSOR (source_object);
 	TrackerDBInterface *iface = cursor->ref_stmt->db_interface;
 
-	iface->pending = FALSE;
 	if (iface->outstanding_callback) {
 		GAsyncReadyCallback callback = iface->outstanding_callback;
 
@@ -1328,8 +1328,6 @@ tracker_db_cursor_iter_next_async (TrackerDBCursor     *cursor,
 {
 	GSimpleAsyncResult *res;
 
-	g_return_if_fail (!cursor->ref_stmt->db_interface->pending);
-	cursor->ref_stmt->db_interface->pending = TRUE;
 	cursor->ref_stmt->db_interface->outstanding_callback = callback;
 
 	res = g_simple_async_result_new (g_object_ref (cursor), async_ready_callback_wrapper, user_data, tracker_db_cursor_iter_next_async);
@@ -1375,7 +1373,8 @@ tracker_db_cursor_sqlite_new (sqlite3_stmt        *sqlite_stmt,
                               TrackerPropertyType *types,
                               gint                 n_types,
                               const gchar        **variable_names,
-                              gint                 n_variable_names)
+                              gint                 n_variable_names,
+                              gboolean             threadsafe)
 {
 	TrackerDBCursor *cursor;
 
@@ -1384,6 +1383,8 @@ tracker_db_cursor_sqlite_new (sqlite3_stmt        *sqlite_stmt,
 	cursor->stmt = sqlite_stmt;
 	cursor->finished = FALSE;
 
+	cursor->threadsafe = threadsafe;
+
 	if (ref_stmt) {
 		ref_stmt->stmt_is_sunk = TRUE;
 		cursor->ref_stmt = g_object_ref (ref_stmt);
@@ -1465,7 +1466,6 @@ void
 tracker_db_cursor_rewind (TrackerDBCursor *cursor)
 {
 	g_return_if_fail (TRACKER_IS_DB_CURSOR (cursor));
-	g_return_if_fail (!cursor->ref_stmt->db_interface->pending);
 
 	sqlite3_reset (cursor->stmt);
 	cursor->finished = FALSE;
@@ -1476,8 +1476,6 @@ tracker_db_cursor_iter_next (TrackerDBCursor *cursor,
                              GCancellable    *cancellable,
                              GError         **error)
 {
-	g_return_val_if_fail (!cursor->ref_stmt->db_interface->pending, FALSE);
-
 	return db_cursor_iter_next (cursor, cancellable, error);
 }
 
@@ -1493,6 +1491,10 @@ db_cursor_iter_next (TrackerDBCursor *cursor,
 	if (!cursor->finished) {
 		guint result;
 
+		if (cursor->threadsafe) {
+			tracker_db_manager_lock ();
+		}
+
 		if (g_cancellable_is_cancelled (cancellable)) {
 			result = SQLITE_INTERRUPT;
 			sqlite3_reset (cursor->stmt);
@@ -1517,6 +1519,10 @@ db_cursor_iter_next (TrackerDBCursor *cursor,
 		}
 
 		cursor->finished = (result != SQLITE_ROW);
+
+		if (cursor->threadsafe) {
+			tracker_db_manager_unlock ();
+		}
 	}
 
 	return (!cursor->finished);
@@ -1652,7 +1658,7 @@ tracker_db_statement_start_cursor (TrackerDBStatement  *stmt,
 {
 	g_return_val_if_fail (!stmt->stmt_is_sunk, NULL);
 
-	return tracker_db_cursor_sqlite_new (stmt->stmt, stmt, NULL, 0, NULL, 0);
+	return tracker_db_cursor_sqlite_new (stmt->stmt, stmt, NULL, 0, NULL, 0, FALSE);
 }
 
 TrackerDBCursor *
@@ -1661,11 +1667,12 @@ tracker_db_statement_start_sparql_cursor (TrackerDBStatement   *stmt,
                                           gint                  n_types,
                                           const gchar         **variable_names,
                                           gint                  n_variable_names,
+                                          gboolean              threadsafe,
                                           GError              **error)
 {
 	g_return_val_if_fail (!stmt->stmt_is_sunk, NULL);
 
-	return tracker_db_cursor_sqlite_new (stmt->stmt, stmt, types, n_types, variable_names, n_variable_names);
+	return tracker_db_cursor_sqlite_new (stmt->stmt, stmt, types, n_types, variable_names, n_variable_names, threadsafe);
 }
 
 static void
diff --git a/src/libtracker-data/tracker-db-interface.h b/src/libtracker-data/tracker-db-interface.h
index d80b2c0..1a19171 100644
--- a/src/libtracker-data/tracker-db-interface.h
+++ b/src/libtracker-data/tracker-db-interface.h
@@ -137,6 +137,7 @@ TrackerDBCursor *       tracker_db_statement_start_sparql_cursor     (TrackerDBS
                                                                       gint                        n_types,
                                                                       const gchar               **variable_names,
                                                                       gint                        n_variable_names,
+                                                                      gboolean                    threadsafe,
                                                                       GError                    **error);
 void                    tracker_db_interface_set_busy_handler        (TrackerDBInterface         *db_interface,
                                                                       TrackerBusyCallback         busy_callback,
diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c
index 6d3b900..f525c8e 100644
--- a/src/libtracker-data/tracker-db-manager.c
+++ b/src/libtracker-data/tracker-db-manager.c
@@ -158,6 +158,8 @@ static guint                 u_cache_size;
 
 static GStaticPrivate        interface_data_key = G_STATIC_PRIVATE_INIT;
 
+static GStaticMutex          global_mutex = G_STATIC_MUTEX_INIT;
+
 static const gchar *
 location_to_directory (TrackerDBLocation location)
 {
@@ -1680,3 +1682,21 @@ tracker_db_manager_set_last_crawl_done (gboolean done)
 
 	g_free (filename);
 }
+
+void
+tracker_db_manager_lock (void)
+{
+	g_static_mutex_lock (&global_mutex);
+}
+
+gboolean
+tracker_db_manager_trylock (void)
+{
+	return g_static_mutex_trylock (&global_mutex);
+}
+
+void
+tracker_db_manager_unlock (void)
+{
+	g_static_mutex_unlock (&global_mutex);
+}
diff --git a/src/libtracker-data/tracker-db-manager.h b/src/libtracker-data/tracker-db-manager.h
index 70e8977..a82148e 100644
--- a/src/libtracker-data/tracker-db-manager.h
+++ b/src/libtracker-data/tracker-db-manager.h
@@ -69,6 +69,10 @@ void                tracker_db_manager_init_locations         (void);
 gboolean            tracker_db_manager_has_enough_space       (void);
 void                tracker_db_manager_remove_version_file    (void);
 
+void                tracker_db_manager_lock                   (void);
+gboolean            tracker_db_manager_trylock                (void);
+void                tracker_db_manager_unlock                 (void);
+
 TrackerDBManagerFlags
                     tracker_db_manager_get_flags              (guint *select_cache_size,
                                                                guint *update_cache_size);
diff --git a/src/libtracker-data/tracker-sparql-query.vala b/src/libtracker-data/tracker-sparql-query.vala
index 7e80770..2a19776 100644
--- a/src/libtracker-data/tracker-sparql-query.vala
+++ b/src/libtracker-data/tracker-sparql-query.vala
@@ -415,19 +415,19 @@ public class Tracker.Sparql.Query : Object {
 	}
 
 
-	public DBCursor? execute_cursor () throws DBInterfaceError, Sparql.Error, DateError {
+	public DBCursor? execute_cursor (bool threadsafe) throws DBInterfaceError, Sparql.Error, DateError {
 
 		prepare_execute ();
 
 		switch (current ()) {
 		case SparqlTokenType.SELECT:
-			return execute_select_cursor ();
+			return execute_select_cursor (threadsafe);
 		case SparqlTokenType.CONSTRUCT:
 			throw get_internal_error ("CONSTRUCT is not supported");
 		case SparqlTokenType.DESCRIBE:
 			throw get_internal_error ("DESCRIBE is not supported");
 		case SparqlTokenType.ASK:
-			return execute_ask_cursor ();
+			return execute_ask_cursor (threadsafe);
 		case SparqlTokenType.INSERT:
 		case SparqlTokenType.DELETE:
 		case SparqlTokenType.DROP:
@@ -523,10 +523,10 @@ public class Tracker.Sparql.Query : Object {
 		return stmt.execute ();
 	}
 
-	DBCursor? exec_sql_cursor (string sql, PropertyType[] types, string[] variable_names) throws DBInterfaceError, Sparql.Error, DateError {
+	DBCursor? exec_sql_cursor (string sql, PropertyType[] types, string[] variable_names, bool threadsafe) throws DBInterfaceError, Sparql.Error, DateError {
 		var stmt = prepare_for_exec (sql);
 
-		return stmt.start_sparql_cursor (types, variable_names);
+		return stmt.start_sparql_cursor (types, variable_names, threadsafe);
 	}
 
 	string get_select_query (out SelectContext context) throws DBInterfaceError, Sparql.Error, DateError {
@@ -546,11 +546,11 @@ public class Tracker.Sparql.Query : Object {
 		return exec_sql (get_select_query (out context));
 	}
 
-	DBCursor? execute_select_cursor () throws DBInterfaceError, Sparql.Error, DateError {
+	DBCursor? execute_select_cursor (bool threadsafe) throws DBInterfaceError, Sparql.Error, DateError {
 		SelectContext context;
 		string sql = get_select_query (out context);
 
-		return exec_sql_cursor (sql, context.types, context.variable_names);
+		return exec_sql_cursor (sql, context.types, context.variable_names, threadsafe);
 	}
 
 	string get_ask_query () throws DBInterfaceError, Sparql.Error, DateError {
@@ -584,8 +584,8 @@ public class Tracker.Sparql.Query : Object {
 		return exec_sql (get_ask_query ());
 	}
 
-	DBCursor? execute_ask_cursor () throws DBInterfaceError, Sparql.Error, DateError {
-		return exec_sql_cursor (get_ask_query (), new PropertyType[] { PropertyType.BOOLEAN }, new string[] { "result" });
+	DBCursor? execute_ask_cursor (bool threadsafe) throws DBInterfaceError, Sparql.Error, DateError {
+		return exec_sql_cursor (get_ask_query (), new PropertyType[] { PropertyType.BOOLEAN }, new string[] { "result" }, threadsafe);
 	}
 
 	private void parse_from_or_into_param () throws Sparql.Error {
diff --git a/src/libtracker-direct/tracker-direct.vala b/src/libtracker-direct/tracker-direct.vala
index 5bf37b8..884b053 100644
--- a/src/libtracker-direct/tracker-direct.vala
+++ b/src/libtracker-direct/tracker-direct.vala
@@ -45,10 +45,10 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
 		}
 	}
 
-	public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
+	Sparql.Cursor query_unlocked (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
 		try {
 			var query_object = new Sparql.Query (sparql);
-			var cursor = query_object.execute_cursor ();
+			var cursor = query_object.execute_cursor (true);
 			cursor.connection = this;
 			return cursor;
 		} catch (DBInterfaceError e) {
@@ -58,9 +58,43 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
 		}
 	}
 
-	public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
-		// just creating the cursor won't block
-		return query (sparql, cancellable);
+	public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
+		DBManager.lock ();
+		try {
+			return query_unlocked (sparql, cancellable);
+		} finally {
+			DBManager.unlock ();
+		}
+	}
+
+	public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
+		if (!DBManager.trylock ()) {
+			// run in a separate thread
+			Error job_error = null;
+			Sparql.Cursor result = null;
+
+			g_io_scheduler_push_job (job => {
+				try {
+					result = query (sparql, cancellable);
+				} catch (Error e) {
+					job_error = e;
+				}
+				query_async.callback ();
+				return false;
+			});
+			yield;
+
+			if (job_error != null) {
+				throw job_error;
+			} else {
+				return result;
+			}
+		}
+		try {
+			return query_unlocked (sparql, cancellable);
+		} finally {
+			DBManager.unlock ();
+		}
 	}
 }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]