[tracker/sam/umockdev: 1/3] trackertestutils: Split helpers.py into dbussandbox and storehelper



commit 456361c5a38ed38e745634f51314db4003d1868a
Author: Sam Thursfield <sam afuera me uk>
Date:   Wed May 20 21:37:48 2020 +0200

    trackertestutils: Split helpers.py into dbussandbox and storehelper
    
    These two modules aren't related, it's cleaner to split them.

 utils/trackertestutils/dbussandbox.py | 131 +++++++
 utils/trackertestutils/helpers.py     | 627 +---------------------------------
 utils/trackertestutils/storehelper.py | 533 +++++++++++++++++++++++++++++
 3 files changed, 669 insertions(+), 622 deletions(-)
---
diff --git a/utils/trackertestutils/dbussandbox.py b/utils/trackertestutils/dbussandbox.py
new file mode 100644
index 000000000..01dcc35ed
--- /dev/null
+++ b/utils/trackertestutils/dbussandbox.py
@@ -0,0 +1,131 @@
+# Copyright (C) 2010, Nokia <jean-luc lamadon nokia com>
+# Copyright (C) 2019, Sam Thursfield <sam afuera me uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+"""
+Sandbox environment for running tests.
+
+The sandbox is essentially a private D-Bus daemon.
+"""
+
+import atexit
+import logging
+import os
+import signal
+
+from . import dbusdaemon
+from . import psutil_mini as psutil
+
+log = logging.getLogger(__name__)
+
+TRACKER_DBUS_PREFIX = 'org.freedesktop.Tracker3'
+TRACKER_MINER_FS_BUSNAME = 'org.freedesktop.Tracker3.Miner.Files'
+
+_process_list = []
+
+
+def _cleanup_processes():
+    for process in _process_list:
+        log.debug("helpers._cleanup_processes: stopping %s", process)
+        process.stop()
+
+
+atexit.register(_cleanup_processes)
+
+
+class TrackerDBusSandbox:
+    """
+    Private D-Bus session bus which executes a sandboxed Tracker instance.
+
+    """
+    def __init__(self, dbus_daemon_config_file, extra_env=None):
+        self.dbus_daemon_config_file = dbus_daemon_config_file
+        self.extra_env = extra_env or {}
+
+        self.daemon = dbusdaemon.DBusDaemon()
+
+    def start(self, new_session=False):
+        env = os.environ
+        env.update(self.extra_env)
+        env['G_MESSAGES_PREFIXED'] = 'all'
+
+        # This avoids an issue where gvfsd-fuse can start up while the bus is
+        # shutting down. If it fails to connect to the bus, it continues to
+        # run anyway which leads to our dbus-daemon failing to shut down.
+        #
+        # Since https://gitlab.gnome.org/GNOME/gvfs/issues/323 was implemented
+        # in GVFS 1.42 this problem may have gone away.
+        env['GVFS_DISABLE_FUSE'] = '1'
+
+        # Precreate runtime dir, to avoid this warning from dbus-daemon:
+        #
+        #    Unable to set up transient service directory: XDG_RUNTIME_DIR "/home/sam/tracker-tests/tmp_59i3ev1/run" not available: No such file or directory
+        #
+        xdg_runtime_dir = env.get('XDG_RUNTIME_DIR')
+        if xdg_runtime_dir:
+            os.makedirs(xdg_runtime_dir, exist_ok=True)
+
+        log.info("Starting D-Bus daemon for sandbox.")
+        log.debug("Added environment variables: %s", self.extra_env)
+        self.daemon.start(self.dbus_daemon_config_file, env=env, new_session=new_session)
+
+    def stop(self):
+        tracker_processes = []
+
+        log.info("Looking for active Tracker processes on the bus")
+        for busname in self.daemon.list_names_sync():
+            if busname.startswith(TRACKER_DBUS_PREFIX):
+                pid = self.daemon.get_connection_unix_process_id_sync(busname)
+                if pid is not None:
+                    tracker_processes.append(pid)
+
+        log.info("Terminating %i Tracker processes", len(tracker_processes))
+        for pid in tracker_processes:
+            os.kill(pid, signal.SIGTERM)
+
+        log.info("Waiting for %i Tracker processes", len(tracker_processes))
+        for pid in tracker_processes:
+            psutil.wait_pid(pid)
+
+        # We need to wait until Tracker processes have stopped before we
+        # terminate the D-Bus daemon, otherwise lots of criticals like this
+        # appear in the log output:
+        #
+        #  (tracker-miner-fs:14955): GLib-GIO-CRITICAL **: 11:38:40.386: Error  while sending AddMatch() message: The connection is closed
+
+        log.info("Stopping D-Bus daemon for sandbox.")
+        self.daemon.stop()
+
+    def stop_daemon(self, busname):
+        """Stops the daemon that owns 'busname'.
+
+        This can be used if you want to force the miner-fs to exit, for
+        example.
+
+        """
+        log.info("Stopping daemon process that owns %s.", busname)
+        pid = self.daemon.get_connection_unix_process_id_sync(busname)
+        if pid:
+            os.kill(pid, signal.SIGTERM)
+            psutil.wait_pid(pid)
+            log.info("Process %i has stopped.", pid)
+        else:
+            log.info("Couldn't find a process owning %s.", busname)
+
+    def get_connection(self):
+        return self.daemon.get_connection()
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index 0e69dd197..9e03a0dea 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -1,6 +1,4 @@
-#
-# Copyright (C) 2010, Nokia <jean-luc lamadon nokia com>
-# Copyright (C) 2019, Sam Thursfield <sam afuera me uk>
+# Copyright (C) 2020, Sam Thursfield <sam afuera me uk>
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -16,623 +14,8 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
-#
-
-import gi
-gi.require_version('Tracker', '3.0')
-from gi.repository import Tracker
-from gi.repository import GLib
-from gi.repository import GObject
-
-import atexit
-import dataclasses
-import logging
-import os
-import signal
-
-from . import dbusdaemon
-from . import mainloop
-from . import psutil_mini as psutil
-
-
-log = logging.getLogger(__name__)
-
-
-TRACKER_DBUS_PREFIX = 'org.freedesktop.Tracker3'
-TRACKER_MINER_FS_BUSNAME = 'org.freedesktop.Tracker3.Miner.Files'
-
-
-class AwaitException(RuntimeError):
-    pass
-
-
-class AwaitTimeoutException(AwaitException):
-    pass
-
-
-class NoMetadataException (Exception):
-    pass
-
-
-DEFAULT_TIMEOUT = 10
-
-
-_process_list = []
-
-
-def _cleanup_processes():
-    for process in _process_list:
-        log.debug("helpers._cleanup_processes: stopping %s", process)
-        process.stop()
-
-
-atexit.register(_cleanup_processes)
-
-
-@dataclasses.dataclass
-class InsertedResource():
-    """Wraps the 'urn' value returned by await_insert context manager.
-
-    We can't return the value directly as we don't know it until the context
-    manager exits.
-
-    """
-    urn: str
-    id: int
-
-
-class await_insert():
-    """Context manager to await data insertion by Tracker miners & extractors.
-
-    Use like this:
-
-        expected = 'a nfo:Document; nie:isStoredAs <test://url>'
-        with self.tracker.await_update(DOCUMENTS_GRAPH, expected) as resource:
-            # Create or update a file that's indexed by tracker-miner-fs.
-            #
-            # The context manager will not exit from the 'with' block until the
-            # data has been inserted in the store.
-
-        print(f"Inserted resource with urn: {resource.urn}")
-
-    The function expects an insertion to happen, and will raise an error if the
-    expected data is already present in the store. You can use
-    ensure_resource() if you just want to ensure that some data is present.
-
-    """
-    def __init__(self, conn, graph, predicates, timeout=DEFAULT_TIMEOUT,
-                 _check_inserted=True):
-        self.conn = conn
-        self.graph = graph
-        self.predicates = predicates
-        self.timeout = timeout
-        self._check_inserted = _check_inserted
-
-        self.loop = mainloop.MainLoop()
-        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
-
-        self.result = InsertedResource(None, 0)
-
-    def __enter__(self):
-        log.info("Awaiting insertion of resource with data %s", self.predicates)
-
-        if self._check_inserted:
-            query_check = ' '.join([
-                'SELECT ?urn tracker:id(?urn) ',
-                f' FROM NAMED <{self.graph}> ',
-                ' WHERE { ',
-                '   ?urn a rdfs:Resource ; ',
-                self.predicates,
-                '}'
-            ])
-            cursor = self.conn.query(query_check)
-            if cursor.next():
-                raise AwaitException("Expected data is already present in the store.")
-
-        query_filtered = ' '.join([
-            'SELECT ?urn tracker:id(?urn) ',
-            ' FROM NAMED <%s> ',
-            ' WHERE { ',
-            '   ?urn a rdfs:Resource ; ',
-            self.predicates,
-            #'   FILTER (tracker:id(?urn) = ~id) '
-            '   . FILTER (tracker:id(?urn) = %s) '
-            '}'
-        ])
-
-        # FIXME: doesn't work with bus backend: https://gitlab.gnome.org/GNOME/tracker/issues/179
-        #stmt = self.conn.query_statement(query, None)
-
-        def match_cb(notifier, service, graph, events):
-            for event in events:
-                if event.get_event_type() in [Tracker.NotifierEventType.CREATE,
-                                              Tracker.NotifierEventType.UPDATE]:
-                    log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
-                    #stmt.bind_int('~id', event.get_id())
-                    #cursor = stmt.execute(None)
-                    stmt = query_filtered % (self.graph, event.get_id())
-                    cursor = self.conn.query(stmt)
-
-                    if cursor.next():
-                        self.result.urn = cursor.get_string(0)[0]
-                        self.result.id = cursor.get_integer(1)
-                        log.debug("Query matched! Got urn %s", self.result.urn)
-
-                        self.loop.quit()
-
-        def timeout_cb():
-            log.info("Timeout fired after %s seconds", self.timeout)
-            raise AwaitTimeoutException(
-                f"Timeout awaiting insert of resource matching: {self.predicates}")
-
-        self.signal_id = self.notifier.connect('events', match_cb)
-        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
-        return self.result
-
-    def __exit__(self, etype, evalue, etraceback):
-        if etype is not None:
-            return False
-
-        while self.result.urn is None:
-            self.loop.run_checked()
-            log.debug("Got urn %s", self.result.urn)
-
-        GLib.source_remove(self.timeout_id)
-        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
-        return True
-
-
-class await_property_update():
-    """Context manager to await data updates by Tracker miners & extractors.
-
-    Use like this:
-
-        before = 'nie:isStoredAs <test://url1>'
-        after = 'nie:isStoredAs <test://url2>'
-        with self.tracker.await_property_update(resource_id, before, after):
-            # Trigger an update of the data.
-
-    """
-    def __init__(self, conn, graph, resource_id, before_predicates, after_predicates,
-                 timeout=DEFAULT_TIMEOUT):
-        self.conn = conn
-        self.graph = graph
-        self.resource_id = resource_id
-        self.before_predicates = before_predicates
-        self.after_predicates = after_predicates
-        self.timeout = timeout
-
-        self.loop = mainloop.MainLoop()
-        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
-        self.matched = False
-
-    def __enter__(self):
-        log.info("Awaiting update of resource id %s", self.resource_id)
-
-        query_before = ' '.join([
-            'SELECT ?urn tracker:id(?urn) ',
-            ' FROM NAMED <%s> ',
-            ' WHERE { ',
-            '   ?urn a rdfs:Resource ; ',
-            self.before_predicates,
-            '   . FILTER (tracker:id(?urn) = %s) '
-            '}'
-        ]) % (self.graph, self.resource_id)
-
-        cursor = self.conn.query(query_before)
-        if not cursor.next():
-            raise AwaitException("Expected data is not present in the store.")
-
-        query_after = ' '.join([
-            'SELECT ?urn tracker:id(?urn) '
-            ' FROM NAMED <%s> ',
-            ' WHERE { '
-           '   ?urn a rdfs:Resource ; ',
-            self.after_predicates,
-            '   . FILTER (tracker:id(?urn) = %s) '
-            '}'
-        ]) % (self.graph, self.resource_id)
-
-        def match_cb(notifier, service, graph, events):
-            for event in events:
-                if event.get_event_type() == Tracker.NotifierEventType.UPDATE and event.get_id() == self.resource_id:
-                    log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
-                    cursor = self.conn.query(query_after)
-
-                    if cursor.next():
-                        log.debug("Query matched!")
-                        self.matched = True
-                        self.loop.quit()
-
-        def timeout_cb():
-            log.info("Timeout fired after %s seconds", self.timeout)
-            raise AwaitTimeoutException(
-                f"Timeout awaiting update of resource {self.resource_id} "
-                f"matching: {self.after_predicates}")
-
-        self.signal_id = self.notifier.connect('events', match_cb)
-        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
-    def __exit__(self, etype, evalue, etraceback):
-        if etype is not None:
-            return False
-        while not self.matched:
-            self.loop.run_checked()
-
-        GLib.source_remove(self.timeout_id)
-        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
-        return True
-
-
-class await_content_update():
-    """Context manager to await updates to file contents.
-
-    When a file is updated, the old information it contained is deleted from
-    the store, and the new information is inserted as a new resource.
-
-    """
-    def __init__(self, conn, graph, before_resource_id, before_predicates, after_predicates,
-                 timeout=DEFAULT_TIMEOUT):
-        self.conn = conn
-        self.graph = graph
-        self.before_resource_id = before_resource_id
-        self.before_predicates = before_predicates
-        self.after_predicates = after_predicates
-        self.timeout = timeout
-
-        self.loop = mainloop.MainLoop()
-        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
-        self.matched = False
-
-        self.result = InsertedResource(None, 0)
-
-    def __enter__(self):
-        log.info("Awaiting delete of resource id %s and creation of a new one", self.before_resource_id)
-
-        query_before = ' '.join([
-            'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
-            ' FROM NAMED <%s> ',
-            ' WHERE { '
-            '   ?urn a rdfs:Resource ; ',
-            self.before_predicates,
-            '   . FILTER (tracker:id(?urn) = %s) '
-            '}'
-        ]) % (self.graph, self.before_resource_id)
-        cursor = self.conn.query(query_before)
-        if not cursor.next():
-            raise AwaitException("Expected data is not present in the store.")
-        file_url = cursor.get_string(0)[0]
-
-        query_after = ' '.join([
-            'SELECT ?urn tracker:id(?urn) '
-            ' FROM NAMED <%s> ',
-            ' WHERE { '
-            '   ?urn a rdfs:Resource ; ',
-            '      nie:isStoredAs <%s> ; ',
-            self.after_predicates,
-            '}'
-        ]) % (self.graph, file_url)
-
-        # When a file is updated, the DataObject representing the file gets
-        # an UPDATE notification. The InformationElement representing the
-        # content gets a DELETE and CREATE notification, because it is
-        # deleted and recreated. We detect the latter situation.
-
-        self.matched_delete = False
-        def match_cb(notifier, service, graph, events):
-            for event in events:
-                log.debug("Received %s event for id %s", event.get_event_type().value_nick, event.get_id())
-                if event.get_id() == self.before_resource_id and event.get_event_type() == Tracker.NotifierEventType.DELETE:
-                    log.debug("  Matched delete")
-                    self.matched_delete = True
-
-                # The after_predicates may match after the miner-fs creates
-                # the new resource, or they may only match once the extractor
-                # processes the resource. The latter will be an UPDATE event.
-                elif self.matched_delete and event.get_event_type() in [Tracker.NotifierEventType.CREATE, Tracker.NotifierEventType.UPDATE]:
-                    cursor = self.conn.query(query_after)
-
-                    if cursor.next():
-                        self.result.urn = cursor.get_string(0)[0]
-                        self.result.id = cursor.get_integer(1)
-                        log.debug("Query matched! Got new urn %s", self.result.urn)
-
-                        self.matched = True
-                        self.loop.quit()
-
-        def timeout_cb():
-            log.info("Timeout fired after %s seconds", self.timeout)
-            raise AwaitTimeoutException(
-                f"Timeout awaiting update of resource {self.before_resource_id} "
-                f"with URL {file_url} matching: {self.after_predicates}")
-
-        self.signal_id = self.notifier.connect('events', match_cb)
-        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
-        return self.result
-
-    def __exit__(self, etype, evalue, etraceback):
-        if etype is not None:
-            return False
-
-        while not self.matched:
-            self.loop.run_checked()
-
-        GLib.source_remove(self.timeout_id)
-        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
-        return True
-
-
-class await_delete():
-    """Context manager to await removal of a resource."""
-
-    def __init__(self, conn, graph, resource_id, timeout=DEFAULT_TIMEOUT):
-        self.conn = conn
-        self.graph = graph
-        self.resource_id = resource_id
-        self.timeout = timeout
-
-        self.loop = mainloop.MainLoop()
-        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
-        self.matched = False
-
-    def __enter__(self):
-        log.info("Awaiting deletion of resource id %s", self.resource_id)
-
-        query_check = ' '.join([
-            'SELECT ?urn tracker:id(?urn) ',
-            'FROM NAMED <%s> ',
-            ' WHERE { ',
-            '   ?urn a rdfs:Resource ; ',
-            '   . FILTER (tracker:id(?urn) = %s) '
-            '}'
-        ])
-        cursor = self.conn.query(query_check % (self.graph, self.resource_id))
-        if not cursor.next():
-            raise AwaitException(
-                "Resource with id %i isn't present in the store.", self.resource_id)
-
-        def match_cb(notifier, service, graph, events):
-            for event in events:
-                if event.get_event_type() == Tracker.NotifierEventType.DELETE:
-                    log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
-                    if event.get_id() == self.resource_id:
-                        log.debug("Matched expected id %s", self.resource_id)
-                        self.matched = True
-                        self.loop.quit()
-                else:
-                    log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
-
-        def timeout_cb():
-            log.info("Timeout fired after %s seconds", self.timeout)
-            raise AwaitTimeoutException(
-                f"Timeout awaiting removal of resource {self.resource_id} ")
-
-        self.signal_id = self.notifier.connect('events', match_cb)
-        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
-        return None
-
-    def __exit__(self, etype, evalue, etraceback):
-        if etype is not None:
-            return False
-
-        while not self.matched:
-            self.loop.run_checked()
-
-        GLib.source_remove(self.timeout_id)
-        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
-        return True
-
-
-class StoreHelper():
-    """
-    Helper for testing database access with libtracker-sparql.
-    """
-
-    def __init__(self, conn):
-        self.log = logging.getLogger(__name__)
-        self.loop = mainloop.MainLoop()
-
-        self.conn = conn
-
-    def await_insert(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
-        """Context manager that blocks until a resource is inserted."""
-        return await_insert(self.conn, graph, predicates, timeout)
-
-    def await_property_update(self, graph, resource_id, before_predicates, after_predicates,
-                     timeout=DEFAULT_TIMEOUT):
-        """Context manager that blocks until a resource property is updated."""
-        return await_property_update(self.conn, graph, resource_id, before_predicates,
-                                     after_predicates, timeout)
-
-    def await_content_update(self, graph, before_resource_id, before_predicates, after_predicates,
-                     timeout=DEFAULT_TIMEOUT):
-        """Context manager that blocks until a resource is deleted and recreated."""
-        return await_content_update(self.conn, graph, before_resource_id, before_predicates,
-                                    after_predicates, timeout)
-
-    def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
-        """Context manager that blocks until a resource is deleted."""
-        return await_delete(self.conn, graph, resource_id, timeout)
-
-    def ensure_resource(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
-        """Ensure that a resource matching 'predicates' exists in 'graph'.
-
-        This function will block if the resource is not yet created.
-
-        """
-        await_ctx_mgr = await_insert(self.conn, graph, predicates, timeout, _check_inserted=False)
-        with await_ctx_mgr as resource:
-            # Check if the data was committed *before* the function was called.
-            query_initial = ' '.join([
-                'SELECT ?urn tracker:id(?urn) '
-                f' FROM NAMED <{graph}>',
-                ' WHERE { '
-                '   ?urn a rdfs:Resource ; ',
-                predicates,
-                '}'
-            ])
-
-            cursor = self.conn.query(query_initial)
-            if cursor.next():
-                resource.urn = cursor.get_string(0)[0]
-                resource.id = cursor.get_integer(1)
-                return resource
-        return resource
-
-    def query(self, query):
-        cursor = self.conn.query(query, None)
-        result = []
-        while cursor.next():
-            row = []
-            for i in range(0, cursor.get_n_columns()):
-                row.append(cursor.get_string(i)[0])
-            result.append(row)
-        return result
-
-    def update(self, update_sparql):
-        self.conn.update(update_sparql, 0, None)
-
-    def count_instances(self, ontology_class):
-        QUERY = """
-        SELECT COUNT(?u) WHERE {
-            ?u a %s .
-        }
-        """
-        result = self.query(QUERY % ontology_class)
-
-        if (len(result) == 1):
-            return int(result[0][0])
-        else:
-            return -1
-
-    def get_resource_id_by_uri(self, uri):
-        """
-        Get the internal ID for a given resource, identified by URI.
-        """
-        result = self.query(
-            'SELECT tracker:id(<%s>) WHERE { }' % uri)
-        if len(result) == 1:
-            return int(result[0][0])
-        elif len(result) == 0:
-            raise Exception("No entry for resource %s" % uri)
-        else:
-            raise Exception("Multiple entries for resource %s" % uri)
-
-    def get_content_resource_id(self, url):
-        """
-        Gets the internal ID for an nie:InformationElement resource.
-
-        The InformationElement represents data stored in a file, not
-        the file itself. The 'url' parameter is the URL of the file
-        that stores the given content.
-
-        """
-        result = self.query(
-            'SELECT tracker:id(?r) WHERE { ?r a nie:InformationElement; nie:isStoredAs "%s" }' % url)
-        if len(result) == 1:
-            return int(result[0][0])
-        elif len(result) == 0:
-            raise Exception("No entry for resource %s" % url)
-        else:
-            raise Exception("Multiple entries for resource %s" % url)
-
-    def ask(self, ask_query):
-        assert ask_query.strip().startswith("ASK")
-        result = self.query(ask_query)
-        assert len(result) == 1
-        if result[0][0] == "true":
-            return True
-        elif result[0][0] == "false":
-            return False
-        else:
-            raise Exception("Something fishy is going on")
-
-
-class TrackerDBusSandbox:
-    """
-    Private D-Bus session bus which executes a sandboxed Tracker instance.
-
-    """
-    def __init__(self, dbus_daemon_config_file, extra_env=None):
-        self.dbus_daemon_config_file = dbus_daemon_config_file
-        self.extra_env = extra_env or {}
-
-        self.daemon = dbusdaemon.DBusDaemon()
-
-    def start(self, new_session=False):
-        env = os.environ
-        env.update(self.extra_env)
-        env['G_MESSAGES_PREFIXED'] = 'all'
-
-        # This avoids an issue where gvfsd-fuse can start up while the bus is
-        # shutting down. If it fails to connect to the bus, it continues to
-        # run anyway which leads to our dbus-daemon failing to shut down.
-        #
-        # Since https://gitlab.gnome.org/GNOME/gvfs/issues/323 was implemented
-        # in GVFS 1.42 this problem may have gone away.
-        env['GVFS_DISABLE_FUSE'] = '1'
-
-        # Precreate runtime dir, to avoid this warning from dbus-daemon:
-        #
-        #    Unable to set up transient service directory: XDG_RUNTIME_DIR "/home/sam/tracker-tests/tmp_59i3ev1/run" not available: No such file or directory
-        #
-        xdg_runtime_dir = env.get('XDG_RUNTIME_DIR')
-        if xdg_runtime_dir:
-            os.makedirs(xdg_runtime_dir, exist_ok=True)
-
-        log.info("Starting D-Bus daemon for sandbox.")
-        log.debug("Added environment variables: %s", self.extra_env)
-        self.daemon.start(self.dbus_daemon_config_file, env=env, new_session=new_session)
-
-    def stop(self):
-        tracker_processes = []
-
-        log.info("Looking for active Tracker processes on the bus")
-        for busname in self.daemon.list_names_sync():
-            if busname.startswith(TRACKER_DBUS_PREFIX):
-                pid = self.daemon.get_connection_unix_process_id_sync(busname)
-                if pid is not None:
-                    tracker_processes.append(pid)
-
-        log.info("Terminating %i Tracker processes", len(tracker_processes))
-        for pid in tracker_processes:
-            os.kill(pid, signal.SIGTERM)
-
-        log.info("Waiting for %i Tracker processes", len(tracker_processes))
-        for pid in tracker_processes:
-            psutil.wait_pid(pid)
-
-        # We need to wait until Tracker processes have stopped before we
-        # terminate the D-Bus daemon, otherwise lots of criticals like this
-        # appear in the log output:
-        #
-        #  (tracker-miner-fs:14955): GLib-GIO-CRITICAL **: 11:38:40.386: Error  while sending AddMatch() message: The connection is closed
-
-        log.info("Stopping D-Bus daemon for sandbox.")
-        self.daemon.stop()
-
-    def stop_daemon(self, busname):
-        """Stops the daemon that owns 'busname'.
-
-        This can be used if you want to force the miner-fs to exit, for
-        example.
-
-        """
-        log.info("Stopping daemon process that owns %s.", busname)
-        pid = self.daemon.get_connection_unix_process_id_sync(busname)
-        if pid:
-            os.kill(pid, signal.SIGTERM)
-            psutil.wait_pid(pid)
-            log.info("Process %i has stopped.", pid)
-        else:
-            log.info("Couldn't find a process owning %s.", busname)
 
-    def get_connection(self):
-        return self.daemon.get_connection()
+# FIXME: Compatibility module due to recent API breaks.
+# Remove this before 3.0.
+from .dbussandbox import TrackerDBusSandbox
+from .storehelper import StoreHelper
diff --git a/utils/trackertestutils/storehelper.py b/utils/trackertestutils/storehelper.py
new file mode 100644
index 000000000..19a212861
--- /dev/null
+++ b/utils/trackertestutils/storehelper.py
@@ -0,0 +1,533 @@
+#
+# Copyright (C) 2010, Nokia <jean-luc lamadon nokia com>
+# Copyright (C) 2019, Sam Thursfield <sam afuera me uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+"""Test helpers for libtracker-sparql data stores."""
+
+import gi
+gi.require_version('Tracker', '3.0')
+from gi.repository import GLib
+from gi.repository import GObject
+from gi.repository import Tracker
+
+import dataclasses
+import logging
+
+from . import mainloop
+
+log = logging.getLogger(__name__)
+
+DEFAULT_TIMEOUT = 10
+
+
+class AwaitException(RuntimeError):
+    """Base error raised by the await_* context managers in this module."""
+
+
+class AwaitTimeoutException(AwaitException):
+    """Raised by the await_* timeout callbacks when the wait time runs out."""
+
+
+class NoMetadataException(Exception):
+    """Exception type that this module defines but never raises itself."""
+    # NOTE(review): presumably raised by callers when expected metadata is
+    # missing -- confirm against the users of this module.
+
+@dataclasses.dataclass
+class InsertedResource:
+    """Mutable holder for the resource found by an await_* context manager.
+
+    The context managers hand this object out from __enter__ and only fill
+    it in once a matching notification arrives, because the real values are
+    not known until the 'with' block is exiting.
+
+    """
+    # URN of the matched resource; the context managers create the holder
+    # with None here and overwrite it once the match query succeeds.
+    urn: str
+    # Value of tracker:id() for the matched resource; created as 0.
+    id: int
+
+
+class await_insert():
+    """Context manager to await data insertion by Tracker miners & extractors.
+
+    Use like this:
+
+        expected = 'a nfo:Document; nie:isStoredAs <test://url>'
+        with self.tracker.await_insert(DOCUMENTS_GRAPH, expected) as resource:
+            # Create or update a file that's indexed by tracker-miner-fs.
+            #
+            # The context manager will not exit from the 'with' block until the
+            # data has been inserted in the store.
+
+        print(f"Inserted resource with urn: {resource.urn}")
+
+    The function expects an insertion to happen, and will raise an error if the
+    expected data is already present in the store. You can use
+    ensure_resource() if you just want to ensure that some data is present.
+
+    Parameters:
+
+      * conn: connection object; its create_notifier() and query() methods
+        are used.
+      * graph: IRI of the named graph that is queried.
+      * predicates: SPARQL predicate/object text appended after
+        '?urn a rdfs:Resource ;' in the match queries.
+      * timeout: seconds to wait before AwaitTimeoutException is raised.
+      * _check_inserted: when true, __enter__ raises AwaitException if
+        matching data is already in the store.
+
+    """
+    def __init__(self, conn, graph, predicates, timeout=DEFAULT_TIMEOUT,
+                 _check_inserted=True):
+        self.conn = conn
+        self.graph = graph
+        self.predicates = predicates
+        self.timeout = timeout
+        self._check_inserted = _check_inserted
+
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+
+        # Filled in by the notifier callback once a matching resource appears.
+        self.result = InsertedResource(None, 0)
+
+    def __enter__(self):
+        """Start listening for events and return the (still empty) result."""
+        log.info("Awaiting insertion of resource with data %s", self.predicates)
+
+        if self._check_inserted:
+            query_check = ' '.join([
+                'SELECT ?urn tracker:id(?urn) ',
+                f' FROM NAMED <{self.graph}> ',
+                ' WHERE { ',
+                '   ?urn a rdfs:Resource ; ',
+                self.predicates,
+                '}'
+            ])
+            cursor = self.conn.query(query_check)
+            if cursor.next():
+                raise AwaitException("Expected data is already present in the store.")
+
+        # Template with two '%s' slots: the graph and the id from the event.
+        query_filtered = ' '.join([
+            'SELECT ?urn tracker:id(?urn) ',
+            ' FROM NAMED <%s> ',
+            ' WHERE { ',
+            '   ?urn a rdfs:Resource ; ',
+            self.predicates,
+            #'   FILTER (tracker:id(?urn) = ~id) '
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ])
+
+        # FIXME: doesn't work with bus backend: https://gitlab.gnome.org/GNOME/tracker/issues/179
+        #stmt = self.conn.query_statement(query, None)
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                if event.get_event_type() in [Tracker.NotifierEventType.CREATE,
+                                              Tracker.NotifierEventType.UPDATE]:
+                    log.debug("Processing %s event for id %s",
+                              event.get_event_type().value_nick, event.get_id())
+                    #stmt.bind_int('~id', event.get_id())
+                    #cursor = stmt.execute(None)
+                    stmt = query_filtered % (self.graph, event.get_id())
+                    cursor = self.conn.query(stmt)
+
+                    if cursor.next():
+                        self.result.urn = cursor.get_string(0)[0]
+                        self.result.id = cursor.get_integer(1)
+                        log.debug("Query matched! Got urn %s", self.result.urn)
+
+                        self.loop.quit()
+
+        def timeout_cb():
+            # NOTE(review): presumably the exception is re-raised to the
+            # caller by MainLoop.run_checked() -- confirm in mainloop.py.
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException(
+                f"Timeout awaiting insert of resource matching: {self.predicates}")
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+        return self.result
+
+    def _remove_handlers(self):
+        """Remove the timeout source and the notifier signal handler."""
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+    def __exit__(self, etype, evalue, etraceback):
+        """Block until the expected resource has been seen, then clean up."""
+        if etype is not None:
+            # The 'with' body failed, so there is nothing to wait for. Still
+            # remove our handlers: otherwise the timeout stays registered and
+            # can fire later, while an unrelated main loop is running.
+            self._remove_handlers()
+            return False
+
+        while self.result.urn is None:
+            self.loop.run_checked()
+            log.debug("Got urn %s", self.result.urn)
+
+        self._remove_handlers()
+
+        return True
+
+
+class await_property_update():
+    """Context manager to await data updates by Tracker miners & extractors.
+
+    Use like this:
+
+        before = 'nie:isStoredAs <test://url1>'
+        after = 'nie:isStoredAs <test://url2>'
+        with self.tracker.await_property_update(DOCUMENTS_GRAPH, resource_id, before, after):
+            # Trigger an update of the data.
+
+    Parameters:
+
+      * conn: connection object; its create_notifier() and query() methods
+        are used.
+      * graph: IRI of the named graph that is queried.
+      * resource_id: tracker:id() of the resource to watch; events for any
+        other id are ignored.
+      * before_predicates: SPARQL predicate/object text that must match the
+        resource on entry, otherwise AwaitException is raised.
+      * after_predicates: SPARQL predicate/object text that must match the
+        resource before the 'with' block is allowed to finish.
+      * timeout: seconds to wait before AwaitTimeoutException is raised.
+
+    """
+    def __init__(self, conn, graph, resource_id, before_predicates, after_predicates,
+                 timeout=DEFAULT_TIMEOUT):
+        self.conn = conn
+        self.graph = graph
+        self.resource_id = resource_id
+        self.before_predicates = before_predicates
+        self.after_predicates = after_predicates
+        self.timeout = timeout
+
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+        # Set by the notifier callback once the 'after' query matches.
+        self.matched = False
+
+    def __enter__(self):
+        """Check the 'before' state and start listening for UPDATE events."""
+        log.info("Awaiting update of resource id %s", self.resource_id)
+
+        query_before = ' '.join([
+            'SELECT ?urn tracker:id(?urn) ',
+            ' FROM NAMED <%s> ',
+            ' WHERE { ',
+            '   ?urn a rdfs:Resource ; ',
+            self.before_predicates,
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ]) % (self.graph, self.resource_id)
+
+        cursor = self.conn.query(query_before)
+        if not cursor.next():
+            raise AwaitException("Expected data is not present in the store.")
+
+        query_after = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' FROM NAMED <%s> ',
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            self.after_predicates,
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ]) % (self.graph, self.resource_id)
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                if (event.get_event_type() == Tracker.NotifierEventType.UPDATE
+                        and event.get_id() == self.resource_id):
+                    log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+                    cursor = self.conn.query(query_after)
+
+                    if cursor.next():
+                        log.debug("Query matched!")
+                        self.matched = True
+                        self.loop.quit()
+
+        def timeout_cb():
+            # NOTE(review): presumably the exception is re-raised to the
+            # caller by MainLoop.run_checked() -- confirm in mainloop.py.
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException(
+                f"Timeout awaiting update of resource {self.resource_id} "
+                f"matching: {self.after_predicates}")
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+    def _remove_handlers(self):
+        """Remove the timeout source and the notifier signal handler."""
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+    def __exit__(self, etype, evalue, etraceback):
+        """Block until the updated data has been seen, then clean up."""
+        if etype is not None:
+            # The 'with' body failed, so there is nothing to wait for. Still
+            # remove our handlers: otherwise the timeout stays registered and
+            # can fire later, while an unrelated main loop is running.
+            self._remove_handlers()
+            return False
+
+        while not self.matched:
+            self.loop.run_checked()
+
+        self._remove_handlers()
+
+        return True
+
+
+class await_content_update():
+    """Context manager to await updates to file contents.
+
+    When a file is updated, the old information it contained is deleted from
+    the store, and the new information is inserted as a new resource.
+
+    Parameters:
+
+      * conn: connection object; its create_notifier() and query() methods
+        are used.
+      * graph: IRI of the named graph that is queried.
+      * before_resource_id: tracker:id() of the resource expected to be
+        deleted.
+      * before_predicates: SPARQL predicate/object text that must match that
+        resource on entry, otherwise AwaitException is raised.
+      * after_predicates: SPARQL predicate/object text that the new resource
+        (stored at the same nie:isStoredAs URL) must match.
+      * timeout: seconds to wait before AwaitTimeoutException is raised.
+
+    Entering the context returns an InsertedResource that is filled in with
+    the urn and id of the new resource once it has been seen.
+
+    """
+    def __init__(self, conn, graph, before_resource_id, before_predicates, after_predicates,
+                 timeout=DEFAULT_TIMEOUT):
+        self.conn = conn
+        self.graph = graph
+        self.before_resource_id = before_resource_id
+        self.before_predicates = before_predicates
+        self.after_predicates = after_predicates
+        self.timeout = timeout
+
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+        # Set by the notifier callback once the 'after' query matches.
+        self.matched = False
+
+        self.result = InsertedResource(None, 0)
+
+    def __enter__(self):
+        """Check the 'before' state and start listening for events."""
+        log.info("Awaiting delete of resource id %s and creation of a new one", self.before_resource_id)
+
+        query_before = ' '.join([
+            'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+            ' FROM NAMED <%s> ',
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            self.before_predicates,
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ]) % (self.graph, self.before_resource_id)
+        cursor = self.conn.query(query_before)
+        if not cursor.next():
+            raise AwaitException("Expected data is not present in the store.")
+        # The replacement resource is looked up by the URL it is stored at.
+        file_url = cursor.get_string(0)[0]
+
+        query_after = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' FROM NAMED <%s> ',
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            '      nie:isStoredAs <%s> ; ',
+            self.after_predicates,
+            '}'
+        ]) % (self.graph, file_url)
+
+        # When a file is updated, the DataObject representing the file gets
+        # an UPDATE notification. The InformationElement representing the
+        # content gets a DELETE and CREATE notification, because it is
+        # deleted and recreated. We detect the latter situation.
+
+        self.matched_delete = False
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                log.debug("Received %s event for id %s", event.get_event_type().value_nick, event.get_id())
+                if (event.get_id() == self.before_resource_id
+                        and event.get_event_type() == Tracker.NotifierEventType.DELETE):
+                    log.debug("  Matched delete")
+                    self.matched_delete = True
+
+                # The after_predicates may match after the miner-fs creates
+                # the new resource, or they may only match once the extractor
+                # processes the resource. The latter will be an UPDATE event.
+                elif (self.matched_delete
+                        and event.get_event_type() in [Tracker.NotifierEventType.CREATE,
+                                                       Tracker.NotifierEventType.UPDATE]):
+                    cursor = self.conn.query(query_after)
+
+                    if cursor.next():
+                        self.result.urn = cursor.get_string(0)[0]
+                        self.result.id = cursor.get_integer(1)
+                        log.debug("Query matched! Got new urn %s", self.result.urn)
+
+                        self.matched = True
+                        self.loop.quit()
+
+        def timeout_cb():
+            # NOTE(review): presumably the exception is re-raised to the
+            # caller by MainLoop.run_checked() -- confirm in mainloop.py.
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException(
+                f"Timeout awaiting update of resource {self.before_resource_id} "
+                f"with URL {file_url} matching: {self.after_predicates}")
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+        return self.result
+
+    def _remove_handlers(self):
+        """Remove the timeout source and the notifier signal handler."""
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+    def __exit__(self, etype, evalue, etraceback):
+        """Block until the recreated resource has been seen, then clean up."""
+        if etype is not None:
+            # The 'with' body failed, so there is nothing to wait for. Still
+            # remove our handlers: otherwise the timeout stays registered and
+            # can fire later, while an unrelated main loop is running.
+            self._remove_handlers()
+            return False
+
+        while not self.matched:
+            self.loop.run_checked()
+
+        self._remove_handlers()
+
+        return True
+
+
+class await_delete():
+    """Context manager to await removal of a resource.
+
+    Parameters:
+
+      * conn: connection object; its create_notifier() and query() methods
+        are used.
+      * graph: IRI of the named graph that is queried.
+      * resource_id: tracker:id() of the resource expected to be deleted.
+        AwaitException is raised on entry if no such resource exists.
+      * timeout: seconds to wait before AwaitTimeoutException is raised.
+
+    """
+
+    def __init__(self, conn, graph, resource_id, timeout=DEFAULT_TIMEOUT):
+        self.conn = conn
+        self.graph = graph
+        self.resource_id = resource_id
+        self.timeout = timeout
+
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+        # Set by the notifier callback once the DELETE event is seen.
+        self.matched = False
+
+    def __enter__(self):
+        """Check that the resource exists and start listening for events."""
+        log.info("Awaiting deletion of resource id %s", self.resource_id)
+
+        query_check = ' '.join([
+            'SELECT ?urn tracker:id(?urn) ',
+            'FROM NAMED <%s> ',
+            ' WHERE { ',
+            '   ?urn a rdfs:Resource ; ',
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ])
+        cursor = self.conn.query(query_check % (self.graph, self.resource_id))
+        if not cursor.next():
+            # Build the message here: exceptions, unlike logging calls, do
+            # not interpolate extra positional arguments into the text.
+            raise AwaitException(
+                f"Resource with id {self.resource_id} isn't present in the store.")
+
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
+                if (event.get_event_type() == Tracker.NotifierEventType.DELETE
+                        and event.get_id() == self.resource_id):
+                    log.debug("Matched expected id %s", self.resource_id)
+                    self.matched = True
+                    self.loop.quit()
+
+        def timeout_cb():
+            # NOTE(review): presumably the exception is re-raised to the
+            # caller by MainLoop.run_checked() -- confirm in mainloop.py.
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException(
+                f"Timeout awaiting removal of resource {self.resource_id} ")
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+        return None
+
+    def _remove_handlers(self):
+        """Remove the timeout source and the notifier signal handler."""
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+    def __exit__(self, etype, evalue, etraceback):
+        """Block until the DELETE event has been seen, then clean up."""
+        if etype is not None:
+            # The 'with' body failed, so there is nothing to wait for. Still
+            # remove our handlers: otherwise the timeout stays registered and
+            # can fire later, while an unrelated main loop is running.
+            self._remove_handlers()
+            return False
+
+        while not self.matched:
+            self.loop.run_checked()
+
+        self._remove_handlers()
+
+        return True
+
+
+class StoreHelper:
+    """Convenience wrapper around a connection for libtracker-sparql tests.
+
+    It offers the await_* context managers bound to one connection, plus
+    small helpers to run queries and updates and to look up resource ids.
+    """
+
+    def __init__(self, conn):
+        """Remember 'conn' and create the logger and main loop attributes."""
+        self.conn = conn
+
+        self.log = logging.getLogger(__name__)
+        self.loop = mainloop.MainLoop()
+
+    def await_insert(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource is inserted."""
+        return await_insert(self.conn, graph, predicates, timeout=timeout)
+
+    def await_property_update(self, graph, resource_id, before_predicates, after_predicates,
+                              timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource property is updated."""
+        return await_property_update(
+            self.conn, graph, resource_id, before_predicates, after_predicates,
+            timeout=timeout)
+
+    def await_content_update(self, graph, before_resource_id, before_predicates, after_predicates,
+                             timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource is deleted and recreated."""
+        return await_content_update(
+            self.conn, graph, before_resource_id, before_predicates, after_predicates,
+            timeout=timeout)
+
+    def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource is deleted."""
+        return await_delete(self.conn, graph, resource_id, timeout=timeout)
+
+    def ensure_resource(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
+        """Ensure that a resource matching 'predicates' exists in 'graph'.
+
+        Returns the InsertedResource describing it. If the resource is not in
+        the store yet, this blocks until it has been inserted.
+
+        """
+        waiter = await_insert(self.conn, graph, predicates, timeout, _check_inserted=False)
+        with waiter as resource:
+            # The listener is already installed at this point, so look for
+            # data that was committed *before* this function was called.
+            query_initial = ' '.join([
+                'SELECT ?urn tracker:id(?urn) '
+                f' FROM NAMED <{graph}>',
+                ' WHERE { '
+                '   ?urn a rdfs:Resource ; ',
+                predicates,
+                '}'
+            ])
+
+            cursor = self.conn.query(query_initial)
+            if cursor.next():
+                # Filling in the result lets the context manager exit
+                # without waiting for a notification.
+                resource.urn = cursor.get_string(0)[0]
+                resource.id = cursor.get_integer(1)
+        return resource
+
+    def query(self, query):
+        """Run a SPARQL query; return the rows as lists of column strings."""
+        cursor = self.conn.query(query, None)
+        rows = []
+        while cursor.next():
+            rows.append([cursor.get_string(column)[0]
+                         for column in range(cursor.get_n_columns())])
+        return rows
+
+    def update(self, update_sparql):
+        """Run a SPARQL update on the connection."""
+        self.conn.update(update_sparql, 0, None)
+
+    def count_instances(self, ontology_class):
+        """Return the number of instances of 'ontology_class'.
+
+        Returns -1 if the query does not give back exactly one row.
+        """
+        count_template = """
+        SELECT COUNT(?u) WHERE {
+            ?u a %s .
+        }
+        """
+        rows = self.query(count_template % ontology_class)
+
+        return int(rows[0][0]) if len(rows) == 1 else -1
+
+    def get_resource_id_by_uri(self, uri):
+        """
+        Get the internal ID for a given resource, identified by URI.
+
+        Raises Exception unless the query gives back exactly one row.
+        """
+        rows = self.query(
+            'SELECT tracker:id(<%s>) WHERE { }' % uri)
+        if not rows:
+            raise Exception("No entry for resource %s" % uri)
+        if len(rows) > 1:
+            raise Exception("Multiple entries for resource %s" % uri)
+        return int(rows[0][0])
+
+    def get_content_resource_id(self, url):
+        """
+        Gets the internal ID for an nie:InformationElement resource.
+
+        The InformationElement represents data stored in a file, not
+        the file itself. The 'url' parameter is the URL of the file
+        that stores the given content.
+
+        Raises Exception unless the query gives back exactly one row.
+
+        """
+        rows = self.query(
+            'SELECT tracker:id(?r) WHERE { ?r a nie:InformationElement; nie:isStoredAs "%s" }' % url)
+        if not rows:
+            raise Exception("No entry for resource %s" % url)
+        if len(rows) > 1:
+            raise Exception("Multiple entries for resource %s" % url)
+        return int(rows[0][0])
+
+    def ask(self, ask_query):
+        """Run an ASK query and return its answer as a bool."""
+        assert ask_query.strip().startswith("ASK")
+        rows = self.query(ask_query)
+        assert len(rows) == 1
+        answers = {"true": True, "false": False}
+        answer = rows[0][0]
+        if answer not in answers:
+            raise Exception("Something fishy is going on")
+        return answers[answer]


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]