[tracker/wip/carlosg/domain-ontologies: 20/30] libtracker-direct: Allow creating readwrite databases through direct connection
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/carlosg/domain-ontologies: 20/30] libtracker-direct: Allow creating readwrite databases through direct connection
- Date: Sun, 4 Jun 2017 15:47:26 +0000 (UTC)
commit 0a0627df33e7e951450bc2beabb36850e44eb097
Author: Carlos Garnacho <carlosg gnome org>
Date: Sat Jun 3 00:32:34 2017 +0200
libtracker-direct: Allow creating readwrite databases through direct connection
This will allow creating SPARQL databases that are private to applications.
Applications will be able to get multiple read-only handles, and updates will be
handled by a single private thread. This is pretty much like tracker-store (and the
refactor took heavy inspiration from it), just without D-Bus.
So far, only the current direct connections (i.e. read-only, pointing to
~/.cache/tracker) have been replaced.
src/libtracker-direct/tracker-direct.vala | 276 ++++++++++++++++----
src/libtracker-sparql-backend/tracker-backend.vala | 11 +-
2 files changed, 237 insertions(+), 50 deletions(-)
---
diff --git a/src/libtracker-direct/tracker-direct.vala b/src/libtracker-direct/tracker-direct.vala
index da9feaa..aac1fe5 100644
--- a/src/libtracker-direct/tracker-direct.vala
+++ b/src/libtracker-direct/tracker-direct.vala
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2010, Nokia <ivan frade nokia com>
+ * Copyright (C) 2017, Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -17,76 +18,194 @@
* Boston, MA 02110-1301, USA.
*/
-public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
- static int use_count;
- bool initialized;
+public class Tracker.Direct.Connection : Tracker.Sparql.Connection, AsyncInitable, Initable {
+ File? database_loc;
+ File? journal_loc;
+ File? ontology_loc;
+ Sparql.ConnectionFlags flags;
+
+ // Mutex to hold datamanager
private Mutex mutex = Mutex ();
+ Thread<void*> thread;
- public Connection () throws Sparql.Error, IOError, DBusError {
- try {
- if (use_count == 0) {
- // make sure that current locale vs db locale are the same,
- // otherwise return an error
- Locale.init ();
- DBManager.locale_changed ();
+ // Initialization stuff, both sync and async
+ private Mutex init_mutex = Mutex ();
+ private Cond init_cond = Cond ();
+ private bool initialized;
+ private Error init_error;
+ public SourceFunc init_callback;
- uint select_cache_size = 100;
- string env_cache_size = Environment.get_variable
("TRACKER_SPARQL_CACHE_SIZE");
+ private AsyncQueue<Task> update_queue;
- if (env_cache_size != null) {
- select_cache_size = int.parse (env_cache_size);
- }
+ enum TaskType {
+ QUERY,
+ UPDATE,
+ UPDATE_BLANK,
+ TURTLE,
+ }
- Data.Manager.init (DBManagerFlags.READONLY | DBManagerFlags.ENABLE_MUTEXES,
- null /*loc */, null /* domain */ , null /* ontology */,
- null, false, false, select_cache_size, 0, null, null);
- }
+ abstract class Task {
+ public TaskType type;
+ public int priority;
+ public Cancellable? cancellable;
+ public SourceFunc callback;
+ public Error error;
+ }
+
+ private class UpdateTask : Task {
+ public string sparql;
+ public Variant blank_nodes;
+
+ private void set (TaskType type, string sparql, int priority = Priority.DEFAULT, Cancellable?
cancellable = null) {
+ this.type = type;
+ this.sparql = sparql;
+ this.priority = priority;
+ this.cancellable = cancellable;
+ }
+
+ public UpdateTask (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) {
+ this.set (TaskType.UPDATE, sparql, priority, cancellable);
+ }
- use_count++;
- initialized = true;
+ public UpdateTask.blank (string sparql, int priority = Priority.DEFAULT, Cancellable?
cancellable) {
+ this.set (TaskType.UPDATE_BLANK, sparql, priority, cancellable);
+ }
+ }
+
+ private class TurtleTask : Task {
+ public File file;
+
+ public TurtleTask (File file, Cancellable? cancellable) {
+ this.type = TaskType.TURTLE;
+ this.file = file;
+ this.priority = Priority.DEFAULT;
+ this.cancellable = cancellable;
+ }
+ }
+
+ static void wal_checkpoint (DBInterface iface, bool blocking) {
+ try {
+ debug ("Checkpointing database...");
+ iface.sqlite_wal_checkpoint (blocking);
+ debug ("Checkpointing complete...");
} catch (Error e) {
- throw new Sparql.Error.INTERNAL (e.message);
+ warning (e.message);
}
}
- public Connection.custom_ontology (File loc, File? journal_loc, File? ontology_loc) throws
Sparql.Error, IOError, DBusError {
+ static void wal_checkpoint_on_thread () {
+ new Thread<void*> ("wal-checkpoint", () => {
+ var iface = DBManager.get_db_interface ();
+ wal_checkpoint (iface, false);
+ return null;
+ });
+ }
+
+ static void wal_hook (DBInterface iface, int n_pages) {
+ if (n_pages >= 10000) {
+ // do immediate checkpointing (blocking updates)
+ // to prevent excessive wal file growth
+ wal_checkpoint (iface, true);
+ } else if (n_pages >= 1000) {
+ wal_checkpoint_on_thread ();
+ }
+ }
+
+ private void* thread_func () {
+ init_mutex.lock ();
+
try {
- if (use_count == 0) {
- // make sure that current locale vs db locale are the same,
- // otherwise return an error
- Locale.init ();
- DBManager.locale_changed ();
+ Locale.init ();
+ DBManagerFlags db_flags = DBManagerFlags.ENABLE_MUTEXES;
+ if ((flags & Sparql.ConnectionFlags.READONLY) != 0)
+ db_flags |= DBManagerFlags.READONLY;
+ Data.Manager.init (db_flags,
+ database_loc, journal_loc, ontology_loc,
+ null, false, false, 100, 100, null, null);
- uint select_cache_size = 100;
- string env_cache_size = Environment.get_variable
("TRACKER_SPARQL_CACHE_SIZE");
+ var iface = DBManager.get_db_interface ();
+ iface.sqlite_wal_hook (wal_hook);
+ } catch (Error e) {
+ init_error = e;
+ } finally {
+ if (init_callback != null) {
+ init_callback ();
+ } else {
+ initialized = true;
+ init_cond.signal ();
+ init_mutex.unlock ();
+ }
+ }
- if (env_cache_size != null) {
- select_cache_size = int.parse (env_cache_size);
- }
+ while (true) {
+ var task = update_queue.pop();
- Data.Manager.init (DBManagerFlags.READONLY | DBManagerFlags.ENABLE_MUTEXES,
- loc, journal_loc, ontology_loc,
- null, false, false, select_cache_size, 0, null, null);
+ try {
+ switch (task.type) {
+ case TaskType.UPDATE:
+ UpdateTask update_task = (UpdateTask) task;
+ update (update_task.sparql, update_task.priority,
update_task.cancellable);
+ break;
+ case TaskType.UPDATE_BLANK:
+ UpdateTask update_task = (UpdateTask) task;
+ update_task.blank_nodes = update_blank (update_task.sparql,
update_task.priority, update_task.cancellable);
+ break;
+ case TaskType.TURTLE:
+ TurtleTask turtle_task = (TurtleTask) task;
+ load (turtle_task.file, turtle_task.cancellable);
+ break;
+ default:
+ break;
+ }
+ } catch (Error e) {
+ task.error = e;
}
- use_count++;
- initialized = true;
+ task.callback ();
+ }
+ }
+
+ public async bool init_async (int io_priority, Cancellable? cancellable) throws Error {
+ init_callback = init_async.callback;
+ thread = new Thread<void*> ("database", thread_func);
+
+ return initialized;
+ }
+
+ public bool init (Cancellable? cancellable) throws Error {
+ try {
+ thread = new Thread<void*> ("database", thread_func);
+
+ init_mutex.lock ();
+ while (!initialized)
+ init_cond.wait(init_mutex);
+ init_mutex.unlock ();
+
+ if (init_error != null)
+ throw init_error;
} catch (Error e) {
throw new Sparql.Error.INTERNAL (e.message);
}
+
+ return true;
}
+
+ public Connection (Sparql.ConnectionFlags connection_flags, File loc, File? journal, File? ontology)
throws Sparql.Error, IOError, DBusError {
+ database_loc = loc;
+ journal_loc = journal;
+ ontology_loc = ontology;
+ flags = connection_flags;
+
+ update_queue = new AsyncQueue<Task> ();
+ }
+
~Connection () {
if (!initialized) {
- // use_count did not get increased if initialization failed
return;
}
// Clean up connection
- use_count--;
-
- if (use_count == 0) {
- Data.Manager.shutdown ();
- }
+ Data.Manager.shutdown ();
}
Sparql.Cursor query_unlocked (string sparql) throws Sparql.Error, DBusError {
@@ -125,7 +244,7 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
Sparql.Cursor result = null;
var context = MainContext.get_thread_default ();
- g_io_scheduler_push_job (job => {
+ IOSchedulerJob.push ((job, cancellable) => {
try {
result = query (sparql, cancellable);
} catch (IOError e_io) {
@@ -136,15 +255,14 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
dbus_error = e_dbus;
}
- var source = new IdleSource ();
- source.set_callback (() => {
+ context.invoke (() => {
query_async.callback ();
return false;
});
- source.attach (context);
return false;
}, Priority.DEFAULT, cancellable);
+
yield;
if (cancellable != null && cancellable.is_cancelled ()) {
@@ -159,4 +277,66 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
return result;
}
}
+
+ public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable?
cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+ mutex.lock ();
+ try {
+ Tracker.Data.update_sparql (sparql);
+ } finally {
+ mutex.unlock ();
+ }
+ }
+
+ public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT,
Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+ var task = new UpdateTask (sparql, priority, cancellable);
+ task.callback = update_async.callback;
+ update_queue.push (task);
+ yield;
+
+ if (task.error != null)
+ throw task.error;
+ }
+
+ public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT,
Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+ GLib.Variant? blank_nodes = null;
+ mutex.lock ();
+ try {
+ blank_nodes = Tracker.Data.update_sparql_blank (sparql);
+ } finally {
+ mutex.unlock ();
+ }
+
+ return blank_nodes;
+ }
+
+ public async override GLib.Variant? update_blank_async (string sparql, int priority =
GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+ var task = new UpdateTask.blank (sparql, priority, cancellable);
+ task.callback = update_blank_async.callback;
+ update_queue.push (task);
+ yield;
+
+ if (task.error != null)
+ throw task.error;
+
+ return task.blank_nodes;
+ }
+
+ public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError,
DBusError {
+ mutex.lock ();
+ try {
+ Tracker.Data.load_turtle_file (file);
+ } finally {
+ mutex.unlock ();
+ }
+ }
+
+ public async override void load_async (File file, Cancellable? cancellable = null) throws
Sparql.Error, IOError, DBusError {
+ var task = new TurtleTask (file, cancellable);
+ task.callback = load_async.callback;
+ update_queue.push (task);
+ yield;
+
+ if (task.error != null)
+ throw new Sparql.Error.INTERNAL (task.error.message);
+ }
}
diff --git a/src/libtracker-sparql-backend/tracker-backend.vala
b/src/libtracker-sparql-backend/tracker-backend.vala
index 6d7e982..68eba33 100644
--- a/src/libtracker-sparql-backend/tracker-backend.vala
+++ b/src/libtracker-sparql-backend/tracker-backend.vala
@@ -182,6 +182,13 @@ class Tracker.Sparql.Backend : Connection {
return yield bus.statistics_async (cancellable);
}
+ private Connection create_readonly_direct () throws GLib.Error, Sparql.Error, IOError, DBusError {
+ File store = File.new_for_path (Path.build_filename (Environment.get_user_cache_dir(),
"tracker"));
+ var conn = new Tracker.Direct.Connection (Tracker.Sparql.ConnectionFlags.READONLY, store,
null, null);
+ conn.init ();
+ return conn;
+ }
+
// Plugin loading functions
private void load_plugins () throws GLib.Error {
string env_backend = Environment.get_variable ("TRACKER_SPARQL_BACKEND");
@@ -206,7 +213,7 @@ class Tracker.Sparql.Backend : Connection {
switch (backend) {
case Backend.AUTO:
try {
- direct = new Tracker.Direct.Connection ();
+ direct = create_readonly_direct ();
} catch (Error e) {
warning ("Falling back to bus backend, the direct backend failed to
initialize: " + e.message);
}
@@ -215,7 +222,7 @@ class Tracker.Sparql.Backend : Connection {
break;
case Backend.DIRECT:
- direct = new Tracker.Direct.Connection ();
+ direct = create_readonly_direct ();
break;
case Backend.BUS:
[Date Prev][Date Next] [Thread Prev][Thread Next]
[Thread Index] [Date Index] [Author Index]