[tracker/wip/carlosg/domain-ontologies: 21/116] libtracker-direct: Allow creating readwrite databases through direct connection



commit 691b707fb2648052d8840b935db1c539f49f1f15
Author: Carlos Garnacho <carlosg gnome org>
Date:   Sat Jun 3 00:32:34 2017 +0200

    libtracker-direct: Allow creating readwrite databases through direct connection
    
    This will allow creating sparql databases that are private to applications. They
    will be able to get multiple readonly handles, and updates will be handled by
    a single private thread. Pretty much like tracker-store (and the refactor took
    heavy inspiration from it), just sans dbus.
    
    So far, only the current direct connections (i.e. readonly, pointing to
    ~/.cache/tracker) have been replaced.

 src/libtracker-direct/tracker-direct.vala          |  276 ++++++++++++++++----
 src/libtracker-sparql-backend/tracker-backend.vala |   11 +-
 2 files changed, 237 insertions(+), 50 deletions(-)
---
diff --git a/src/libtracker-direct/tracker-direct.vala b/src/libtracker-direct/tracker-direct.vala
index da9feaa..aac1fe5 100644
--- a/src/libtracker-direct/tracker-direct.vala
+++ b/src/libtracker-direct/tracker-direct.vala
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2010, Nokia <ivan frade nokia com>
+ * Copyright (C) 2017, Red Hat, Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -17,76 +18,194 @@
  * Boston, MA  02110-1301, USA.
  */
 
-public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
-       static int use_count;
-       bool initialized;
+public class Tracker.Direct.Connection : Tracker.Sparql.Connection, AsyncInitable, Initable {
+       File? database_loc;
+       File? journal_loc;
+       File? ontology_loc;
+       Sparql.ConnectionFlags flags;
+
+       // Mutex to hold datamanager
        private Mutex mutex = Mutex ();
+       Thread<void*> thread;
 
-       public Connection () throws Sparql.Error, IOError, DBusError {
-               try {
-                       if (use_count == 0) {
-                               // make sure that current locale vs db locale are the same,
-                               // otherwise return an error
-                               Locale.init ();
-                               DBManager.locale_changed ();
+       // Initialization stuff, both sync and async
+       private Mutex init_mutex = Mutex ();
+       private Cond init_cond = Cond ();
+       private bool initialized;
+       private Error init_error;
+       public SourceFunc init_callback;
 
-                               uint select_cache_size = 100;
-                               string env_cache_size = Environment.get_variable 
("TRACKER_SPARQL_CACHE_SIZE");
+       private AsyncQueue<Task> update_queue;
 
-                               if (env_cache_size != null) {
-                                       select_cache_size = int.parse (env_cache_size);
-                               }
+       enum TaskType {
+               QUERY,
+               UPDATE,
+               UPDATE_BLANK,
+               TURTLE,
+       }
 
-                               Data.Manager.init (DBManagerFlags.READONLY | DBManagerFlags.ENABLE_MUTEXES,
-                                                  null /*loc */, null /* domain */ , null /* ontology */,
-                                                  null, false, false, select_cache_size, 0, null, null);
-                       }
+       abstract class Task {
+               public TaskType type;
+               public int priority;
+               public Cancellable? cancellable;
+               public SourceFunc callback;
+               public Error error;
+       }
+
+       private class UpdateTask : Task {
+               public string sparql;
+               public Variant blank_nodes;
+
+               private void set (TaskType type, string sparql, int priority = Priority.DEFAULT, Cancellable? 
cancellable = null) {
+                       this.type = type;
+                       this.sparql = sparql;
+                       this.priority = priority;
+                       this.cancellable = cancellable;
+               }
+
+               public UpdateTask (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) {
+                       this.set (TaskType.UPDATE, sparql, priority, cancellable);
+               }
 
-                       use_count++;
-                       initialized = true;
+               public UpdateTask.blank (string sparql, int priority = Priority.DEFAULT, Cancellable? 
cancellable) {
+                       this.set (TaskType.UPDATE_BLANK, sparql, priority, cancellable);
+               }
+       }
+
+       private class TurtleTask : Task {
+               public File file;
+
+               public TurtleTask (File file, Cancellable? cancellable) {
+                       this.type = TaskType.TURTLE;
+                       this.file = file;
+                       this.priority = Priority.DEFAULT;
+                       this.cancellable = cancellable;
+               }
+       }
+
+       static void wal_checkpoint (DBInterface iface, bool blocking) {
+               try {
+                       debug ("Checkpointing database...");
+                       iface.sqlite_wal_checkpoint (blocking);
+                       debug ("Checkpointing complete...");
                } catch (Error e) {
-                       throw new Sparql.Error.INTERNAL (e.message);
+                       warning (e.message);
                }
        }
 
-        public Connection.custom_ontology (File loc, File? journal_loc, File? ontology_loc) throws 
Sparql.Error, IOError, DBusError {
+       static void wal_checkpoint_on_thread () {
+               new Thread<void*> ("wal-checkpoint", () => {
+                       var iface = DBManager.get_db_interface ();
+                       wal_checkpoint (iface, false);
+                       return null;
+               });
+       }
+
+       static void wal_hook (DBInterface iface, int n_pages) {
+               if (n_pages >= 10000) {
+                       // do immediate checkpointing (blocking updates)
+                       // to prevent excessive wal file growth
+                       wal_checkpoint (iface, true);
+               } else if (n_pages >= 1000) {
+                       wal_checkpoint_on_thread ();
+               }
+       }
+
+       private void* thread_func () {
+               init_mutex.lock ();
+
                try {
-                       if (use_count == 0) {
-                               // make sure that current locale vs db locale are the same,
-                               // otherwise return an error
-                               Locale.init ();
-                               DBManager.locale_changed ();
+                       Locale.init ();
+                       DBManagerFlags db_flags = DBManagerFlags.ENABLE_MUTEXES;
+                       if ((flags & Sparql.ConnectionFlags.READONLY) != 0)
+                               db_flags |= DBManagerFlags.READONLY;
+                       Data.Manager.init (db_flags,
+                                          database_loc, journal_loc, ontology_loc,
+                                          null, false, false, 100, 100, null, null);
 
-                               uint select_cache_size = 100;
-                               string env_cache_size = Environment.get_variable 
("TRACKER_SPARQL_CACHE_SIZE");
+                       var iface = DBManager.get_db_interface ();
+                       iface.sqlite_wal_hook (wal_hook);
+               } catch (Error e) {
+                       init_error = e;
+               } finally {
+                       if (init_callback != null) {
+                               init_callback ();
+                       } else {
+                               initialized = true;
+                               init_cond.signal ();
+                               init_mutex.unlock ();
+                       }
+               }
 
-                               if (env_cache_size != null) {
-                                       select_cache_size = int.parse (env_cache_size);
-                               }
+               while (true) {
+                       var task = update_queue.pop();
 
-                               Data.Manager.init (DBManagerFlags.READONLY | DBManagerFlags.ENABLE_MUTEXES,
-                                                  loc, journal_loc, ontology_loc,
-                                                  null, false, false, select_cache_size, 0, null, null);
+                       try {
+                               switch (task.type) {
+                               case TaskType.UPDATE:
+                                       UpdateTask update_task = (UpdateTask) task;
+                                       update (update_task.sparql, update_task.priority, 
update_task.cancellable);
+                                       break;
+                               case TaskType.UPDATE_BLANK:
+                                       UpdateTask update_task = (UpdateTask) task;
+                                       update_task.blank_nodes = update_blank (update_task.sparql, 
update_task.priority, update_task.cancellable);
+                                       break;
+                               case TaskType.TURTLE:
+                                       TurtleTask turtle_task = (TurtleTask) task;
+                                       load (turtle_task.file, turtle_task.cancellable);
+                                       break;
+                               default:
+                                       break;
+                               }
+                       } catch (Error e) {
+                               task.error = e;
                        }
 
-                       use_count++;
-                       initialized = true;
+                       task.callback ();
+               }
+       }
+
+       public async bool init_async (int io_priority, Cancellable? cancellable) throws Error {
+               init_callback = init_async.callback;
+               thread = new Thread<void*> ("database", thread_func);
+
+               return initialized;
+       }
+
+       public bool init (Cancellable? cancellable) throws Error {
+               try {
+                       thread = new Thread<void*> ("database", thread_func);
+
+                       init_mutex.lock ();
+                       while (!initialized)
+                               init_cond.wait(init_mutex);
+                       init_mutex.unlock ();
+
+                       if (init_error != null)
+                               throw init_error;
                } catch (Error e) {
                        throw new Sparql.Error.INTERNAL (e.message);
                }
+
+               return true;
        }
+
+       public Connection (Sparql.ConnectionFlags connection_flags, File loc, File? journal, File? ontology) 
throws Sparql.Error, IOError, DBusError {
+               database_loc = loc;
+               journal_loc = journal;
+               ontology_loc = ontology;
+               flags = connection_flags;
+
+               update_queue = new AsyncQueue<Task> ();
+       }
+
        ~Connection () {
                if (!initialized) {
-                       // use_count did not get increased if initialization failed
                        return;
                }
 
                // Clean up connection
-               use_count--;
-
-               if (use_count == 0) {
-                       Data.Manager.shutdown ();
-               }
+               Data.Manager.shutdown ();
        }
 
        Sparql.Cursor query_unlocked (string sparql) throws Sparql.Error, DBusError {
@@ -125,7 +244,7 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
                Sparql.Cursor result = null;
                var context = MainContext.get_thread_default ();
 
-               g_io_scheduler_push_job (job => {
+               IOSchedulerJob.push ((job, cancellable) => {
                        try {
                                result = query (sparql, cancellable);
                        } catch (IOError e_io) {
@@ -136,15 +255,14 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
                                dbus_error = e_dbus;
                        }
 
-                       var source = new IdleSource ();
-                       source.set_callback (() => {
+                       context.invoke (() => {
                                query_async.callback ();
                                return false;
                        });
-                       source.attach (context);
 
                        return false;
                }, Priority.DEFAULT, cancellable);
+
                yield;
 
                if (cancellable != null && cancellable.is_cancelled ()) {
@@ -159,4 +277,66 @@ public class Tracker.Direct.Connection : Tracker.Sparql.Connection {
                        return result;
                }
        }
+
+       public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? 
cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+               mutex.lock ();
+               try {
+                       Tracker.Data.update_sparql (sparql);
+               } finally {
+                       mutex.unlock ();
+               }
+       }
+
+       public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, 
Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+               var task = new UpdateTask (sparql, priority, cancellable);
+               task.callback = update_async.callback;
+               update_queue.push (task);
+               yield;
+
+               if (task.error != null)
+                       throw task.error;
+       }
+
+       public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT, 
Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+               GLib.Variant? blank_nodes = null;
+               mutex.lock ();
+               try {
+                       blank_nodes = Tracker.Data.update_sparql_blank (sparql);
+               } finally {
+                       mutex.unlock ();
+               }
+
+               return blank_nodes;
+       }
+
+       public async override GLib.Variant? update_blank_async (string sparql, int priority = 
GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
+               var task = new UpdateTask.blank (sparql, priority, cancellable);
+               task.callback = update_blank_async.callback;
+               update_queue.push (task);
+               yield;
+
+               if (task.error != null)
+                       throw task.error;
+
+               return task.blank_nodes;
+       }
+
+       public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, 
DBusError {
+               mutex.lock ();
+               try {
+                       Tracker.Data.load_turtle_file (file);
+               } finally {
+                       mutex.unlock ();
+               }
+       }
+
+       public async override void load_async (File file, Cancellable? cancellable = null) throws 
Sparql.Error, IOError, DBusError {
+               var task = new TurtleTask (file, cancellable);
+               task.callback = load_async.callback;
+               update_queue.push (task);
+               yield;
+
+               if (task.error != null)
+                       throw new Sparql.Error.INTERNAL (task.error.message);
+       }
 }
diff --git a/src/libtracker-sparql-backend/tracker-backend.vala 
b/src/libtracker-sparql-backend/tracker-backend.vala
index 6d7e982..68eba33 100644
--- a/src/libtracker-sparql-backend/tracker-backend.vala
+++ b/src/libtracker-sparql-backend/tracker-backend.vala
@@ -182,6 +182,13 @@ class Tracker.Sparql.Backend : Connection {
                return yield bus.statistics_async (cancellable);
        }
 
+       private Connection create_readonly_direct () throws GLib.Error, Sparql.Error, IOError, DBusError {
+               File store = File.new_for_path (Path.build_filename (Environment.get_user_cache_dir(), 
"tracker"));
+               var conn = new Tracker.Direct.Connection (Tracker.Sparql.ConnectionFlags.READONLY, store, 
null, null);
+               conn.init ();
+               return conn;
+       }
+
        // Plugin loading functions
        private void load_plugins () throws GLib.Error {
                string env_backend = Environment.get_variable ("TRACKER_SPARQL_BACKEND");
@@ -206,7 +213,7 @@ class Tracker.Sparql.Backend : Connection {
                switch (backend) {
                case Backend.AUTO:
                        try {
-                               direct = new Tracker.Direct.Connection ();
+                               direct = create_readonly_direct ();
                        } catch (Error e) {
                                warning ("Falling back to bus backend, the direct backend failed to 
initialize: " + e.message);
                        }
@@ -215,7 +222,7 @@ class Tracker.Sparql.Backend : Connection {
                        break;
 
                case Backend.DIRECT:
-                       direct = new Tracker.Direct.Connection ();
+                       direct = create_readonly_direct ();
                        break;
 
                case Backend.BUS:


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]