[tracker/sam/functional-tests-shared: 8/8] functional-tests: Merge changes to 'helpers' module from tracker-miners.git



commit 68f73d4302cf48c37d11bbd466befff2f431c228
Author: Sam Thursfield <sam afuera me uk>
Date:   Wed Aug 21 22:30:55 2019 +0100

    functional-tests: Merge changes to 'helpers' module from tracker-miners.git
    
    Various features were implemented in tracker-miners.git after it was
    split from tracker.git, but they never made their way back into
    tracker.git.

 utils/trackertestutils/helpers.py  | 297 ++++++++++++++++++++++++++++++++++---
 utils/trackertestutils/mainloop.py |  58 ++++++++
 2 files changed, 332 insertions(+), 23 deletions(-)
---
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index 71685674a..2b218e5d0 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -21,20 +21,39 @@
 from gi.repository import Gio
 from gi.repository import GLib
 
+import atexit
 import logging
 import os
-import re
-import sys
 import subprocess
-import time
+
+from . import mainloop
+
+log = logging.getLogger(__name__)
+
+
+class GraphUpdateTimeoutException(RuntimeError):
+    pass
 
 
 class NoMetadataException (Exception):
     pass
 
+
 REASONABLE_TIMEOUT = 30
 
 
+_process_list = []
+
+
+def _cleanup_processes():
+    for process in _process_list:
+        log.debug("helpers._cleanup_processes: stopping %s", process)
+        process.stop()
+
+
+atexit.register(_cleanup_processes)
+
+
 class Helper:
     """
     Abstract helper for Tracker processes. Launches the process
@@ -62,24 +81,14 @@ class Helper:
         self.process = None
         self.available = False
 
-        self.loop = GLib.MainLoop()
-        self.install_glib_excepthook(self.loop)
+        self.loop = mainloop.MainLoop()
 
         self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
 
-    def install_glib_excepthook(self, loop):
-        """
-        Handler to abort test if an exception occurs inside the GLib main loop.
-        """
-        old_hook = sys.excepthook
-
-        def new_hook(etype, evalue, etb):
-            old_hook(etype, evalue, etb)
-            GLib.MainLoop.quit(loop)
-            sys.exit(1)
-        sys.excepthook = new_hook
-
     def _start_process(self, command_args=None, extra_env=None):
+        global _process_list
+        _process_list.append(self)
+
         command = [self.process_path] + (command_args or [])
         self.log.debug("Starting %s.", ' '.join(command))
 
@@ -91,7 +100,7 @@ class Helper:
         try:
             return subprocess.Popen(command, env=env)
         except OSError as e:
-            raise RuntimeError("Error starting %s: %s" % (path, e))
+            raise RuntimeError("Error starting %s: %s" % (self.process_path, e))
 
     def _bus_name_appeared(self, connection, name, owner):
         self.log.debug("%s appeared on the message bus, owned by %s", name, owner)
@@ -138,7 +147,7 @@ class Helper:
 
         # We expect the _bus_name_vanished callback to be called here,
         # causing the loop to exit again.
-        self.loop.run()
+        self.loop.run_checked()
 
         if self.available:
             # It's running, but we didn't start it...
@@ -155,16 +164,17 @@ class Helper:
         self.abort_if_process_exits_with_status_0 = True
 
         # Run the loop until the bus name appears, or the process dies.
-        self.loop.run()
+        self.loop.run_checked()
 
         self.abort_if_process_exits_with_status_0 = False
 
     def stop(self):
+        global _process_list
+
         if self.process is None:
             # Seems that it didn't even start...
             return
 
-        start = time.time()
         if self.process.poll() == None:
             GLib.source_remove(self.process_startup_timeout)
             self.process_startup_timeout = 0
@@ -181,12 +191,15 @@ class Helper:
         self.log.debug("Process stopped.")
 
         # Run the loop to handle the expected name_vanished signal.
-        self.loop.run()
+        self.loop.run_checked()
         Gio.bus_unwatch_name(self._bus_name_watch_id)
 
         self.process = None
+        _process_list.remove(self)
 
     def kill(self):
+        global _process_list
+
         if self.process_watch_timeout != 0:
             GLib.source_remove(self.process_watch_timeout)
             self.process_watch_timeout = 0
@@ -194,10 +207,11 @@ class Helper:
         self.process.kill()
 
         # Name owner changed callback should take us out from this loop
-        self.loop.run()
+        self.loop.run_checked()
         Gio.bus_unwatch_name(self._bus_name_watch_id)
 
         self.process = None
+        _process_list.remove(self)
 
         self.log.debug("Process killed.")
 
@@ -246,9 +260,246 @@ class StoreHelper (Helper):
         self.status_iface.Wait()
         self.log.debug("Ready")
 
+        self.reset_graph_updates_tracking()
+
+        def signal_handler(proxy, sender_name, signal_name, parameters):
+            if signal_name == 'GraphUpdated':
+                self._graph_updated_cb(*parameters.unpack())
+
+        self.graph_updated_handler_id = self.resources.connect(
+            'g-signal', signal_handler)
+
     def stop(self):
         Helper.stop(self)
 
+        if self.graph_updated_handler_id != 0:
+            self.resources.disconnect(self.graph_updated_handler_id)
+
+    # A system to follow GraphUpdated and make sure all changes are tracked.
+    # This code saves every change notification received, and exposes methods
+    # to await insertion or deletion of a certain resource which first check
+    # the list of events already received and wait for more if the event has
+    # not yet happened.
+
+    def reset_graph_updates_tracking(self):
+        self.class_to_track = None
+        self.inserts_list = []
+        self.deletes_list = []
+        self.inserts_match_function = None
+        self.deletes_match_function = None
+
+    def _graph_updated_timeout_cb(self):
+        raise GraphUpdateTimeoutException()
+
+    def _graph_updated_cb(self, class_name, deletes_list, inserts_list):
+        """
+        Process notifications from tracker-store on resource changes.
+        """
+        exit_loop = False
+
+        if class_name == self.class_to_track:
+            self.log.debug("GraphUpdated for %s: %i deletes, %i inserts", class_name, len(deletes_list), len(inserts_list))
+
+            if inserts_list is not None:
+                if self.inserts_match_function is not None:
+                    # The match function will remove matched entries from the list
+                    (exit_loop, inserts_list) = self.inserts_match_function(inserts_list)
+                self.inserts_list += inserts_list
+
+            if not exit_loop and deletes_list is not None:
+                if self.deletes_match_function is not None:
+                    (exit_loop, deletes_list) = self.deletes_match_function(deletes_list)
+                self.deletes_list += deletes_list
+
+            if exit_loop:
+                GLib.source_remove(self.graph_updated_timeout_id)
+                self.graph_updated_timeout_id = 0
+                self.loop.quit()
+        else:
+            self.log.debug("Ignoring GraphUpdated for class %s, currently tracking %s", class_name, self.class_to_track)
+
+    def _enable_await_timeout(self):
+        self.graph_updated_timeout_id = GLib.timeout_add_seconds(REASONABLE_TIMEOUT,
+                                                                 self._graph_updated_timeout_cb)
+
+    def await_resource_inserted(self, rdf_class, url=None, title=None, required_property=None):
+        """
+        Block until a resource matching the parameters becomes available
+        """
+        assert (self.inserts_match_function == None)
+        assert (self.class_to_track == None), "Already waiting for resource of type %s" % self.class_to_track
+
+        self.class_to_track = rdf_class
+
+        self.matched_resource_urn = None
+        self.matched_resource_id = None
+
+        self.log.debug("Await new %s (%i existing inserts)", rdf_class, len(self.inserts_list))
+
+        if required_property is not None:
+            required_property_id = self.get_resource_id_by_uri(required_property)
+            self.log.debug("Required property %s id %i", required_property, required_property_id)
+
+        def find_resource_insertion(inserts_list):
+            matched_creation = (self.matched_resource_id is not None)
+            matched_required_property = False
+            remaining_events = []
+
+            # FIXME: this could be done in an easier way: build one query that filters
+            # based on every subject id in inserts_list, and returns the id of the one
+            # that matched :)
+            for insert in inserts_list:
+                id = insert[1]
+
+                if not matched_creation:
+                    where = "  ?urn a <%s> " % rdf_class
+
+                    if url is not None:
+                        where += "; nie:url \"%s\"" % url
+
+                    if title is not None:
+                        where += "; nie:title \"%s\"" % title
+
+                    query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1])
+                    result_set = self.query(query)
+
+                    if len(result_set) > 0:
+                        matched_creation = True
+                        self.matched_resource_urn = result_set[0][0]
+                        self.matched_resource_id = insert[1]
+                        self.log.debug("Matched creation of resource %s (%i)",
+                            self.matched_resource_urn,
+                             self.matched_resource_id)
+                        if required_property is not None:
+                            self.log.debug("Waiting for property %s (%i) to be set",
+                                required_property, required_property_id)
+
+                if required_property is not None and matched_creation and not matched_required_property:
+                    if id == self.matched_resource_id and insert[2] == required_property_id:
+                        matched_required_property = True
+                        self.log.debug("Matched %s %s", self.matched_resource_urn, required_property)
+
+                if not matched_creation or id != self.matched_resource_id:
+                    remaining_events += [insert]
+
+            matched = matched_creation if required_property is None else matched_required_property
+            return matched, remaining_events
+
+        def match_cb(inserts_list):
+            matched, remaining_events = find_resource_insertion(inserts_list)
+            exit_loop = matched
+            return exit_loop, remaining_events
+
+        # Check the list of previously received events for matches
+        (existing_match, self.inserts_list) = find_resource_insertion(self.inserts_list)
+
+        if not existing_match:
+            self._enable_await_timeout()
+            self.inserts_match_function = match_cb
+            # Run the event loop until the correct notification arrives
+            try:
+                self.loop.run_checked()
+            except GraphUpdateTimeoutException:
+                raise GraphUpdateTimeoutException("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title)) from None
+            self.inserts_match_function = None
+
+        self.class_to_track = None
+        return (self.matched_resource_id, self.matched_resource_urn)
+
+    def await_resource_deleted(self, rdf_class, id):
+        """
+        Block until we are notified of a resources deletion
+        """
+        assert (self.deletes_match_function == None)
+        assert (self.class_to_track == None)
+
+        def find_resource_deletion(deletes_list):
+            self.log.debug("find_resource_deletion: looking for %i in %s", id, deletes_list)
+
+            matched = False
+            remaining_events = []
+
+            for delete in deletes_list:
+                if delete[1] == id:
+                    matched = True
+                else:
+                    remaining_events += [delete]
+
+            return matched, remaining_events
+
+        def match_cb(deletes_list):
+            matched, remaining_events = find_resource_deletion(deletes_list)
+            exit_loop = matched
+            return exit_loop, remaining_events
+
+        self.log.debug("Await deletion of %i (%i existing)", id, len(self.deletes_list))
+
+        (existing_match, self.deletes_list) = find_resource_deletion(self.deletes_list)
+
+        if not existing_match:
+            self._enable_await_timeout()
+            self.class_to_track = rdf_class
+            self.deletes_match_function = match_cb
+            # Run the event loop until the correct notification arrives
+            try:
+                self.loop.run_checked()
+            except GraphUpdateTimeoutException:
+                raise GraphUpdateTimeoutException("Resource %i has not been deleted." % id)
+            self.deletes_match_function = None
+            self.class_to_track = None
+
+        return
+
+    def await_property_changed(self, rdf_class, subject_id, property_uri):
+        """
+        Block until a property of a resource is updated or inserted.
+        """
+        assert (self.inserts_match_function == None)
+        assert (self.deletes_match_function == None)
+        assert (self.class_to_track == None)
+
+        self.log.debug("Await change to %i %s (%i, %i existing)", subject_id, property_uri, len(self.inserts_list), len(self.deletes_list))
+
+        self.class_to_track = rdf_class
+
+        property_id = self.get_resource_id_by_uri(property_uri)
+
+        def find_property_change(event_list):
+            matched = False
+            remaining_events = []
+
+            for event in event_list:
+                if event[1] == subject_id and event[2] == property_id:
+                    self.log.debug("Matched property change: %s", str(event))
+                    matched = True
+                else:
+                    remaining_events += [event]
+
+            return matched, remaining_events
+
+        def match_cb(event_list):
+            matched, remaining_events = find_property_change(event_list)
+            exit_loop = matched
+            return exit_loop, remaining_events
+
+        # Check the list of previously received events for matches
+        (existing_match, self.inserts_list) = find_property_change(self.inserts_list)
+        (existing_match, self.deletes_list) = find_property_change(self.deletes_list)
+
+        if not existing_match:
+            self._enable_await_timeout()
+            self.inserts_match_function = match_cb
+            self.deletes_match_function = match_cb
+            # Run the event loop until the correct notification arrives
+            try:
+                self.loop.run_checked()
+            except GraphUpdateTimeoutException:
+                raise GraphUpdateTimeoutException(
+                    "Timeout waiting for property change, subject %i property %s (%i)" % (subject_id, property_uri, property_id))
+            self.inserts_match_function = None
+            self.deletes_match_function = None
+            self.class_to_track = None
+
     # Note: The methods below call the tracker-store D-Bus API directly. This
     # is useful for testing this API surface, but we recommand that all regular
     # applications use libtracker-sparql library to talk to the database.
diff --git a/utils/trackertestutils/mainloop.py b/utils/trackertestutils/mainloop.py
new file mode 100644
index 000000000..1e7a46c87
--- /dev/null
+++ b/utils/trackertestutils/mainloop.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2018, Sam Thursfield <sam afuera me uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+from gi.repository import GLib
+
+import sys
+
+
+class MainLoop():
+    '''Wrapper for GLib.MainLoop that propagates any unhandled exceptions.
+
+    PyGObject doesn't seem to provide any help with propagating exceptions from
+    the GLib main loop to the main Python execution context. The default
+    behaviour is to print a message and continue, which is useless for tests as
+    it means tests appear to pass when in fact they are broken.
+
+    '''
+
+    def __init__(self):
+        self._loop = GLib.MainLoop.new(None, 0)
+
+    def quit(self):
+        self._loop.quit()
+
+    def run_checked(self):
+        '''Run the loop until quit(), then raise any unhandled exception.'''
+        self._exception = None
+
+        old_hook = sys.excepthook
+
+        def new_hook(etype, evalue, etb):
+            self._loop.quit()
+            self._exception = evalue
+            old_hook(etype, evalue, etb)
+
+        try:
+            sys.excepthook = new_hook
+            self._loop.run()
+        finally:
+            sys.excepthook = old_hook
+
+        if self._exception:
+            raise self._exception


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]