[tracker] libtracker-data: Switch to manual WAL checkpointing in a separate thread
- From: Jürg Billeter <juergbi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker] libtracker-data: Switch to manual WAL checkpointing in a separate thread
- Date: Wed, 29 Jun 2011 12:26:47 +0000 (UTC)
commit d7df0a6470ed0d99656a5509bb4b3593c0a42571
Author: Jürg Billeter <j bitron ch>
Date: Thu Jun 23 14:56:17 2011 +0200
libtracker-data: Switch to manual WAL checkpointing in a separate thread
src/libtracker-data/libtracker-data.vapi | 7 ++
src/libtracker-data/tracker-db-interface-sqlite.c | 17 ++++
src/libtracker-data/tracker-db-interface-sqlite.h | 4 +
src/libtracker-data/tracker-db-manager.c | 3 +
src/tracker-store/tracker-store.vala | 88 +++++++++++++++++---
5 files changed, 105 insertions(+), 14 deletions(-)
---
diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi
index fe3b9b8..eeb9158 100644
--- a/src/libtracker-data/libtracker-data.vapi
+++ b/src/libtracker-data/libtracker-data.vapi
@@ -57,10 +57,17 @@ namespace Tracker {
NONE
}
+ [CCode (has_target = false, cheader_filename = "libtracker-data/tracker-db-interface-sqlite.h")]
+ public delegate void DBWalCallback (int n_pages);
+
[CCode (cheader_filename = "libtracker-data/tracker-db-interface.h")]
public interface DBInterface : GLib.Object {
[PrintfFormat]
public abstract DBStatement create_statement (DBStatementCacheType cache_type, ...) throws DBInterfaceError;
+ [PrintfFormat]
+ public void execute_query (...) throws DBInterfaceError;
+ [CCode (cheader_filename = "libtracker-data/tracker-db-interface-sqlite.h")]
+ public void sqlite_wal_hook (DBWalCallback callback);
}
[CCode (cheader_filename = "libtracker-data/tracker-data-update.h")]
diff --git a/src/libtracker-data/tracker-db-interface-sqlite.c b/src/libtracker-data/tracker-db-interface-sqlite.c
index 0abb164..88f21d0 100644
--- a/src/libtracker-data/tracker-db-interface-sqlite.c
+++ b/src/libtracker-data/tracker-db-interface-sqlite.c
@@ -1024,6 +1024,23 @@ tracker_db_interface_sqlite_reset_collator (TrackerDBInterface *db_interface)
}
}
+static gint
+wal_hook (gpointer user_data,
+ sqlite3 *db,
+ const gchar *db_name,
+ gint n_pages)
+{
+ ((TrackerDBWalCallback) user_data) (n_pages);
+
+ return SQLITE_OK;
+}
+
+void
+tracker_db_interface_sqlite_wal_hook (TrackerDBInterface *interface,
+ TrackerDBWalCallback callback)
+{
+ sqlite3_wal_hook (interface->db, wal_hook, callback);
+}
static void
diff --git a/src/libtracker-data/tracker-db-interface-sqlite.h b/src/libtracker-data/tracker-db-interface-sqlite.h
index e58ebc6..bdd1b56 100644
--- a/src/libtracker-data/tracker-db-interface-sqlite.h
+++ b/src/libtracker-data/tracker-db-interface-sqlite.h
@@ -32,6 +32,8 @@ G_BEGIN_DECLS
#define TRACKER_COLLATION_NAME "TRACKER"
+typedef void (*TrackerDBWalCallback) (gint n_pages);
+
TrackerDBInterface *tracker_db_interface_sqlite_new (const gchar *filename,
GError **error);
TrackerDBInterface *tracker_db_interface_sqlite_new_ro (const gchar *filename,
@@ -41,6 +43,8 @@ void tracker_db_interface_sqlite_enable_shared_cache (void);
void tracker_db_interface_sqlite_fts_init (TrackerDBInterface *interface,
gboolean create);
void tracker_db_interface_sqlite_reset_collator (TrackerDBInterface *interface);
+void tracker_db_interface_sqlite_wal_hook (TrackerDBInterface *interface,
+ TrackerDBWalCallback callback);
#if HAVE_TRACKER_FTS
int tracker_db_interface_sqlite_fts_update_init (TrackerDBInterface *interface,
diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c
index 5fc16b4..e6a7827 100644
--- a/src/libtracker-data/tracker-db-manager.c
+++ b/src/libtracker-data/tracker-db-manager.c
@@ -266,6 +266,9 @@ db_set_params (TrackerDBInterface *iface,
g_object_unref (stmt);
}
+ /* disable autocheckpoint */
+ tracker_db_interface_execute_query (iface, NULL, "PRAGMA wal_autocheckpoint = 0");
+
if (page_size != TRACKER_DB_PAGE_SIZE_DONT_SET) {
g_message (" Setting page size to %d", page_size);
tracker_db_interface_execute_query (iface, NULL, "PRAGMA page_size = %d", page_size);
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index 120dbf4..ec4b0e9 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -30,6 +30,7 @@ public class Tracker.Store {
static bool update_running;
static ThreadPool<Task> update_pool;
static ThreadPool<Task> query_pool;
+ static ThreadPool<bool> checkpoint_pool;
static GenericArray<Task> running_tasks;
static int max_task_time;
static bool active;
@@ -212,24 +213,29 @@ public class Tracker.Store {
var cursor = Tracker.Data.query_sparql_cursor (query_task.query);
query_task.in_thread (cursor);
- } else if (task.type == TaskType.UPDATE) {
- var update_task = (UpdateTask) task;
+ } else {
+ var iface = DBManager.get_db_interface ();
+ iface.sqlite_wal_hook (wal_hook);
- Tracker.Data.update_sparql (update_task.query);
- } else if (task.type == TaskType.UPDATE_BLANK) {
- var update_task = (UpdateTask) task;
+ if (task.type == TaskType.UPDATE) {
+ var update_task = (UpdateTask) task;
- update_task.blank_nodes = Tracker.Data.update_sparql_blank (update_task.query);
- } else if (task.type == TaskType.TURTLE) {
- var turtle_task = (TurtleTask) task;
+ Tracker.Data.update_sparql (update_task.query);
+ } else if (task.type == TaskType.UPDATE_BLANK) {
+ var update_task = (UpdateTask) task;
- var file = File.new_for_path (turtle_task.path);
+ update_task.blank_nodes = Tracker.Data.update_sparql_blank (update_task.query);
+ } else if (task.type == TaskType.TURTLE) {
+ var turtle_task = (TurtleTask) task;
- Tracker.Events.freeze ();
- try {
- Tracker.Data.load_turtle_file (file);
- } finally {
- Tracker.Events.reset_pending ();
+ var file = File.new_for_path (turtle_task.path);
+
+ Tracker.Events.freeze ();
+ try {
+ Tracker.Data.load_turtle_file (file);
+ } finally {
+ Tracker.Events.reset_pending ();
+ }
}
}
} catch (Error e) {
@@ -242,6 +248,48 @@ public class Tracker.Store {
});
}
+ static void wal_checkpoint () {
+ try {
+ debug ("Checkpointing database...");
+ var iface = DBManager.get_db_interface ();
+ iface.execute_query ("PRAGMA wal_checkpoint");
+ debug ("Checkpointing complete...");
+ } catch (Error e) {
+ warning (e.message);
+ }
+ }
+
+ static int checkpointing;
+
+ static void wal_hook (int n_pages) {
+ // run in update thread
+
+ debug ("WAL: %d pages", n_pages);
+
+ if (n_pages >= 10000) {
+ // do immediate checkpointing (blocking updates)
+ // to prevent excessive wal file growth
+ wal_checkpoint ();
+ } else if (n_pages >= 1000) {
+ if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) {
+ // initiate asynchronous checkpointing (not blocking updates)
+ try {
+ checkpoint_pool.push (true);
+ } catch (Error e) {
+ warning (e.message);
+ AtomicInt.set (ref checkpointing, 0);
+ }
+ }
+ }
+ }
+
+ static void checkpoint_dispatch_cb (bool task) {
+ // run in checkpoint thread
+
+ wal_checkpoint ();
+ AtomicInt.set (ref checkpointing, 0);
+ }
+
public static void init () {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
@@ -260,6 +308,7 @@ public class Tracker.Store {
try {
update_pool = new ThreadPool<Task> (pool_dispatch_cb, 1, true);
query_pool = new ThreadPool<Task> (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true);
+ checkpoint_pool = new ThreadPool<bool> (checkpoint_dispatch_cb, 1, true);
} catch (Error e) {
warning (e.message);
}
@@ -274,6 +323,7 @@ public class Tracker.Store {
public static void shutdown () {
query_pool = null;
update_pool = null;
+ checkpoint_pool = null;
for (int i = 0; i < Priority.N_PRIORITIES; i++) {
query_queues[i] = null;
@@ -424,6 +474,16 @@ public class Tracker.Store {
active_callback = null;
}
+ if (AtomicInt.get (ref checkpointing) != 0) {
+ // this will wait for checkpointing to finish
+ checkpoint_pool = null;
+ try {
+ checkpoint_pool = new ThreadPool<bool> (checkpoint_dispatch_cb, 1, true);
+ } catch (Error e) {
+ warning (e.message);
+ }
+ }
+
if (active) {
sched ();
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]