[tracker/sam/tracker-3.0-functional-tests: 1/4] Update functional tests for Tracker 3.0 changes



commit 76cc3e4d758a5422b21aa15687750c7ecd600018
Author: Sam Thursfield <sam afuera me uk>
Date:   Tue Jan 21 00:29:01 2020 +0100

    Update functional tests for Tracker 3.0 changes
    
    This commit updates the functional tests in line with the
    Tracker 3.0 architectural changes.
    
    These tests used to spawn a private tracker-store daemon and communicate
    with it over D-Bus. Most of the tests now create a local database
    in-process and run against that, using the PyGObject bindings to drive
    libtracker-sparql. It's good to test our Python bindings in this way.
    Some tests also exercise connecting to a database over D-Bus. The
    'fixtures' module makes it easy to do either of these things.
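
    For illustration, a minimal test against a local database now looks
    roughly like this (a sketch, not code from this commit; TestExample is
    a hypothetical name, while fixtures.TrackerSparqlDirectTest and the
    self.conn it provides come from the new fixtures.py below):

        import unittest as ut

        import fixtures

        class TestExample(fixtures.TrackerSparqlDirectTest):
            def test_query(self):
                # self.conn is the Tracker.SparqlConnection that the
                # fixture opened on a temporary database directory.
                cursor = self.conn.query('SELECT ?u { ?u a rdfs:Resource }')
                self.assertTrue(cursor.next())

        if __name__ == '__main__':
            ut.main(verbosity=2)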
    
    The 08-unique-insertions test is removed; it claimed that "we can't
    test tracker-miner-fs itself", but we can.
    
    The 14-signals test is now a test of the TrackerNotifier API.
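
    The notifier flow is roughly as follows (a sketch based on the updated
    test code; the flags, callback signature and event getters all appear
    in notifier.py and fixtures.py in this commit):

        notifier = self.conn.create_notifier(Tracker.NotifierFlags.QUERY_URN)

        def events_cb(notifier, service, graph, events):
            # Each event is a create, update or delete; the URN is
            # available because QUERY_URN was passed above.
            for event in events:
                print(event.get_event_type(), event.get_urn())

        notifier.connect('events', events_cb)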
    
    The 15-statistics test is removed as the API that it tests is gone.
    
    The numeric prefixes were removed from the filenames of all tests.
    
    It's debatable whether the name "functional tests" really applies here
    any more; perhaps the bigger distinction from the "unit tests", which
    also run against libtracker-sparql, is that these are written in Python
    with PyGObject rather than C. In tracker-miners.git we will still be
    testing multiple processes, so the name will still make sense there.
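
    To run the same test cases against both kinds of connection, the
    updated tests combine a mixin holding the test cases with the two
    fixtures, roughly like this (a sketch; ExampleTests is a hypothetical
    name, the fixture classes are the real ones from fixtures.py):

        class ExampleTests():
            # Test cases only; not a TestCase subclass, so it is not
            # collected and run on its own.
            def test_insert(self):
                self.tracker.update('INSERT { <test://a> a nco:PersonContact }')

        class TestExampleLocal(fixtures.TrackerSparqlDirectTest, ExampleTests):
            pass

        class TestExampleBus(fixtures.TrackerSparqlBusTest, ExampleTests):
            pass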

 meson.build                                        |   1 -
 src/libtracker-sparql-backend/meson.build          |   2 +
 tests/functional-tests/08-unique-insertions.py     |  78 ---
 tests/functional-tests/15-statistics.py            | 126 -----
 .../{05-coalesce.py => coalesce.py}                |   8 +-
 .../{16-collation.py => collation.py}              |   9 +-
 ...{09-concurrent-query.py => concurrent-query.py} |  48 +-
 tests/functional-tests/configuration.json.in       |   6 +-
 tests/functional-tests/configuration.py            |  19 +-
 .../{06-distance.py => distance.py}                |   8 +-
 tests/functional-tests/expectedFailure.py          |  44 --
 tests/functional-tests/fixtures.py                 | 124 +++++
 .../{03-fts-functions.py => fts-functions.py}      |   8 +-
 tests/functional-tests/{07-graph.py => graph.py}   |   9 +-
 .../{04-group-concat.py => group-concat.py}        |   8 +-
 .../{01-insertion.py => insertion.py}              |  96 +---
 tests/functional-tests/ipc/meson.build             |   9 +-
 .../ipc/test-bus-query-cancellation.c              |   1 -
 tests/functional-tests/meson.build                 |  52 +-
 .../{14-signals.py => notifier.py}                 | 153 +++---
 ...{17-ontology-changes.py => ontology-changes.py} |  57 +-
 tests/functional-tests/query.py                    |  73 +++
 .../{02-sparql-bugs.py => sparql-bugs.py}          |   7 +-
 tests/functional-tests/storetest.py                |  64 ---
 tests/functional-tests/trackertest                 |   1 -
 tests/libtracker-sparql/meson.build                |   1 -
 utils/trackertestutils/helpers.py                  | 574 ++++++++++-----------
 utils/trackertestutils/mainloop.py                 |   3 +
 28 files changed, 669 insertions(+), 920 deletions(-)
---
diff --git a/meson.build b/meson.build
index 32bc55bb7..23d3c61b2 100644
--- a/meson.build
+++ b/meson.build
@@ -317,7 +317,6 @@ test_c_args = tracker_c_args + [
 ]
 
 tracker_uninstalled_cli_dir = join_paths(meson.current_build_dir(), 'src', 'tracker')
-tracker_uninstalled_domain_rule = join_paths(meson.current_source_dir(), 'src', 'tracker-store', 'default.rule')
 tracker_uninstalled_nepomuk_ontologies_dir = join_paths(meson.current_source_dir(), 'src', 'ontologies', 'nepomuk')
 tracker_uninstalled_stop_words_dir = join_paths(meson.current_source_dir(), 'src', 'libtracker-common', 'stop-words')
 tracker_uninstalled_testutils_dir = join_paths(meson.current_source_dir(), 'utils')
diff --git a/src/libtracker-sparql-backend/meson.build b/src/libtracker-sparql-backend/meson.build
index ac16a9c79..504f77fa8 100644
--- a/src/libtracker-sparql-backend/meson.build
+++ b/src/libtracker-sparql-backend/meson.build
@@ -59,3 +59,5 @@ tracker_sparql_gir = gnome.generate_gir(libtracker_sparql,
         'libtracker-sparql/tracker-sparql.h',
         '-DTRACKER_COMPILATION',
     ])
+
+tracker_sparql_uninstalled_dir = meson.current_build_dir()
diff --git a/tests/functional-tests/05-coalesce.py b/tests/functional-tests/coalesce.py
similarity index 95%
rename from tests/functional-tests/05-coalesce.py
rename to tests/functional-tests/coalesce.py
index 176ae6b66..e03ac2644 100644
--- a/tests/functional-tests/05-coalesce.py
+++ b/tests/functional-tests/coalesce.py
@@ -16,14 +16,16 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 #
+
 """
-Test tracker:coalesce function in Sparql. Only uses the Store
+Test tracker:coalesce function in SPARQL.
 """
+
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 
-class TestCoalesce (CommonTrackerStoreTest):
+class TestCoalesce (fixtures.TrackerSparqlDirectTest):
     """
     Insert and instance with some values, and tracker coalesce of some of them
     with different combinations (first NULL, none NULL, all NULL...)
diff --git a/tests/functional-tests/16-collation.py b/tests/functional-tests/collation.py
similarity index 95%
rename from tests/functional-tests/16-collation.py
rename to tests/functional-tests/collation.py
index 962db9640..db662c4d9 100644
--- a/tests/functional-tests/16-collation.py
+++ b/tests/functional-tests/collation.py
@@ -20,17 +20,18 @@
 #
 
 """
-Stand-alone tests cases for the store, checking the collation is working
+Checking the collation is working
 """
+
 import time
 import random
 import locale
 
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 
-class TrackerStoreCollationTests (CommonTrackerStoreTest):
+class TrackerStoreCollationTests (fixtures.TrackerSparqlDirectTest):
     """
     Insert few instances with a text field containing collation-problematic words.
     Ask for those instances order by the field and check the results.
@@ -47,7 +48,6 @@ class TrackerStoreCollationTests (CommonTrackerStoreTest):
         for uri in self.clean_up_instances:
             self.tracker.update("DELETE { <%s> a rdfs:Resource. }" % (uri))
         self.clean_up_instances = []
-        time.sleep(1)
 
     def __insert_text(self, text):
         uri = "test://collation-01-%d" % (random.randint(1, 1000))
@@ -125,6 +125,7 @@ class TrackerStoreCollationTests (CommonTrackerStoreTest):
         expected = ["a", "ä", "e", "i", "o", "ö", "u"]
         self.__collation_test(input_dt, expected)
 
+
 if __name__ == "__main__":
     print("""
     TODO:
diff --git a/tests/functional-tests/09-concurrent-query.py b/tests/functional-tests/concurrent-query.py
similarity index 78%
rename from tests/functional-tests/09-concurrent-query.py
rename to tests/functional-tests/concurrent-query.py
index 7164babb6..90a0eb80c 100644
--- a/tests/functional-tests/09-concurrent-query.py
+++ b/tests/functional-tests/concurrent-query.py
@@ -19,22 +19,24 @@
 """
 Send concurrent inserts and queries to the daemon to check the concurrency.
 """
+
 from gi.repository import GLib
 
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+
+import fixtures
+
 
 AMOUNT_OF_TEST_INSTANCES = 100
 AMOUNT_OF_QUERIES = 10
 
 
-class TestConcurrentQuery (CommonTrackerStoreTest):
+class ConcurrentQueryTests():
     """
     Send a bunch of queries to the daemon asynchronously, to test the queue
     holding those queries
     """
-
-    def setUp(self):
+    def test_setup(self):
         self.main_loop = GLib.MainLoop()
 
         self.mock_data_insert()
@@ -65,36 +67,42 @@ class TestConcurrentQuery (CommonTrackerStoreTest):
         QUERY = "SELECT ?u WHERE { ?u a nco:PersonContact. FILTER regex (?u, 'test-09:ins')}"
         UPDATE = "INSERT { <test-09:picture-%d> a nmm:Photo. }"
         for i in range(0, AMOUNT_OF_QUERIES):
-            self.tracker.query(
-                QUERY,
-                result_handler=self.reply_cb,
-                error_handler=self.error_handler)
-            self.tracker.update(
-                UPDATE % (i),
-                result_handler=self.update_cb,
-                error_handler=self.error_handler)
+            self.conn.query_async(QUERY, None, self.query_cb)
+            self.conn.update_async(UPDATE % (i), 0, None, self.update_cb)
 
         # Safeguard of 60 seconds. The last reply should quit the loop
         GLib.timeout_add_seconds(60, self.timeout_cb)
         self.main_loop.run()
 
-    def reply_cb(self, obj, results, data):
+    def query_cb(self, obj, result):
+        cursor = self.conn.query_finish(result)
+
+        rows = 0
+        while cursor.next(): rows += 1
+        self.assertEqual(rows, AMOUNT_OF_TEST_INSTANCES)
+
         self.finish_counter += 1
-        self.assertEqual(len(results), AMOUNT_OF_TEST_INSTANCES)
         if (self.finish_counter >= AMOUNT_OF_QUERIES):
             self.timeout_cb()
 
-    def update_cb(self, obj, results, data):
-        self.assertTrue(True)
-
-    def error_handler(self, obj, e, user_data):
-        print("ERROR in DBus call: %s" % e)
-        raise(e)
+    def update_cb(self, obj, result):
+        self.conn.update_finish(result)
 
     def timeout_cb(self):
         self.mock_data_delete()
         self.main_loop.quit()
         return False
 
+
+class TestConcurrentQueryLocal(fixtures.TrackerSparqlDirectTest, ConcurrentQueryTests):
+    def setUp(self):
+        self.test_setup()
+
+
+class TestConcurrentQueryBus(fixtures.TrackerSparqlBusTest, ConcurrentQueryTests):
+    def setUp(self):
+        self.test_setup()
+
+
 if __name__ == "__main__":
     ut.main(verbosity=2)
diff --git a/tests/functional-tests/configuration.json.in b/tests/functional-tests/configuration.json.in
index d50080141..0b2cd5f9d 100644
--- a/tests/functional-tests/configuration.json.in
+++ b/tests/functional-tests/configuration.json.in
@@ -1,7 +1,3 @@
 {
-    "TEST_DBUS_DAEMON_CONFIG_FILE": "@TEST_DBUS_DAEMON_CONFIG_FILE@",
-    "TEST_DCONF_PROFILE": "@TEST_DCONF_PROFILE@",
-    "TEST_LANGUAGE_STOP_WORDS_DIR": "@TEST_LANGUAGE_STOP_WORDS_DIR@",
-    "TEST_ONTOLOGIES_DIR": "@TEST_ONTOLOGIES_DIR@",
-    "TEST_DOMAIN_ONTOLOGY_RULE": "@TEST_DOMAIN_ONTOLOGY_RULE@"
+    "TEST_ONTOLOGIES_DIR": "@TEST_ONTOLOGIES_DIR@"
 }
diff --git a/tests/functional-tests/configuration.py b/tests/functional-tests/configuration.py
index 1dff3f417..8146ceef4 100644
--- a/tests/functional-tests/configuration.py
+++ b/tests/functional-tests/configuration.py
@@ -25,6 +25,9 @@ import pathlib
 import sys
 
 
+DEFAULT_TIMEOUT = 10
+
+
 if 'TRACKER_FUNCTIONAL_TEST_CONFIG' not in os.environ:
     raise RuntimeError("The TRACKER_FUNCTIONAL_TEST_CONFIG environment "
                        "variable must be set to point to the location of "
@@ -34,20 +37,8 @@ with open(os.environ['TRACKER_FUNCTIONAL_TEST_CONFIG']) as f:
     config = json.load(f)
 
 
-TEST_DBUS_DAEMON_CONFIG_FILE = config['TEST_DBUS_DAEMON_CONFIG_FILE']
-
-
-def test_environment(tmpdir):
-    return {
-        'DCONF_PROFILE': config['TEST_DCONF_PROFILE'],
-        'TRACKER_DB_ONTOLOGIES_DIR': config['TEST_ONTOLOGIES_DIR'],
-        'TRACKER_LANGUAGE_STOP_WORDS_DIR': config['TEST_LANGUAGE_STOP_WORDS_DIR'],
-        'TRACKER_TEST_DOMAIN_ONTOLOGY_RULE': config['TEST_DOMAIN_ONTOLOGY_RULE'],
-        'XDG_CACHE_HOME': os.path.join(tmpdir, 'cache'),
-        'XDG_CONFIG_HOME': os.path.join(tmpdir, 'config'),
-        'XDG_DATA_HOME': os.path.join(tmpdir, 'data'),
-        'XDG_RUNTIME_DIR': os.path.join(tmpdir, 'run'),
-    }
+def ontologies_dir():
+    return config['TEST_ONTOLOGIES_DIR']
 
 
 def get_environment_boolean(variable):
diff --git a/tests/functional-tests/06-distance.py b/tests/functional-tests/distance.py
similarity index 95%
rename from tests/functional-tests/06-distance.py
rename to tests/functional-tests/distance.py
index 42989a946..47e016b3a 100644
--- a/tests/functional-tests/06-distance.py
+++ b/tests/functional-tests/distance.py
@@ -16,18 +16,20 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 #
+
 """
-Test the distance-calculation functions in Sparql. Only requires the Store
+Test the distance-calculation functions in SPARQL.
 """
+
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 POINT_COORDS = [
     (0, 0), (1, 1), (2, 2), (3, 3), (4, 4)
 ]
 
 
-class TestDistanceFunctions (CommonTrackerStoreTest):
+class TestDistanceFunctions (fixtures.TrackerSparqlDirectTest):
     """
     Insert some points and get the distance between them.
     """
diff --git a/tests/functional-tests/fixtures.py b/tests/functional-tests/fixtures.py
new file mode 100644
index 000000000..e90b4583a
--- /dev/null
+++ b/tests/functional-tests/fixtures.py
@@ -0,0 +1,124 @@
+#
+# Copyright (C) 2010, Nokia <ivan frade nokia com>
+# Copyright (C) 2018-2020, Sam Thursfield <sam afuera me uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+"""
+Fixtures used by the Tracker functional-tests.
+"""
+
+import gi
+gi.require_version('Tracker', '3.0')
+from gi.repository import Gio, GLib
+from gi.repository import Tracker
+
+import logging
+import os
+import multiprocessing
+import shutil
+import tempfile
+import time
+import unittest as ut
+
+import trackertestutils.helpers
+import configuration as cfg
+
+log = logging.getLogger(__name__)
+
+
+class TrackerSparqlDirectTest(ut.TestCase):
+    """
+    Fixture for tests using a direct (local) connection to a Tracker database.
+    """
+
+    @classmethod
+    def setUpClass(self):
+        self.tmpdir = tempfile.mkdtemp(prefix='tracker-test-')
+
+        try:
+            self.conn = Tracker.SparqlConnection.new(
+                Tracker.SparqlConnectionFlags.NONE,
+                Gio.File.new_for_path(self.tmpdir),
+                Gio.File.new_for_path(cfg.ontologies_dir()),
+                None)
+
+            self.tracker = trackertestutils.helpers.StoreHelper(self.conn)
+        except Exception as e:
+            shutil.rmtree(self.tmpdir, ignore_errors=True)
+            raise
+
+    @classmethod
+    def tearDownClass(self):
+        self.conn.close()
+        shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+
+class TrackerSparqlBusTest (ut.TestCase):
+    """
+    Fixture for tests using a D-Bus connection to a Tracker database.
+
+    The database is managed by a separate subprocess, spawned during the
+    fixture setup.
+    """
+
+    @classmethod
+    def database_process_fn(self, message_queue):
+        # This runs in a separate process and provides a clean Tracker database
+        # exported over D-Bus to the main test process.
+
+        log.info("Started database subprocess")
+        bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
+
+        conn = Tracker.SparqlConnection.new(
+            Tracker.SparqlConnectionFlags.NONE,
+            Gio.File.new_for_path(self.tmpdir),
+            Gio.File.new_for_path(cfg.ontologies_dir()),
+            None)
+
+        endpoint = Tracker.EndpointDBus.new(conn, bus, None, None)
+
+        message_queue.put(bus.get_unique_name())
+
+        loop = GLib.MainLoop.new(None, False)
+        loop.run()
+
+    @classmethod
+    def setUpClass(self):
+        self.tmpdir = tempfile.mkdtemp(prefix='tracker-test-')
+
+        message_queue = multiprocessing.Queue()
+        self.process = multiprocessing.Process(target=self.database_process_fn,
+                                               args=(message_queue,))
+        try:
+            self.process.start()
+            service_name = message_queue.get()
+            log.debug("Got service name: %s", service_name)
+
+            self.conn = Tracker.SparqlConnection.bus_new(service_name, None, None)
+
+            self.tracker = trackertestutils.helpers.StoreHelper(self.conn)
+        except Exception as e:
+            self.process.terminate()
+            shutil.rmtree(self.tmpdir, ignore_errors=True)
+            raise
+
+    @classmethod
+    def tearDownClass(self):
+        self.conn.close()
+        self.process.terminate()
+        shutil.rmtree(self.tmpdir, ignore_errors=True)
diff --git a/tests/functional-tests/03-fts-functions.py b/tests/functional-tests/fts-functions.py
similarity index 94%
rename from tests/functional-tests/03-fts-functions.py
rename to tests/functional-tests/fts-functions.py
index ec23a3ed6..a92a8ff3f 100644
--- a/tests/functional-tests/03-fts-functions.py
+++ b/tests/functional-tests/fts-functions.py
@@ -18,14 +18,14 @@
 #
 
 """
-These tests use only the store. They insert instances with known text
-and run sparql with fts functions to check the results.
+Test the full-text search.
 """
+
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 
-class TestFTSFunctions (CommonTrackerStoreTest):
+class TestFTSFunctions (fixtures.TrackerSparqlDirectTest):
     """
     Insert data with text and check the fts:xxxx functions are returning the expected results
     """
diff --git a/tests/functional-tests/07-graph.py b/tests/functional-tests/graph.py
similarity index 97%
rename from tests/functional-tests/07-graph.py
rename to tests/functional-tests/graph.py
index c1a6e8697..f24323769 100644
--- a/tests/functional-tests/07-graph.py
+++ b/tests/functional-tests/graph.py
@@ -15,15 +15,16 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
-#
+
 """
-Tests graphs in Sparql. Only requires the store.
+Tests graphs in SPARQL.
 """
+
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 
-class TestGraphs (CommonTrackerStoreTest):
+class TestGraphs (fixtures.TrackerSparqlDirectTest):
     """
     Insert triplets in different graphs and check the query results asking in
     one specific graph, in all of them and so on.
diff --git a/tests/functional-tests/04-group-concat.py b/tests/functional-tests/group-concat.py
similarity index 94%
rename from tests/functional-tests/04-group-concat.py
rename to tests/functional-tests/group-concat.py
index d36523004..9b471d71c 100644
--- a/tests/functional-tests/04-group-concat.py
+++ b/tests/functional-tests/group-concat.py
@@ -16,14 +16,16 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 #
+
 """
-Test the GROUP_CONCAT function in Sparql. Only requires the store.
+Test the GROUP_CONCAT function in SPARQL.
 """
+
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 
-class TestGroupConcat (CommonTrackerStoreTest):
+class TestGroupConcat (fixtures.TrackerSparqlDirectTest):
     """
     Insert a multivalued property and request the results in GROUP_CONCAT
     """
diff --git a/tests/functional-tests/01-insertion.py b/tests/functional-tests/insertion.py
similarity index 89%
rename from tests/functional-tests/01-insertion.py
rename to tests/functional-tests/insertion.py
index 8b5b40c80..cd56fa85c 100644
--- a/tests/functional-tests/01-insertion.py
+++ b/tests/functional-tests/insertion.py
@@ -18,19 +18,16 @@
 #
 
 """
-Stand-alone tests cases for the store, inserting, removing information
-in pure sparql and checking that the data is really there
+Test that inserting data in a Tracker database works as expected.
 """
-import sys
-import time
-import random
-import datetime
 
+import random
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+
+import fixtures
 
 
-class TrackerStoreInsertionTests (CommonTrackerStoreTest):
+class TrackerStoreInsertionTests (fixtures.TrackerSparqlDirectTest):
     """
     Insert single and multiple-valued properties, dates (ok and broken)
     and check the results
@@ -634,7 +631,7 @@ class TrackerStoreInsertionTests (CommonTrackerStoreTest):
             """DELETE { <test://instance-ds3> a rdfs:Resource. }""")
 
 
-class TrackerStoreDeleteTests (CommonTrackerStoreTest):
+class TrackerStoreDeleteTests (fixtures.TrackerSparqlDirectTest):
     """
     Use DELETE in Sparql and check the information is actually removed
     """
@@ -720,76 +717,7 @@ class TrackerStoreDeleteTests (CommonTrackerStoreTest):
         self.assertEqual(after_removal, initial)
 
 
-class TrackerStoreBatchUpdateTest (CommonTrackerStoreTest):
-    """
-    Insert data using the BatchSparqlUpdate method in the store
-    """
-
-    def test_batch_insert_01(self):
-        """
-        batch insertion of 100 contacts:
-        1. insert 100 contacts.
-        2. delete the inserted contacts.
-        """
-        NUMBER_OF_TEST_CONTACTS = 3
-
-        # query no. of existing contacts. (predefined instances in the DB)
-        count_before_insert = self.tracker.count_instances("nco:PersonContact")
-
-        # insert contacts.
-        CONTACT_TEMPLATE = """
-                   <test://instance-contact-%d> a nco:PersonContact ;
-                      nco:nameGiven 'Contact-name %d';
-                      nco:nameFamily 'Contact-family %d';
-                      nie:generator 'test-instance-to-remove' ;
-                      nco:contactUID '%d';
-                      nco:hasPhoneNumber <tel:%s> .
-                """
-
-        global contact_list
-        contact_list = []
-
-        def complete_contact(contact_template):
-            random_phone = "".join([str(random.randint(0, 9))
-                                    for i in range(0, 9)])
-            contact_counter = random.randint(0, 10000)
-
-            # Avoid duplicates
-            while contact_counter in contact_list:
-                contact_counter = random.randint(0, 10000)
-            contact_list.append(contact_counter)
-
-            return contact_template % (contact_counter,
-                                       contact_counter,
-                                       contact_counter,
-                                       contact_counter,
-                                       random_phone)
-
-        contacts = list(
-            map(complete_contact, [CONTACT_TEMPLATE] * NUMBER_OF_TEST_CONTACTS))
-        INSERT_SPARQL = "\n".join(["INSERT {"] + contacts + ["}"])
-        self.tracker.batch_update(INSERT_SPARQL)
-
-        # Check all instances are in
-        count_after_insert = self.tracker.count_instances("nco:PersonContact")
-        self.assertEqual(count_before_insert +
-                         NUMBER_OF_TEST_CONTACTS, count_after_insert)
-
-        """ Delete the inserted contacts """
-        DELETE_SPARQL = """
-                DELETE {
-                  ?x a rdfs:Resource .
-                } WHERE {
-                  ?x a nco:PersonContact ;
-                      nie:generator 'test-instance-to-remove' .
-                }
-                """
-        self.tracker.update(DELETE_SPARQL)
-        count_final = self.tracker.count_instances("nco:PersonContact")
-        self.assertEqual(count_before_insert, count_final)
-
-
-class TrackerStorePhoneNumberTest (CommonTrackerStoreTest):
+class TrackerStorePhoneNumberTest (fixtures.TrackerSparqlDirectTest):
     """
     Tests around phone numbers (maemo specific). Inserting correct/incorrect ones
     and running query to get the contact from the number.
@@ -801,16 +729,6 @@ class TrackerStorePhoneNumberTest (CommonTrackerStoreTest):
         2. Receiving a message  from a contact whose localPhoneNumber is saved.
         3. Query messages from the local phone number
         """
-        PhoneNumber = str(random.randint(0, sys.maxsize))
-        UUID = str(time.time())
-        UUID1 = str(random.randint(0, sys.maxsize))
-        UUID2 = str(random.randint(0, sys.maxsize))
-        localNumber = PhoneNumber[-7:]
-        d = datetime.datetime.now()
-        Received = d.isoformat()
-        ID = int(time.time()) % 1000
-        Given_Name = 'test_GN_' + repr(ID)
-        Family_Name = 'test_FN_' + repr(ID)
 
         INSERT_CONTACT_PHONE = """
                 INSERT {
diff --git a/tests/functional-tests/ipc/meson.build b/tests/functional-tests/ipc/meson.build
index 73b41b3b3..b19d1a1ab 100644
--- a/tests/functional-tests/ipc/meson.build
+++ b/tests/functional-tests/ipc/meson.build
@@ -1,25 +1,18 @@
 functional_ipc_test_c_args = [
-  '-DTEST_DOMAIN_ONTOLOGY_RULE="@0@"'.format(tracker_uninstalled_domain_rule),
   '-DTEST_ONTOLOGIES_DIR="@0@"'.format(tracker_uninstalled_nepomuk_ontologies_dir),
 ]
 
-sandbox_args = ['-m', 'trackertestutils', '--dbus-config', test_dbus_config, '--debug-sandbox', '--index-tmpdir', '--']
-
-sandbox_env = environment()
-
 test_env.set('LANG', 'en_GB.utf-8')
 test_env.prepend('PYTHONPATH', tracker_uninstalled_testutils_dir)
 test_env.set('TRACKER_DB_ONTOLOGIES_DIR', tracker_uninstalled_nepomuk_ontologies_dir)
 test_env.set('TRACKER_LANGUAGE_STOP_WORDS_DIR', tracker_uninstalled_stop_words_dir)
-test_env.set('TRACKER_TEST_DOMAIN_ONTOLOGY_RULE', tracker_uninstalled_domain_rule)
 
 bus_query_cancellation_test = executable('test-bus-query-cancellation',
   'test-bus-query-cancellation.c',
   c_args: functional_ipc_test_c_args,
   dependencies: [tracker_common_dep, tracker_sparql_dep])
 
-test('bus-query-cancellation', python,
-  args: sandbox_args + [bus_query_cancellation_test],
+test('bus-query-cancellation', bus_query_cancellation_test,
   env: test_env,
   suite: ['functional', 'ipc'],
   timeout: 60)
diff --git a/tests/functional-tests/ipc/test-bus-query-cancellation.c b/tests/functional-tests/ipc/test-bus-query-cancellation.c
index 795887961..def7abdaf 100644
--- a/tests/functional-tests/ipc/test-bus-query-cancellation.c
+++ b/tests/functional-tests/ipc/test-bus-query-cancellation.c
@@ -176,7 +176,6 @@ main (gint argc, gchar **argv)
 
        setlocale (LC_ALL, "");
 
-       g_setenv ("TRACKER_TEST_DOMAIN_ONTOLOGY_RULE", TEST_DOMAIN_ONTOLOGY_RULE, TRUE);
        g_setenv ("TRACKER_DB_ONTOLOGIES_DIR", TEST_ONTOLOGIES_DIR, TRUE);
 
        g_test_init (&argc, &argv, NULL);
diff --git a/tests/functional-tests/meson.build b/tests/functional-tests/meson.build
index f3f3305ca..48f88eeb7 100644
--- a/tests/functional-tests/meson.build
+++ b/tests/functional-tests/meson.build
@@ -1,18 +1,10 @@
 python = find_program('python3')
 
-# Configure functional tests to run completely from source tree.
 testconf = configuration_data()
 
 config_json_full_path = join_paths(meson.current_build_dir(), 'configuration.json')
-dconf_profile_full_path = join_paths(meson.current_source_dir(), 'trackertest')
 
-test_dbus_config = join_paths(build_root, 'tests', 'test-bus.conf')
-
-testconf.set('TEST_DBUS_DAEMON_CONFIG_FILE', test_dbus_config)
-testconf.set('TEST_DCONF_PROFILE', dconf_profile_full_path)
-testconf.set('TEST_DOMAIN_ONTOLOGY_RULE', tracker_uninstalled_domain_rule)
 testconf.set('TEST_ONTOLOGIES_DIR', tracker_uninstalled_nepomuk_ontologies_dir)
-testconf.set('TEST_LANGUAGE_STOP_WORDS_DIR', tracker_uninstalled_stop_words_dir)
 
 config_json = configure_file(
   input: 'configuration.json.in',
@@ -21,43 +13,31 @@ config_json = configure_file(
 )
 
 functional_tests = [
-  '01-insertion',
-  '02-sparql-bugs',
-  '04-group-concat',
-  '05-coalesce',
-  '06-distance',
-  '07-graph',
-  '08-unique-insertions',
-  '09-concurrent-query',
-  '14-signals',
-  '15-statistics',
-  '16-collation',
-  '17-ontology-changes',
+  'insertion',
+  'query',
+  'sparql-bugs',
+  'group-concat',
+  'coalesce',
+  'distance',
+  'graph',
+  'concurrent-query',
+  'notifier',
+  'collation',
+  'ontology-changes',
 ]
 
 if get_option('fts')
-  functional_tests += '03-fts-functions'
+  functional_tests += 'fts-functions'
 endif
 
 test_env = environment()
+test_env.prepend('GI_TYPELIB_PATH', tracker_sparql_uninstalled_dir)
+test_env.prepend('LD_LIBRARY_PATH', tracker_sparql_uninstalled_dir)
 test_env.prepend('PYTHONPATH', tracker_uninstalled_testutils_dir)
 test_env.set('TRACKER_FUNCTIONAL_TEST_CONFIG', config_json_full_path)
 
-foreach t: functional_tests
-  file = '@0@.py'.format(t)
-  test_parts = t.split('-')
-  test_name = t
-  if test_parts.length() > 1
-    parts = []
-    i = 0
-    foreach p: test_parts
-      if i > 0
-        parts += p
-      endif
-      i += 1
-    endforeach
-    test_name = '-'.join(parts)
-  endif
+foreach test_name: functional_tests
+  file = '@0@.py'.format(test_name)
   test(test_name, python,
     args: [file],
     env: test_env,
diff --git a/tests/functional-tests/14-signals.py b/tests/functional-tests/notifier.py
similarity index 51%
rename from tests/functional-tests/14-signals.py
rename to tests/functional-tests/notifier.py
index 944e57a7c..c6265f0d1 100644
--- a/tests/functional-tests/14-signals.py
+++ b/tests/functional-tests/notifier.py
@@ -16,132 +16,104 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 #
+
 """
-Test that after insertion/remove/updates in the store, the signals
-are emitted. Theses tests are not extensive (only few selected signals
-are tested)
+Test change notifications using TrackerNotifier.
 """
 
-from gi.repository import Gio
+import gi
+gi.require_version('Tracker', '3.0')
 from gi.repository import GLib
+from gi.repository import Tracker
 
-import time
+import logging
 import unittest as ut
 
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
-
-
-GRAPH_UPDATED_SIGNAL = "GraphUpdated"
-
-CONTACT_CLASS_URI = "http://www.semanticdesktop.org/ontologies/2007/03/22/nco#PersonContact"
-
-REASONABLE_TIMEOUT = 10  # Time waiting for the signal to be emitted
+import configuration
+import fixtures
+import trackertestutils.mainloop
 
 
-class TrackerStoreSignalsTests (CommonTrackerStoreTest):
-    """
-    Insert/update/remove instances from nco:PersonContact
-    and check that the signals are emitted
+class TrackerNotifierTests():
     """
+    Test cases for TrackerNotifier.
 
-    def setUp(self):
-        self.clean_up_list = []
+    To allow testing with both local and D-Bus connections, this test suite is
+    a mixin, which is combined with different fixtures below.
+    """
 
-        self.loop = GLib.MainLoop()
+    def base_setup(self):
+        self.loop = trackertestutils.mainloop.MainLoop()
         self.timeout_id = 0
 
-        self.bus = self.sandbox.get_connection()
-
-        self.results_classname = None
-        self.results_deletes = None
-        self.results_inserts = None
+        self.results_deletes = []
+        self.results_inserts = []
+        self.results_updates = []
 
-    def tearDown(self):
-        for uri in self.clean_up_list:
-            self.tracker.update("DELETE { <%s> a rdfs:Resource }" % uri)
-
-        self.clean_up_list = []
-
-    def __connect_signal(self):
-        """
-        After connecting to the signal, call self.__wait_for_signal.
-        """
-        self.cb_id = self.bus.signal_subscribe(
-            sender=self.tracker.TRACKER_BUSNAME,
-            interface_name=self.tracker.RESOURCES_IFACE,
-            member=GRAPH_UPDATED_SIGNAL,
-            object_path=self.tracker.TRACKER_OBJ_PATH,
-            arg0=CONTACT_CLASS_URI,
-            flags=Gio.DBusSignalFlags.NONE,
-            callback=self.__signal_received_cb)
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.QUERY_URN)
+        self.notifier.connect('events', self.__signal_received_cb)
 
     def __wait_for_signal(self):
         """
         In the callback of the signals, there should be a self.loop.quit ()
         """
         self.timeout_id = GLib.timeout_add_seconds(
-            REASONABLE_TIMEOUT, self.__timeout_on_idle)
-        self.loop.run()
+            configuration.DEFAULT_TIMEOUT, self.__timeout_on_idle)
+        self.loop.run_checked()
 
     def __timeout_on_idle(self):
         self.loop.quit()
-        self.fail("Timeout, the signal never came!")
-
-    def __pretty_print_array(self, array):
-        for g, s, o, p in array:
-            uri, prop, value = self.tracker.query(
-                "SELECT tracker:uri (%s), tracker:uri (%s), tracker:uri (%s) WHERE {}" % (s, o, p))
-            print(" - (", "-".join([g, uri, prop, value]), ")")
+        self.fail("Timeout, the signal never came after %i seconds!" % configuration.DEFAULT_TIMEOUT)
 
-    def __signal_received_cb(self, connection, sender_name, object_path, interface_name, signal_name, parameters):
+    def __signal_received_cb(self, notifier, service, graph, events):
         """
         Save the content of the signal and disconnect the callback
         """
-        classname, deletes, inserts = parameters.unpack()
-
-        self.results_classname = classname
-        self.results_deletes = deletes
-        self.results_inserts = inserts
-
-        if (self.timeout_id != 0):
+        for event in events:
+            if event.get_event_type() == Tracker.NotifierEventType.CREATE:
+                self.results_inserts.append(event)
+            elif event.get_event_type() == Tracker.NotifierEventType.UPDATE:
+                self.results_updates.append(event)
+            elif event.get_event_type() == Tracker.NotifierEventType.DELETE:
+                self.results_deletes.append(event)
+
+        if self.timeout_id != 0:
             GLib.source_remove(self.timeout_id)
             self.timeout_id = 0
         self.loop.quit()
-        self.bus.signal_unsubscribe(self.cb_id)
 
     def test_01_insert_contact(self):
-        self.clean_up_list.append("test://signals-contact-add")
         CONTACT = """
         INSERT {
         <test://signals-contact-add> a nco:PersonContact ;
-             nco:nameGiven 'Contact-name added';
-             nco:nameFamily 'Contact-family added';
-             nie:generator 'test-14-signals' ;
-             nco:contactUID '1321321312312';
-             nco:hasPhoneNumber <tel:555555555> .
+                nco:nameGiven 'Contact-name added';
+                nco:nameFamily 'Contact-family added';
+                nie:generator 'test-14-signals' ;
+                nco:contactUID '1321321312312';
+                nco:hasPhoneNumber <tel:555555555> .
         }
         """
-        self.__connect_signal()
         self.tracker.update(CONTACT)
         self.__wait_for_signal()
 
         # validate results
         self.assertEqual(len(self.results_deletes), 0)
-        self.assertEqual(len(self.results_inserts), 6)
+        self.assertEqual(len(self.results_inserts), 0)
+        self.assertEqual(len(self.results_updates), 1)
+        print(self.results_updates[0])
+        assert self.results_updates[0].get_urn() == 'test://signals-contact-add'
 
     def test_02_remove_contact(self):
         CONTACT = """
         INSERT {
-         <test://signals-contact-remove> a nco:PersonContact ;
-             nco:nameGiven 'Contact-name removed';
-             nco:nameFamily 'Contact-family removed'.
+            <test://signals-contact-remove> a nco:PersonContact ;
+                nco:nameGiven 'Contact-name removed';
+                nco:nameFamily 'Contact-family removed'.
         }
         """
-        self.__connect_signal()
         self.tracker.update(CONTACT)
         self.__wait_for_signal()
 
-        self.__connect_signal()
         self.tracker.update ("""
             DELETE { <test://signals-contact-remove> a rdfs:Resource }
             """)
@@ -153,14 +125,10 @@ class TrackerStoreSignalsTests (CommonTrackerStoreTest):
         self.assertEqual(len(self.results_inserts), 0)
 
     def test_03_update_contact(self):
-        self.clean_up_list.append("test://signals-contact-update")
-
-        self.__connect_signal()
         self.tracker.update(
             "INSERT { <test://signals-contact-update> a nco:PersonContact }")
         self.__wait_for_signal()
 
-        self.__connect_signal()
         self.tracker.update(
             "INSERT { <test://signals-contact-update> nco:fullname 'wohoo'}")
         self.__wait_for_signal()
@@ -170,14 +138,10 @@ class TrackerStoreSignalsTests (CommonTrackerStoreTest):
         self.assertEqual(len(self.results_deletes), 0)
 
     def test_04_fullupdate_contact(self):
-        self.clean_up_list.append("test://signals-contact-fullupdate")
-
-        self.__connect_signal()
         self.tracker.update(
             "INSERT { <test://signals-contact-fullupdate> a nco:PersonContact; nco:fullname 'first value' }")
         self.__wait_for_signal()
 
-        self.__connect_signal()
         self.tracker.update ("""
                DELETE { <test://signals-contact-fullupdate> nco:fullname ?x }
                WHERE { <test://signals-contact-fullupdate> a nco:PersonContact; nco:fullname ?x }
@@ -186,8 +150,29 @@ class TrackerStoreSignalsTests (CommonTrackerStoreTest):
                """)
         self.__wait_for_signal()
 
-        self.assertEqual(len(self.results_deletes), 1)
-        self.assertEqual(len(self.results_inserts), 1)
+        self.assertEqual(len(self.results_deletes), 0)
+        self.assertEqual(len(self.results_inserts), 0)
+        self.assertEqual(len(self.results_updates), 2)
+
+
+class TrackerLocalNotifierTest (fixtures.TrackerSparqlDirectTest, TrackerNotifierTests):
+    """
+    Insert/update/remove instances from nco:PersonContact
+    and check that the signals are emitted.
+    """
+
+    def setUp(self):
+        self.base_setup()
+
+
+class TrackerBusNotifierTest (fixtures.TrackerSparqlBusTest, TrackerNotifierTests):
+    """
+    Insert/update/remove instances from nco:PersonContact
+    and check that the signals are emitted.
+    """
+
+    def setUp(self):
+        self.base_setup()
 
 
 if __name__ == "__main__":
diff --git a/tests/functional-tests/17-ontology-changes.py b/tests/functional-tests/ontology-changes.py
similarity index 95%
rename from tests/functional-tests/17-ontology-changes.py
rename to tests/functional-tests/ontology-changes.py
index defa1e431..867159e35 100644
--- a/tests/functional-tests/17-ontology-changes.py
+++ b/tests/functional-tests/ontology-changes.py
@@ -18,11 +18,14 @@
 #
 
 """
-Stand-alone tests cases for the store, booting it with different ontology
-changes and checking if the data is still there.
+Test how the database handles various kinds of schema updates.
 """
 
+import gi
+gi.require_version('Tracker', '3.0')
 from gi.repository import GLib
+from gi.repository import Gio
+from gi.repository import Tracker
 
 import os
 import pathlib
@@ -36,7 +39,6 @@ import trackertestutils.dconf
 import trackertestutils.helpers
 
 import configuration as cfg
-from expectedFailure import expectedFailureJournal
 
 
 RDFS_RANGE = "http://www.w3.org/2000/01/rdf-schema#range"
@@ -67,38 +69,33 @@ class OntologyChangeTestTemplate (ut.TestCase):
     def tearDown(self):
         shutil.rmtree(self.tmpdir, ignore_errors=True)
 
-    def get_ontology_dir(self, param):
-        return str(pathlib.Path(__file__).parent.joinpath('test-ontologies', param))
+    def ontology_path(self, param):
+        return pathlib.Path(__file__).parent.joinpath('test-ontologies', param)
 
     def template_test_ontology_change(self):
         self.set_ontology_dirs()
 
         self.__assert_ontology_dates(self.FIRST_ONTOLOGY_DIR, self.SECOND_ONTOLOGY_DIR)
 
-        extra_env = cfg.test_environment(self.tmpdir)
-        extra_env['LANG'] = 'en_GB.utf8'
-        extra_env['LC_COLLATE'] = 'en_GB.utf8'
-        extra_env['TRACKER_DB_ONTOLOGIES_DIR'] = self.get_ontology_dir(self.FIRST_ONTOLOGY_DIR)
-
-        sandbox1 = trackertestutils.helpers.TrackerDBusSandbox(
-            cfg.TEST_DBUS_DAEMON_CONFIG_FILE, extra_env=extra_env)
-        sandbox1.start()
-
-        self.tracker = trackertestutils.helpers.StoreHelper(sandbox1.get_connection())
-        self.tracker.start_and_wait_for_ready()
+        # Create a local store with the first set of ontologies.
+        conn1 = Tracker.SparqlConnection.new(
+            Tracker.SparqlConnectionFlags.NONE,
+            Gio.File.new_for_path(self.tmpdir),
+            Gio.File.new_for_path(str(self.ontology_path(self.FIRST_ONTOLOGY_DIR))),
+            None)
 
+        self.tracker = trackertestutils.helpers.StoreHelper(conn1)
         self.insert_data()
 
-        sandbox1.stop()
-
-        # Boot the second set of ontologies
-        extra_env['TRACKER_DB_ONTOLOGIES_DIR'] = self.get_ontology_dir(self.SECOND_ONTOLOGY_DIR)
-        sandbox2 = trackertestutils.helpers.TrackerDBusSandbox(
-            cfg.TEST_DBUS_DAEMON_CONFIG_FILE, extra_env=extra_env)
-        sandbox2.start()
+        conn1.close()
 
-        self.tracker = trackertestutils.helpers.StoreHelper(sandbox2.get_connection())
-        self.tracker.start_and_wait_for_ready()
+        # Reopen the local store with the second set of ontologies.
+        conn2 = Tracker.SparqlConnection.new(
+            Tracker.SparqlConnectionFlags.NONE,
+            Gio.File.new_for_path(self.tmpdir),
+            Gio.File.new_for_path(str(self.ontology_path(self.SECOND_ONTOLOGY_DIR))),
+            None)
+        self.tracker = trackertestutils.helpers.StoreHelper(conn2)
 
         self.validate_status()
 
@@ -168,9 +165,9 @@ class OntologyChangeTestTemplate (ut.TestCase):
                         break
 
         first_date = get_ontology_date(
-            os.path.join(self.get_ontology_dir(first), "91-test.ontology"))
+            self.ontology_path(first).joinpath("91-test.ontology"))
         second_date = get_ontology_date(
-            os.path.join(self.get_ontology_dir(second), "91-test.ontology"))
+            self.ontology_path(second).joinpath("91-test.ontology"))
         if first_date >= second_date:
             self.fail("nao:modifiedTime in '%s' is not more recent in the second ontology" % (
                 "91-test.ontology"))
@@ -182,7 +179,6 @@ class PropertyRangeStringToDate (OntologyChangeTestTemplate):
     """
 
     @ut.skip("Fails with: basic-future/91-test.ontology: Unsupported ontology change for 
http://example.org/ns#a_string: can't change rdfs:range (old=http://www.w3.org/2001/XMLSchema#dateTime, 
attempted new=http://www.w3.org/2001/XMLSchema#string)")
-    @expectedFailureJournal()
     def test_property_range_string_to_date(self):
         self.template_test_ontology_change()
 
@@ -212,7 +208,6 @@ class PropertyRangeDateToString (OntologyChangeTestTemplate):
     Change the range of a property from date to string. There shouldn't be any data loss.
     """
 
-    @expectedFailureJournal()
     @ut.skip("fails with: basic-future/91-test.ontology: Unsupported ontology change for 
http://example.org/ns#a_string: can't change rdfs:range (old=http://www.w3.org/2001/XMLSchema#dateTime, 
attempted new=http://www.w3.org/2001/XMLSchema#string)")
     def test_property_range_date_to_string(self):
         self.template_test_ontology_change()
@@ -760,7 +755,6 @@ class SuperclassRemovalTest (OntologyChangeTestTemplate):
     """
     Remove the superclass relation between two classes
     """
-    @expectedFailureJournal()
     @ut.skip("Fails with: Unsupported ontology change for http://example.org/ns#B: can't change 
rdfs:subClassOf (old=-, attempted new=-)")
     def test_superclass_removal(self):
         self.template_test_ontology_change()
@@ -805,7 +799,6 @@ class SuperclassAdditionTest (OntologyChangeTestTemplate):
     Add a superclass to a class with no superclass previously
     """
     @ut.skip("Fails with: basic-future/91-test.ontology: Unsupported ontology change for test:B: can't 
change rdfs:subClassOf (old=-, attempted new=test:A)")
-    @expectedFailureJournal()
     def test_superclass_addition(self):
         self.template_test_ontology_change()
 
@@ -849,7 +842,6 @@ class PropertyPromotionTest (OntologyChangeTestTemplate):
     Move a property to the superclass
     """
     @ut.skip("Fails with: basic-future/91-test.ontology: Unsupported ontology change for test:b_property: 
can't change rdfs:domain (old=test:A, attempted new=test:B)")
-    @expectedFailureJournal()
     def test_property_promotion(self):
         self.template_test_ontology_change()
 
@@ -886,7 +878,6 @@ class PropertyRelegationTest (OntologyChangeTestTemplate):
     Move a property to the subclass
     """
     @ut.skip("Fails")
-    @expectedFailureJournal()
     def test_property_relegation(self):
         self.template_test_ontology_change()
 
diff --git a/tests/functional-tests/query.py b/tests/functional-tests/query.py
new file mode 100644
index 000000000..e9ec3fabd
--- /dev/null
+++ b/tests/functional-tests/query.py
@@ -0,0 +1,73 @@
+# Copyright (C) 2020, Sam Thursfield <sam afuera me uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+"""
+Test queries using libtracker-sparql.
+"""
+
+import gi
+gi.require_version('Tracker', '3.0')
+from gi.repository import Tracker
+
+import unittest as ut
+
+import fixtures
+# We must import configuration to enable the default logging behaviour.
+import configuration
+
+import trackertestutils.helpers
+
+
+class TrackerQueryTests():
+    """
+    Query test cases for TrackerSparqlConnection.
+
+    To allow testing with both local and D-Bus connections, this test suite is
+    a mixin, which is combined with different fixtures below.
+    """
+
+    def test_row_types(self):
+        """Check the value types returned by TrackerSparqlCursor."""
+
+        CONTACT = """
+        INSERT {
+        <test://test1> a nfo:Document, nfo:FileDataObject ;
+             nie:url <file://test.test> ;
+             nfo:fileSize 1234 .
+        }
+        """
+        self.tracker.update(CONTACT)
+
+        cursor = self.conn.query('SELECT ?url ?filesize { ?url a nfo:FileDataObject ; nfo:fileSize ?filesize }')
+
+        cursor.next()
+        assert cursor.get_n_columns() == 2
+        self.assertEqual(cursor.get_value_type(0), Tracker.SparqlValueType.URI)
+        self.assertEqual(cursor.get_value_type(1), Tracker.SparqlValueType.INTEGER)
+
+
+class TrackerLocalQueryTest (fixtures.TrackerSparqlDirectTest, TrackerQueryTests):
+    pass
+
+
+class TrackerBusQueryTest (fixtures.TrackerSparqlBusTest, TrackerQueryTests):
+    pass
+
+
+if __name__ == "__main__":
+    ut.main(verbosity=2)
diff --git a/tests/functional-tests/02-sparql-bugs.py b/tests/functional-tests/sparql-bugs.py
similarity index 98%
rename from tests/functional-tests/02-sparql-bugs.py
rename to tests/functional-tests/sparql-bugs.py
index e312e9109..9d6f527d4 100644
--- a/tests/functional-tests/02-sparql-bugs.py
+++ b/tests/functional-tests/sparql-bugs.py
@@ -18,15 +18,16 @@
 #
 
 """
-Peculiar Sparql behavour reported in bugs
+Peculiar SPARQL behaviour reported in bugs
 """
+
 from gi.repository import GLib
 
 import unittest as ut
-from storetest import CommonTrackerStoreTest as CommonTrackerStoreTest
+import fixtures
 
 
-class TrackerStoreSparqlBugsTests (CommonTrackerStoreTest):
+class TrackerStoreSparqlBugsTests (fixtures.TrackerSparqlDirectTest):
 
     def test_01_NB217566_union_exists_filter(self):
         """
diff --git a/tests/libtracker-sparql/meson.build b/tests/libtracker-sparql/meson.build
index 071ed688c..153ce221c 100644
--- a/tests/libtracker-sparql/meson.build
+++ b/tests/libtracker-sparql/meson.build
@@ -1,6 +1,5 @@
 libtracker_sparql_test_c_args = tracker_c_args + [
   '-DTEST',
-  '-DTEST_DOMAIN_ONTOLOGY_RULE="@0@"'.format(tracker_uninstalled_domain_rule),
   '-DTEST_ONTOLOGIES_DIR="@0@"'.format(tracker_uninstalled_nepomuk_ontologies_dir),
 ]
 
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index 037eb032c..2fd0d852e 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -18,10 +18,14 @@
 # 02110-1301, USA.
 #
 
-from gi.repository import Gio
+import gi
+gi.require_version('Tracker', '3.0')
+from gi.repository import Tracker
 from gi.repository import GLib
+from gi.repository import GObject
 
 import atexit
+import dataclasses
 import logging
 import os
 import signal
@@ -34,7 +38,11 @@ from . import psutil_mini as psutil
 log = logging.getLogger(__name__)
 
 
-class GraphUpdateTimeoutException(RuntimeError):
+class AwaitException(RuntimeError):
+    pass
+
+
+class AwaitTimeoutException(AwaitException):
     pass
 
 
@@ -42,7 +50,7 @@ class NoMetadataException (Exception):
     pass
 
 
-REASONABLE_TIMEOUT = 30
+DEFAULT_TIMEOUT = 10
 
 
 _process_list = []
@@ -57,335 +65,319 @@ def _cleanup_processes():
 atexit.register(_cleanup_processes)
 
 
-class StoreHelper():
-    """
-    Helper for testing the tracker-store daemon.
-    """
+@dataclasses.dataclass
+class InsertedResource():
+    """Wraps the 'urn' value returned by await_insert context manager.
 
-    TRACKER_BUSNAME = 'org.freedesktop.Tracker1'
-    TRACKER_OBJ_PATH = '/org/freedesktop/Tracker1/Resources'
-    RESOURCES_IFACE = "org.freedesktop.Tracker1.Resources"
+    We can't return the value directly as we don't know it until the context
+    manager exits.
 
-    TRACKER_BACKUP_OBJ_PATH = "/org/freedesktop/Tracker1/Backup"
-    BACKUP_IFACE = "org.freedesktop.Tracker1.Backup"
+    """
+    urn: str
+    id: int
 
-    TRACKER_STATS_OBJ_PATH = "/org/freedesktop/Tracker1/Statistics"
-    STATS_IFACE = "org.freedesktop.Tracker1.Statistics"
 
-    TRACKER_STATUS_OBJ_PATH = "/org/freedesktop/Tracker1/Status"
-    STATUS_IFACE = "org.freedesktop.Tracker1.Status"
+class await_insert():
+    """Context manager to await data insertion by Tracker miners & extractors.
 
-    def __init__(self, dbus_connection):
-        self.log = logging.getLogger(__name__)
-        self.loop = mainloop.MainLoop()
+    Use like this:
 
-        self.bus = dbus_connection
-        self.graph_updated_handler_id = 0
+        expected = 'a nfo:Document; nie:url <test://url>'
+        with self.tracker.await_insert(expected) as resource:
+            # Create or update a file that's indexed by tracker-miner-fs.
+            #
+            # The context manager will not exit from the 'with' block until the
+            # data has been inserted in the store.
 
-        self.resources = Gio.DBusProxy.new_sync(
-            self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None,
-            self.TRACKER_BUSNAME, self.TRACKER_OBJ_PATH, self.RESOURCES_IFACE)
+        print(f"Inserted resource with urn: {resource.urn}")
 
-        self.backup_iface = Gio.DBusProxy.new_sync(
-            self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None,
-            self.TRACKER_BUSNAME, self.TRACKER_BACKUP_OBJ_PATH, self.BACKUP_IFACE)
+    The function expects an insertion to happen, and will raise an error if the
+    expected data is already present in the store. You can use
+    ensure_resource() if you just want to ensure that some data is present.
 
-        self.stats_iface = Gio.DBusProxy.new_sync(
-            self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None,
-            self.TRACKER_BUSNAME, self.TRACKER_STATS_OBJ_PATH, self.STATS_IFACE)
+    """
+    def __init__(self, conn, predicates, timeout=DEFAULT_TIMEOUT,
+                 _check_inserted=True):
+        self.conn = conn
+        self.predicates = predicates
+        self.timeout = timeout
+        self._check_inserted = _check_inserted
 
-        self.status_iface = Gio.DBusProxy.new_sync(
-            self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None,
-            self.TRACKER_BUSNAME, self.TRACKER_STATUS_OBJ_PATH, self.STATUS_IFACE)
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+
+        self.result = InsertedResource(None, 0)
+
+    def __enter__(self):
+        log.info("Awaiting insertion of resource with data %s", self.predicates)
+
+        if self._check_inserted:
+            query_check = ' '.join([
+                'SELECT ?urn tracker:id(?urn) '
+                ' WHERE { '
+                '   ?urn a rdfs:Resource ; ',
+                self.predicates,
+                '}'
+            ])
+            cursor = self.conn.query(query_check)
+            if cursor.next():
+                raise AwaitException("Expected data is already present in the store.")
+
+        query_filtered = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            self.predicates,
+            #'   FILTER (tracker:id(?urn) = ~id) '
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ])
+
+        # FIXME: doesn't work with bus backend: https://gitlab.gnome.org/GNOME/tracker/issues/179
+        #stmt = self.conn.query_statement(query, None)
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                if event.get_event_type() in [Tracker.NotifierEventType.CREATE,
+                                              Tracker.NotifierEventType.UPDATE]:
+                    log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+                    #stmt.bind_int('~id', event.get_id())
+                    #cursor = stmt.execute(None)
+                    stmt = query_filtered % event.get_id()
+                    log.debug("Running %s", stmt)
+                    cursor = self.conn.query(stmt)
+
+                    if cursor.next():
+                        self.result.urn = cursor.get_string(0)[0]
+                        self.result.id = cursor.get_integer(1)
+                        log.debug("Query matched! Got urn %s", self.result.urn)
+
+                        self.loop.quit()
+
+        def timeout_cb():
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException()
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+        return self.result
+
+    def __exit__(self, etype, evalue, etraceback):
+        if etype is not None:
+            return False
 
-    def start_and_wait_for_ready(self):
-        # The daemon is autostarted as soon as a method is called.
-        #
-        # We set a big timeout to avoid interfering when a daemon is being
-        # interactively debugged.
-        self.log.debug("Calling %s.Wait() method", self.STATUS_IFACE)
-        self.status_iface.call_sync('Wait', None, Gio.DBusCallFlags.NONE, 1000000, None)
-        self.log.debug("Ready")
-
-    def start_watching_updates(self):
-        assert self.graph_updated_handler_id == 0
-
-        self.reset_graph_updates_tracking()
-
-        def signal_handler(proxy, sender_name, signal_name, parameters):
-            if signal_name == 'GraphUpdated':
-                self._graph_updated_cb(*parameters.unpack())
-
-        self.graph_updated_handler_id = self.resources.connect(
-            'g-signal', signal_handler)
-        self.log.debug("Watching for updates from Resources interface")
-
-    def stop_watching_updates(self):
-        if self.graph_updated_handler_id != 0:
-            self.log.debug("No longer watching for updates from Resources interface")
-            self.resources.disconnect(self.graph_updated_handler_id)
-            self.graph_updated_handler_id = 0
-
-    # A system to follow GraphUpdated and make sure all changes are tracked.
-    # This code saves every change notification received, and exposes methods
-    # to await insertion or deletion of a certain resource which first check
-    # the list of events already received and wait for more if the event has
-    # not yet happened.
-
-    def reset_graph_updates_tracking(self):
-        self.class_to_track = None
-        self.inserts_list = []
-        self.deletes_list = []
-        self.inserts_match_function = None
-        self.deletes_match_function = None
-
-    def _graph_updated_timeout_cb(self):
-        raise GraphUpdateTimeoutException()
-
-    def _graph_updated_cb(self, class_name, deletes_list, inserts_list):
-        """
-        Process notifications from tracker-store on resource changes.
-        """
-        exit_loop = False
-
-        if class_name == self.class_to_track:
-            self.log.debug("GraphUpdated for %s: %i deletes, %i inserts", class_name, len(deletes_list), 
len(inserts_list))
-
-            if inserts_list is not None:
-                if self.inserts_match_function is not None:
-                    # The match function will remove matched entries from the list
-                    (exit_loop, inserts_list) = self.inserts_match_function(inserts_list)
-                self.inserts_list += inserts_list
-
-            if not exit_loop and deletes_list is not None:
-                if self.deletes_match_function is not None:
-                    (exit_loop, deletes_list) = self.deletes_match_function(deletes_list)
-                self.deletes_list += deletes_list
-
-            if exit_loop:
-                GLib.source_remove(self.graph_updated_timeout_id)
-                self.graph_updated_timeout_id = 0
-                self.loop.quit()
-        else:
-            self.log.debug("Ignoring GraphUpdated for class %s, currently tracking %s", class_name, 
self.class_to_track)
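+        # The 'with' block has now run and should have triggered an insert.
+        # Spin the main loop until match_cb fills in self.result, or until
+        # timeout_cb raises AwaitTimeoutException.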
+        while self.result.urn is None:
+            self.loop.run_checked()
+            log.debug("Got urn %s", self.result.urn)
 
-    def _enable_await_timeout(self):
-        self.graph_updated_timeout_id = GLib.timeout_add_seconds(REASONABLE_TIMEOUT,
-                                                                 self._graph_updated_timeout_cb)
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
 
-    def await_resource_inserted(self, rdf_class, url=None, title=None, required_property=None):
-        """
-        Block until a resource matching the parameters becomes available
-        """
-        assert (self.inserts_match_function == None)
-        assert (self.class_to_track == None), "Already waiting for resource of type %s" % self.class_to_track
-        assert (self.graph_updated_handler_id != 0), "You must call start_watching_updates() first."
-
-        self.class_to_track = rdf_class
-
-        self.matched_resource_urn = None
-        self.matched_resource_id = None
-
-        self.log.debug("Await new %s (%i existing inserts)", rdf_class, len(self.inserts_list))
-
-        if required_property is not None:
-            required_property_id = self.get_resource_id_by_uri(required_property)
-            self.log.debug("Required property %s id %i", required_property, required_property_id)
-
-        def find_resource_insertion(inserts_list):
-            matched_creation = (self.matched_resource_id is not None)
-            matched_required_property = False
-            remaining_events = []
-
-            # FIXME: this could be done in an easier way: build one query that filters
-            # based on every subject id in inserts_list, and returns the id of the one
-            # that matched :)
-            for insert in inserts_list:
-                id = insert[1]
-
-                if not matched_creation:
-                    where = "  ?urn a <%s> " % rdf_class
-
-                    if url is not None:
-                        where += "; nie:url \"%s\"" % url
-
-                    if title is not None:
-                        where += "; nie:title \"%s\"" % title
-
-                    query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1])
-                    result_set = self.query(query)
-
-                    if len(result_set) > 0:
-                        matched_creation = True
-                        self.matched_resource_urn = result_set[0][0]
-                        self.matched_resource_id = insert[1]
-                        self.log.debug("Matched creation of resource %s (%i)",
-                            self.matched_resource_urn,
-                             self.matched_resource_id)
-                        if required_property is not None:
-                            self.log.debug("Waiting for property %s (%i) to be set",
-                                required_property, required_property_id)
-
-                if required_property is not None and matched_creation and not matched_required_property:
-                    if id == self.matched_resource_id and insert[2] == required_property_id:
-                        matched_required_property = True
-                        self.log.debug("Matched %s %s", self.matched_resource_urn, required_property)
-
-                if not matched_creation or id != self.matched_resource_id:
-                    remaining_events += [insert]
-
-            matched = matched_creation if required_property is None else matched_required_property
-            return matched, remaining_events
-
-        def match_cb(inserts_list):
-            matched, remaining_events = find_resource_insertion(inserts_list)
-            exit_loop = matched
-            return exit_loop, remaining_events
-
-        # Check the list of previously received events for matches
-        (existing_match, self.inserts_list) = find_resource_insertion(self.inserts_list)
-
-        if not existing_match:
-            self._enable_await_timeout()
-            self.inserts_match_function = match_cb
-            # Run the event loop until the correct notification arrives
-            try:
-                self.loop.run_checked()
-            except GraphUpdateTimeoutException:
-                raise GraphUpdateTimeoutException("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title)) from None
-            self.inserts_match_function = None
-
-        self.class_to_track = None
-        return (self.matched_resource_id, self.matched_resource_urn)
-
-    def await_resource_deleted(self, rdf_class, id):
-        """
-        Block until we are notified of a resources deletion
-        """
-        assert (self.deletes_match_function == None)
-        assert (self.class_to_track == None)
-        assert (self.graph_updated_handler_id != 0), "You must call start_watching_updates() first."
+        return True
 
-        def find_resource_deletion(deletes_list):
-            self.log.debug("find_resource_deletion: looking for %i in %s", id, deletes_list)
 
-            matched = False
-            remaining_events = []
+class await_update():
+    """Context manager to await data updates by Tracker miners & extractors.
 
-            for delete in deletes_list:
-                if delete[1] == id:
-                    matched = True
-                else:
-                    remaining_events += [delete]
+    Use like this:
 
-            return matched, remaining_events
+        before = 'nie:url <test://url1>'
+        after = 'nie:url <test://url2>'
+        with self.tracker.await_update(resource_id, before, after):
+            # Trigger an update of the data.
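+
+    Here 'before' and 'after' are SPARQL predicate-object snippets giving the
+    resource's expected state before and after the change.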
 
-        def match_cb(deletes_list):
-            matched, remaining_events = find_resource_deletion(deletes_list)
-            exit_loop = matched
-            return exit_loop, remaining_events
+    """
+    def __init__(self, conn, resource_id, before_predicates, after_predicates,
+                 timeout=DEFAULT_TIMEOUT):
+        self.conn = conn
+        self.resource_id = resource_id
+        self.before_predicates = before_predicates
+        self.after_predicates = after_predicates
+        self.timeout = timeout
 
-        self.log.debug("Await deletion of %i (%i existing)", id, len(self.deletes_list))
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+        self.matched = False
+
+    def __enter__(self):
+        log.info("Awaiting update of resource id %s", self.resource_id)
+
+        query_before = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            self.before_predicates,
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ]) % self.resource_id
+        cursor = self.conn.query(query_before)
+        if not cursor.next():
+            raise AwaitException("Expected data is not present in the store.")
+
+        query_after = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            self.after_predicates,
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ]) % self.resource_id
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                if event.get_event_type() == Tracker.NotifierEventType.UPDATE and event.get_id() == self.resource_id:
+                    log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+                    log.debug("Running %s", query_after)
+                    cursor = self.conn.query(query_after)
+
+                    if cursor.next():
+                        log.debug("Query matched!")
+                        self.matched = True
+                        self.loop.quit()
+
+        def timeout_cb():
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException()
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+    def __exit__(self, etype, evalue, etraceback):
+        if etype is not None:
+            return False
 
-        (existing_match, self.deletes_list) = find_resource_deletion(self.deletes_list)
+        while not self.matched:
+            self.loop.run_checked()
 
-        if not existing_match:
-            self._enable_await_timeout()
-            self.class_to_track = rdf_class
-            self.deletes_match_function = match_cb
-            # Run the event loop until the correct notification arrives
-            try:
-                self.loop.run_checked()
-            except GraphUpdateTimeoutException as e:
-                raise GraphUpdateTimeoutException("Resource %i has not been deleted." % id) from e
-            self.deletes_match_function = None
-            self.class_to_track = None
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
 
-        return
+        return True
 
-    def await_property_changed(self, rdf_class, subject_id, property_uri):
-        """
-        Block until a property of a resource is updated or inserted.
-        """
-        assert (self.inserts_match_function == None)
-        assert (self.deletes_match_function == None)
-        assert (self.class_to_track == None)
-        assert (self.graph_updated_handler_id != 0), "You must call start_watching_updates() first."
 
-        self.log.debug("Await change to %i %s (%i, %i existing)", subject_id, property_uri, 
len(self.inserts_list), len(self.deletes_list))
+class await_delete():
+    """Context manager to await removal of a resource."""
 
-        self.class_to_track = rdf_class
+    def __init__(self, conn, resource_id, timeout=DEFAULT_TIMEOUT):
+        self.conn = conn
+        self.resource_id = resource_id
+        self.timeout = timeout
 
-        property_id = self.get_resource_id_by_uri(property_uri)
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+        self.matched = False
+
+    def __enter__(self):
+        log.info("Awaiting deletion of resource id %s", self.resource_id)
+
+        query_check = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ])
+        cursor = self.conn.query(query_check % self.resource_id)
+        if not cursor.next():
+            raise AwaitException(
+                "Resource with id %i isn't present in the store.", self.resource_id)
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                if event.get_event_type() == Tracker.NotifierEventType.DELETE:
+                    log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
+                    if event.get_id() == self.resource_id:
+                        log.debug("Matched expected id %s", self.resource_id)
+                        self.matched = True
+                        self.loop.quit()
+                else:
+                    log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
 
-        def find_property_change(event_list):
-            matched = False
-            remaining_events = []
+        def timeout_cb():
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException()
 
-            for event in event_list:
-                if event[1] == subject_id and event[2] == property_id:
-                    self.log.debug("Matched property change: %s", str(event))
-                    matched = True
-                else:
-                    remaining_events += [event]
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
 
-            return matched, remaining_events
+        return None
 
-        def match_cb(event_list):
-            matched, remaining_events = find_property_change(event_list)
-            exit_loop = matched
-            return exit_loop, remaining_events
+    def __exit__(self, etype, evalue, etraceback):
+        if etype is not None:
+            return False
 
-        # Check the list of previously received events for matches
-        (existing_match, self.inserts_list) = find_property_change(self.inserts_list)
-        if not existing_match:
-            (existing_match, self.deletes_list) = find_property_change(self.deletes_list)
+        while not self.matched:
+            self.loop.run_checked()
 
-        if not existing_match:
-            self._enable_await_timeout()
-            self.inserts_match_function = match_cb
-            self.deletes_match_function = match_cb
-            # Run the event loop until the correct notification arrives
-            try:
-                self.loop.run_checked()
-            except GraphUpdateTimeoutException:
-                raise GraphUpdateTimeoutException(
-                    "Timeout waiting for property change, subject %i property %s (%i)" % (subject_id, 
property_uri, property_id))
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
 
-        self.inserts_match_function = None
-        self.deletes_match_function = None
-        self.class_to_track = None
+        return True
 
-    # Note: The methods below call the tracker-store D-Bus API directly. This
-    # is useful for testing this API surface, but we recommand that all regular
-    # applications use libtracker-sparql library to talk to the database.
 
-    def query(self, query, **kwargs):
-        return self.resources.SparqlQuery('(s)', query, **kwargs)
+class StoreHelper():
+    """
+    Helper for testing database access with libtracker-sparql.
+    """
 
-    def update(self, update_sparql, **kwargs):
-        return self.resources.SparqlUpdate('(s)', update_sparql, **kwargs)
+    def __init__(self, conn):
+        self.log = logging.getLogger(__name__)
+        self.loop = mainloop.MainLoop()
 
-    def load(self, ttl_uri, **kwargs):
-        return self.resources.Load('(s)', ttl_uri, **kwargs)
+        self.conn = conn
 
-    def batch_update(self, update_sparql, **kwargs):
-        return self.resources.BatchSparqlUpdate('(s)', update_sparql, **kwargs)
+    def await_insert(self, predicates, timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource is inserted."""
+        return await_insert(self.conn, predicates, timeout)
 
-    def batch_commit(self, **kwargs):
-        return self.resources.BatchCommit(**kwargs)
+    def await_update(self, resource_id, before_predicates, after_predicates,
+                     timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource is updated."""
+        return await_update(self.conn, resource_id, before_predicates,
+                            after_predicates, timeout)
 
-    def backup(self, backup_file, **kwargs):
-        return self.backup_iface.Save('(s)', backup_file, **kwargs)
+    def await_delete(self, resource_id, timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource is deleted."""
+        return await_delete(self.conn, resource_id, timeout)
 
-    def restore(self, backup_file, **kwargs):
-        return self.backup_iface.Restore('(s)', backup_file, **kwargs)
+    def ensure_resource(self, predicates, timeout=DEFAULT_TIMEOUT):
+        """Ensure that a resource matching 'predicates' exists.
 
-    def get_stats(self, **kwargs):
-        return self.stats_iface.Get(**kwargs)
+        This function will block if the resource is not yet created.
 
-    def get_tracker_iface(self):
-        return self.resources
+        """
+        await_ctx_mgr = await_insert(self.conn, predicates, timeout, _check_inserted=False)
+        with await_ctx_mgr as resource:
+            # Check if the data was committed *before* the function was called.
+            query_initial = ' '.join([
+                'SELECT ?urn tracker:id(?urn) '
+                ' WHERE { '
+                '   ?urn a rdfs:Resource ; ',
+                predicates,
+                '}'
+            ])
+
+            log.debug("Running: %s", query_initial)
+            cursor = self.conn.query(query_initial)
+            if cursor.next():
+                resource.urn = cursor.get_string(0)[0]
+                resource.id = cursor.get_integer(1)
+                return resource
+        return resource
+
+    def query(self, query):
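+        # Flatten the cursor into a list of rows, fetching every column as a
+        # string.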
+        cursor = self.conn.query(query, None)
+        result = []
+        while cursor.next():
+            row = []
+            for i in range(0, cursor.get_n_columns()):
+                row.append(cursor.get_string(i)[0])
+            result.append(row)
+        return result
+
+    def update(self, update_sparql):
+        self.conn.update(update_sparql, 0, None)
 
     def count_instances(self, ontology_class):
         QUERY = """
@@ -393,7 +385,7 @@ class StoreHelper():
             ?u a %s .
         }
         """
-        result = self.resources.SparqlQuery('(s)', QUERY % (ontology_class))
+        result = self.query(QUERY % ontology_class)
 
         if (len(result) == 1):
             return int(result[0][0])
@@ -405,7 +397,7 @@ class StoreHelper():
         Get the internal ID for a given resource, identified by URI.
         """
         result = self.query(
-            'SELECT tracker:id(%s) WHERE { }' % uri)
+            'SELECT tracker:id(<%s>) WHERE { }' % uri)
         if len(result) == 1:
             return int(result[0][0])
         elif len(result) == 0:
diff --git a/utils/trackertestutils/mainloop.py b/utils/trackertestutils/mainloop.py
index 1e7a46c87..74484f456 100644
--- a/utils/trackertestutils/mainloop.py
+++ b/utils/trackertestutils/mainloop.py
@@ -37,6 +37,9 @@ class MainLoop():
     def quit(self):
         self._loop.quit()
 
+    def run(self):
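+        # PyGObject prints and swallows exceptions raised inside GLib
+        # callbacks, so a bare run() could silently ignore a test failure.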
+        raise NotImplementedError("Use .run_checked() to ensure correct exception handling.")
+
     def run_checked(self):
         '''Run the loop until quit(), then raise any unhandled exception.'''
         self._exception = None

