[tracker/async-queries-wip: 22/23] threading changes
- From: Jürg Billeter <juergbi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/async-queries-wip: 22/23] threading changes
- Date: Thu, 15 Apr 2010 12:14:39 +0000 (UTC)
commit f8bd8d5021e8b7d36522c89c0514a58f1cfe3031
Author: Jürg Billeter <j bitron ch>
Date: Thu Apr 15 13:55:31 2010 +0200
threading changes
src/libtracker-data/tracker-data-update.c | 15 ++-
src/libtracker-data/tracker-data-update.h | 2 +
src/libtracker-db/tracker-db-interface-sqlite.c | 12 +-
src/libtracker-db/tracker-db-manager.c | 24 ++-
src/tracker-store/tracker-events.c | 7 +
src/tracker-store/tracker-store.c | 261 ++++++++++++++---------
src/tracker-store/tracker-writeback.c | 7 +
7 files changed, 224 insertions(+), 104 deletions(-)
---
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index 5f7f708..06bf196 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -1962,7 +1962,7 @@ tracker_data_begin_db_transaction_for_replay (time_t time)
}
void
-tracker_data_commit_db_transaction (void)
+tracker_data_commit_db_transaction_no_notify (void)
{
TrackerDBInterface *iface;
@@ -1991,6 +1991,12 @@ tracker_data_commit_db_transaction (void)
g_hash_table_remove_all (update_buffer.resources_by_id);
g_hash_table_remove_all (update_buffer.resource_cache);
+ in_journal_replay = FALSE;
+}
+
+void
+tracker_data_commit_db_transaction_only_notify (void)
+{
if (commit_callbacks) {
guint n;
for (n = 0; n < commit_callbacks->len; n++) {
@@ -1999,8 +2005,13 @@ tracker_data_commit_db_transaction (void)
delegate->callback (delegate->user_data);
}
}
+}
- in_journal_replay = FALSE;
+void
+tracker_data_commit_db_transaction (void)
+{
+ tracker_data_commit_db_transaction_no_notify ();
+ tracker_data_commit_db_transaction_only_notify ();
}
static void
diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h
index 2300076..2c45801 100644
--- a/src/libtracker-data/tracker-data-update.h
+++ b/src/libtracker-data/tracker-data-update.h
@@ -83,6 +83,8 @@ void tracker_data_insert_statement_with_string (const gchar *
void tracker_data_begin_db_transaction (void);
void tracker_data_begin_db_transaction_for_replay (time_t time);
void tracker_data_commit_db_transaction (void);
+void tracker_data_commit_db_transaction_no_notify (void);
+void tracker_data_commit_db_transaction_only_notify (void);
void tracker_data_begin_transaction (GError **error);
void tracker_data_commit_transaction (GError **error);
void tracker_data_rollback_transaction (void);
diff --git a/src/libtracker-db/tracker-db-interface-sqlite.c b/src/libtracker-db/tracker-db-interface-sqlite.c
index 6e056da..6996695 100644
--- a/src/libtracker-db/tracker-db-interface-sqlite.c
+++ b/src/libtracker-db/tracker-db-interface-sqlite.c
@@ -153,7 +153,7 @@ static int
my_lock (sqlite3_file* file, int locktype)
{
if (locktype >= SQLITE_LOCK_PENDING) {
- g_warning ("%p locked %d", file, locktype);
+ //g_warning ("%p locked %d", file, locktype);
db_locked = TRUE;
}
int rc = DEFAULT_METHODS(file)->xLock (file, locktype);
@@ -180,7 +180,7 @@ static int
my_open (sqlite3_vfs *vfs, const char *zName, sqlite3_file* file, int flags, int *pOutFlags)
{
int rc = default_vfs->xOpen (vfs, zName, file, flags, pOutFlags);
- g_warning ("%p %s", file, zName);
+ //g_warning ("%p %s", file, zName);
if ((flags & SQLITE_OPEN_MAIN_DB) && file->pMethods) {
sqlite3_io_methods *my_methods = g_memdup (file->pMethods, sizeof (sqlite3_io_methods));
my_methods->xLock = my_lock;
@@ -202,9 +202,17 @@ tracker_db_manager_pending_lock (void)
}
}
+static gboolean initialized = FALSE;
+
void
tracker_db_interface_sqlite_enable_shared_cache (void)
{
+ if (initialized) {
+ return;
+ }
+
+ initialized = TRUE;
+
default_vfs = sqlite3_vfs_find (NULL);
sqlite3_vfs *wrapper = g_memdup (default_vfs, sizeof (sqlite3_vfs));
wrapper->szOsFile = ALIGN_STRUCT(default_vfs->szOsFile) + sizeof (gpointer);
diff --git a/src/libtracker-db/tracker-db-manager.c b/src/libtracker-db/tracker-db-manager.c
index 6af6d5b..1daf55e 100644
--- a/src/libtracker-db/tracker-db-manager.c
+++ b/src/libtracker-db/tracker-db-manager.c
@@ -177,6 +177,7 @@ static gpointer db_type_enum_class_pointer;
static TrackerDBInterface *resources_iface;
static TrackerDBManagerFlags old_flags = 0;
+static GStaticPrivate interface_data_key = G_STATIC_PRIVATE_INIT;
static TrackerDBInterfacePool *interface_pool = NULL;
#define TRACKER_DB_INTERFACE_POOL_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DB_INTERFACE_POOL, TrackerDBInterfacePoolPrivate))
@@ -931,6 +932,8 @@ tracker_db_manager_shutdown (void)
return;
}
+ g_static_private_free (&interface_data_key);
+
for (i = 1; i < G_N_ELEMENTS (dbs); i++) {
if (dbs[i].abs_filename) {
g_free (dbs[i].abs_filename);
@@ -1209,9 +1212,28 @@ tracker_db_manager_get_db_interfaces_ro (gint num, ...)
TrackerDBInterface *
tracker_db_manager_get_db_interface (void)
{
+ TrackerDBInterface *interface;
+
g_return_val_if_fail (initialized != FALSE, NULL);
- return resources_iface;
+ interface = g_static_private_get (&interface_data_key);
+
+ /* Ensure the interface is there */
+ if (!interface) {
+ interface = tracker_db_manager_get_db_interfaces (3,
+ TRACKER_DB_METADATA,
+ TRACKER_DB_FULLTEXT,
+ TRACKER_DB_CONTENTS);
+
+ // FIXME should probably always be FALSE except sometimes on the main thread where we might not want to initialize fts implicitly at all
+ tracker_db_interface_sqlite_fts_init (TRACKER_DB_INTERFACE_SQLITE (interface), TRUE);
+
+ g_static_private_set (&interface_data_key,
+ interface,
+ (GDestroyNotify) g_object_unref);
+ }
+
+ return interface;
}
/**
diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c
index beda952..99cc630 100644
--- a/src/tracker-store/tracker-events.c
+++ b/src/tracker-store/tracker-events.c
@@ -26,6 +26,13 @@
#include "tracker-events.h"
+#define GStaticPrivate gpointer
+#undef G_STATIC_PRIVATE_INIT
+#define G_STATIC_PRIVATE_INIT NULL
+#define g_static_private_get(x) (*x)
+#define g_static_private_get(x) (*x)
+#define g_static_private_set(x,y,z) (*x = y)
+
typedef struct {
GHashTable *allowances;
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 75e04f5..ec22a3c 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -44,6 +44,9 @@ typedef struct {
GQueue *queues[TRACKER_STORE_N_PRIORITIES];
guint handler, sync_handler;
guint n_queries_running;
+ gboolean update_running;
+ GThreadPool *main_pool;
+ GThreadPool *global_pool;
} TrackerStorePrivate;
typedef enum {
@@ -60,17 +63,20 @@ typedef struct {
union {
struct {
gchar *query;
+ TrackerDBResultSet *result_set;
} query;
struct {
gchar *query;
gboolean batch;
gchar *client_id;
+ GPtrArray *blank_nodes;
} update;
struct {
gboolean in_progress;
gchar *path;
} turtle;
} data;
+ GError *error;
gpointer user_data;
GDestroyNotify destroy;
union {
@@ -190,7 +196,8 @@ end_batch (TrackerStorePrivate *private)
{
if (private->batch_mode) {
/* commit pending batch items */
- tracker_data_commit_db_transaction ();
+ tracker_data_commit_db_transaction_no_notify ();
+ tracker_data_commit_db_transaction_only_notify ();
/* the above commit will trigger a lock again, reset it */
tracker_db_manager_pending_lock ();
@@ -211,6 +218,9 @@ task_ready (TrackerStorePrivate *private)
if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
/* maximum number of queries running already, cannot schedule anything else */
return FALSE;
+ } else if (private->update_running) {
+ /* update running already, cannot schedule anything else */
+ return FALSE;
}
for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
@@ -251,87 +261,106 @@ check_handler (TrackerStorePrivate *private)
}
static gboolean
-queue_idle_handler_finish (TrackerStorePrivate *private,
- TrackerStoreTask *task)
+task_finish_cb (gpointer data)
{
- g_queue_remove (private->queues[task->priority], task);
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
- if (task->destroy) {
- task->destroy (task->user_data);
- }
+ private = g_static_private_get (&private_key);
+ task = data;
- store_task_free (task);
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ if (task->callback.query_callback) {
+ task->callback.query_callback (task->data.query.result_set, task->error, task->user_data);
+ }
- return task_ready (private);
-}
+ if (task->data.query.result_set) {
+ g_object_unref (task->data.query.result_set);
+ task->data.query.result_set = NULL;
+ }
-static void
-query_ready (GObject *source,
- GAsyncResult *res,
- void *user_data)
-{
- GError *error = NULL;
- TrackerDBResultSet *result_set;
- TrackerStorePrivate *private;
- TrackerStoreTask *task;
+ if (task->error) {
+ g_clear_error (&task->error);
+ }
- private = g_static_private_get (&private_key);
- g_return_if_fail (private != NULL);
+ private->n_queries_running--;
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+ if (!task->data.update.batch && !task->error) {
+ tracker_data_commit_db_transaction_only_notify ();
+ }
- task = user_data;
+ if (task->callback.update_callback) {
+ task->callback.update_callback (task->error, task->user_data);
+ }
- result_set = tracker_sparql_query_execute_finish (TRACKER_SPARQL_QUERY (source), res, &error);
+ if (task->error) {
+ g_clear_error (&task->error);
+ }
- private->n_queries_running--;
+ private->update_running = FALSE;
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+ if (!task->data.update.batch && !task->error) {
+ tracker_data_commit_db_transaction_only_notify ();
+ }
- if (task->callback.query_callback) {
- task->callback.query_callback (result_set, error, task->user_data);
- }
+ if (task->callback.update_blank_callback) {
+ if (!task->data.update.blank_nodes) {
+ /* Create empty GPtrArray for dbus-glib to be happy */
+ task->data.update.blank_nodes = g_ptr_array_new ();
+ }
+
+ task->callback.update_blank_callback (task->data.update.blank_nodes, task->error, task->user_data);
+ }
+
+ if (task->data.update.blank_nodes) {
+ gint i;
- if (result_set) {
- g_object_unref (result_set);
+ for (i = 0; i < task->data.update.blank_nodes->len; i++) {
+ g_ptr_array_foreach (task->data.update.blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
+ g_ptr_array_free (task->data.update.blank_nodes->pdata[i], TRUE);
+ }
+ g_ptr_array_free (task->data.update.blank_nodes, TRUE);
+ }
+
+ if (task->error) {
+ g_clear_error (&task->error);
+ }
+
+ private->update_running = FALSE;
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+ tracker_data_commit_db_transaction_only_notify ();
+
+ if (task->callback.commit_callback) {
+ task->callback.commit_callback (task->user_data);
+ }
+
+ private->update_running = FALSE;
}
- if (error) {
- g_clear_error (&error);
+ if (task->destroy) {
+ task->destroy (task->user_data);
}
- queue_idle_handler_finish (private, task);
+ store_task_free (task);
check_handler (private);
+
+ return FALSE;
}
-static gboolean
-queue_idle_handler (gpointer user_data)
+static void
+pool_dispatch_cb (gpointer data,
+ gpointer user_data)
{
- TrackerStorePrivate *private = user_data;
- GQueue *queue;
- TrackerStoreTask *task = NULL;
- gint i;
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
- for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
- queue = private->queues[i];
- task = g_queue_peek_head (queue);
- }
- g_return_val_if_fail (task != NULL, FALSE);
+ private = user_data;
+ task = data;
if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
- TrackerSparqlQuery *sparql_query;
-
- private->n_queries_running++;
-
- /* pop task now, otherwise further queries won't be scheduled */
- g_queue_pop_head (queue);
-
- sparql_query = tracker_sparql_query_new (task->data.query.query);
- tracker_sparql_query_execute_async (sparql_query, NULL, query_ready, task);
- g_object_unref (sparql_query);
-
- /* suspend idle handler until above query finished */
- return FALSE;
+ task->data.query.result_set = tracker_data_query_sparql (task->data.query.query, &task->error);
} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
- GError *error = NULL;
-
if (task->data.update.batch) {
begin_batch (private);
} else {
@@ -339,30 +368,19 @@ queue_idle_handler (gpointer user_data)
tracker_data_begin_db_transaction ();
}
- tracker_data_update_sparql (task->data.update.query, &error);
+ tracker_data_update_sparql (task->data.update.query, &task->error);
if (task->data.update.batch) {
- if (!error) {
+ if (!task->error) {
private->batch_count++;
if (tracker_db_manager_pending_lock ()) {
end_batch (private);
}
}
} else {
- tracker_data_commit_db_transaction ();
- }
-
- if (task->callback.update_callback) {
- task->callback.update_callback (error, task->user_data);
- }
-
- if (error) {
- g_clear_error (&error);
+ tracker_data_commit_db_transaction_no_notify ();
}
} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
- GError *error = NULL;
- GPtrArray *blank_nodes;
-
if (task->data.update.batch) {
begin_batch (private);
} else {
@@ -370,47 +388,68 @@ queue_idle_handler (gpointer user_data)
tracker_data_begin_db_transaction ();
}
- blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &error);
+ task->data.update.blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &task->error);
if (task->data.update.batch) {
- if (!error) {
+ if (!task->error) {
private->batch_count++;
if (tracker_db_manager_pending_lock ()) {
end_batch (private);
}
}
} else {
- tracker_data_commit_db_transaction ();
+ tracker_data_commit_db_transaction_no_notify ();
}
- if (task->callback.update_blank_callback) {
- if (!blank_nodes) {
- /* Create empty GPtrArray for dbus-glib to be happy */
- blank_nodes = g_ptr_array_new ();
- }
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+ end_batch (private);
+ }
- task->callback.update_blank_callback (blank_nodes, error, task->user_data);
- }
+ g_idle_add (task_finish_cb, task);
+}
- if (blank_nodes) {
- gint i;
+static void
+task_run_async (TrackerStorePrivate *private,
+ TrackerStoreTask *task)
+{
+ if (private->n_queries_running > 1) {
+ /* use global pool if main pool might already be occupied */
+ g_thread_pool_push (private->global_pool, task, NULL);
+ } else {
+ /* use main pool for updates and non-parallel queries */
+ g_thread_pool_push (private->main_pool, task, NULL);
+ }
+}
- for (i = 0; i < blank_nodes->len; i++) {
- g_ptr_array_foreach (blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
- g_ptr_array_free (blank_nodes->pdata[i], TRUE);
- }
- g_ptr_array_free (blank_nodes, TRUE);
- }
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+ TrackerStorePrivate *private = user_data;
+ GQueue *queue;
+ TrackerStoreTask *task = NULL;
+ gint i;
- if (error) {
- g_clear_error (&error);
- }
- } else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
- end_batch (private);
+ for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
+ queue = private->queues[i];
+ task = g_queue_peek_head (queue);
+ }
+ g_return_val_if_fail (task != NULL, FALSE);
- if (task->callback.commit_callback) {
- task->callback.commit_callback (task->user_data);
- }
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ /* pop task now, otherwise further queries won't be scheduled */
+ g_queue_pop_head (queue);
+
+ private->n_queries_running++;
+
+ task_run_async (private, task);
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE ||
+ task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK ||
+ task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
+ g_queue_pop_head (queue);
+
+ private->update_running = TRUE;
+
+ task_run_async (private, task);
} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
GError *error = NULL;
static TrackerTurtleReader *turtle_reader = NULL;
@@ -457,11 +496,19 @@ queue_idle_handler (gpointer user_data)
if (error) {
g_clear_error (&error);
}
+
+ g_queue_remove (private->queues[task->priority], task);
+
+ if (task->destroy) {
+ task->destroy (task->user_data);
+ }
+
+ store_task_free (task);
}
}
out:
- return queue_idle_handler_finish (private, task);
+ return task_ready (private);
}
static void
@@ -485,6 +532,19 @@ tracker_store_init (void)
private->queues[i] = g_queue_new ();
}
+ private->main_pool = g_thread_pool_new (pool_dispatch_cb,
+ private, 1,
+ TRUE, NULL);
+ private->global_pool = g_thread_pool_new (pool_dispatch_cb,
+ private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
+ FALSE, NULL);
+
+ /* as the following settings are global for unknown reasons,
+ let's use the same settings as gio, otherwise the used settings
+ are rather random */
+ g_thread_pool_set_max_idle_time (15 * 1000);
+ g_thread_pool_set_max_unused_threads (2);
+
g_static_private_set (&private_key,
private,
private_free);
@@ -499,6 +559,9 @@ tracker_store_shutdown (void)
private = g_static_private_get (&private_key);
g_return_if_fail (private != NULL);
+ g_thread_pool_free (private->global_pool, FALSE, TRUE);
+ g_thread_pool_free (private->main_pool, FALSE, TRUE);
+
if (private->have_handler) {
g_source_remove (private->handler);
private->have_handler = FALSE;
diff --git a/src/tracker-store/tracker-writeback.c b/src/tracker-store/tracker-writeback.c
index 629e79a..b7e7490 100644
--- a/src/tracker-store/tracker-writeback.c
+++ b/src/tracker-store/tracker-writeback.c
@@ -31,6 +31,13 @@ typedef struct {
GHashTable *events;
} WritebackPrivate;
+#define GStaticPrivate gpointer
+#undef G_STATIC_PRIVATE_INIT
+#define G_STATIC_PRIVATE_INIT NULL
+#define g_static_private_get(x) (*x)
+#define g_static_private_get(x) (*x)
+#define g_static_private_set(x,y,z) (*x = y)
+
static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
static GStrv
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]