[tracker/sam/test-fixes: 7/7] trackertestutils: Separate await_update() into two functions



commit 44cef25e90fe40366cc65e5aced4b1ba8fabdd4e
Author: Sam Thursfield <sam afuera me uk>
Date:   Fri May 1 22:01:06 2020 +0200

    trackertestutils: Separate await_update() into two functions
    
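    There are now two kinds of update: property updates, where a
    property of an existing resource changes, and whole-information
    updates, where a resource is deleted and a new one is inserted.
    Let's treat them separately, with await_property_update() and
    await_content_update() replacing await_update().

    Also rename get_resource_id() to get_content_resource_id(); it now
    only matches nie:InformationElement resources.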

 utils/trackertestutils/helpers.py | 162 ++++++++++++++++++++++++++++++--------
 1 file changed, 128 insertions(+), 34 deletions(-)
---
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index 4de36d522..0e69dd197 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -151,7 +151,6 @@ class await_insert():
                     #stmt.bind_int('~id', event.get_id())
                     #cursor = stmt.execute(None)
                     stmt = query_filtered % (self.graph, event.get_id())
-                    log.debug("Running %s", stmt)
                     cursor = self.conn.query(stmt)
 
                     if cursor.next():
@@ -185,14 +184,14 @@ class await_insert():
         return True
 
 
-class await_update():
+class await_property_update():
     """Context manager to await data updates by Tracker miners & extractors.
 
     Use like this:
 
         before = 'nie:isStoredAs <test://url1>'
         after = 'nie:isStoredAs <test://url2>'
-        with self.tracker.await_update(resource_id, before, after):
+        with self.tracker.await_property_update(resource_id, before, after):
             # Trigger an update of the data.
 
     """
@@ -213,47 +212,34 @@ class await_update():
         log.info("Awaiting update of resource id %s", self.resource_id)
 
         query_before = ' '.join([
-            'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+            'SELECT ?urn tracker:id(?urn) ',
             ' FROM NAMED <%s> ',
-            ' WHERE { '
+            ' WHERE { ',
             '   ?urn a rdfs:Resource ; ',
             self.before_predicates,
             '   . FILTER (tracker:id(?urn) = %s) '
             '}'
         ]) % (self.graph, self.resource_id)
+
         cursor = self.conn.query(query_before)
         if not cursor.next():
             raise AwaitException("Expected data is not present in the store.")
 
-        self.stored_as = cursor.get_string(0)[0]
-
-        query_on_create = 'SELECT FROM NAMED <%s> nie:isStoredAs(tracker:uri(%s)) { }'
         query_after = ' '.join([
             'SELECT ?urn tracker:id(?urn) '
             ' FROM NAMED <%s> ',
             ' WHERE { '
-            '   ?urn a rdfs:Resource ; ',
+           '   ?urn a rdfs:Resource ; ',
             self.after_predicates,
-            '   . FILTER (tracker:id(?urn) = %s) ',
+            '   . FILTER (tracker:id(?urn) = %s) '
             '}'
-        ])
+        ]) % (self.graph, self.resource_id)
 
         def match_cb(notifier, service, graph, events):
             for event in events:
-                log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
-                if event.get_event_type() == Tracker.NotifierEventType.DELETE and event.get_id() == self.resource_id:
-                    self.resource_deleted = True
-                elif event.get_event_type() in [Tracker.NotifierEventType.CREATE,
-                                                Tracker.NotifierEventType.UPDATE]:
-                    if event.get_event_type() == Tracker.NotifierEventType.CREATE:
-                        if not self.resource_deleted:
-                            raise AwaitException("Received insert with no prior delete")
-                        cursor = self.conn.query(query_on_create % (self.graph, event.get_id()))
-                        if cursor.next() and cursor.get_string(0)[0] == self.stored_as:
-                            self.resource_id = event.get_id()
-
-                    log.debug("Running %s", query_after % (self.graph, event.get_id()))
-                    cursor = self.conn.query(query_after % (self.graph, event.get_id()))
+                if event.get_event_type() == Tracker.NotifierEventType.UPDATE and event.get_id() == self.resource_id:
+                    log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+                    cursor = self.conn.query(query_after)
 
                     if cursor.next():
                         log.debug("Query matched!")
@@ -269,6 +255,105 @@ class await_update():
         self.signal_id = self.notifier.connect('events', match_cb)
         self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
 
+    def __exit__(self, etype, evalue, etraceback):
+        if etype is not None:
+            return False
+        while not self.matched:
+            self.loop.run_checked()
+
+        GLib.source_remove(self.timeout_id)
+        GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+        return True
+
+
+class await_content_update():
+    """Context manager to await updates to file contents.
+
+    When a file is updated, the old information it contained is deleted from
+    the store, and the new information is inserted as a new resource.
+
+    """
+    def __init__(self, conn, graph, before_resource_id, before_predicates, after_predicates,
+                 timeout=DEFAULT_TIMEOUT):
+        self.conn = conn
+        self.graph = graph
+        self.before_resource_id = before_resource_id
+        self.before_predicates = before_predicates
+        self.after_predicates = after_predicates
+        self.timeout = timeout
+
+        self.loop = mainloop.MainLoop()
+        self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+        self.matched = False
+
+        self.result = InsertedResource(None, 0)
+
+    def __enter__(self):
+        log.info("Awaiting delete of resource id %s and creation of a new one", self.before_resource_id)
+
+        query_before = ' '.join([
+            'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+            ' FROM NAMED <%s> ',
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            self.before_predicates,
+            '   . FILTER (tracker:id(?urn) = %s) '
+            '}'
+        ]) % (self.graph, self.before_resource_id)
+        cursor = self.conn.query(query_before)
+        if not cursor.next():
+            raise AwaitException("Expected data is not present in the store.")
+        file_url = cursor.get_string(0)[0]
+
+        query_after = ' '.join([
+            'SELECT ?urn tracker:id(?urn) '
+            ' FROM NAMED <%s> ',
+            ' WHERE { '
+            '   ?urn a rdfs:Resource ; ',
+            '      nie:isStoredAs <%s> ; ',
+            self.after_predicates,
+            '}'
+        ]) % (self.graph, file_url)
+
+        # When a file is updated, the DataObject representing the file gets
+        # an UPDATE notification. The InformationElement representing the
+        # content gets a DELETE and CREATE notification, because it is
+        # deleted and recreated. We detect the latter situation.
+
+        self.matched_delete = False
+        def match_cb(notifier, service, graph, events):
+            for event in events:
+                log.debug("Received %s event for id %s", event.get_event_type().value_nick, event.get_id())
+                if event.get_id() == self.before_resource_id and event.get_event_type() == Tracker.NotifierEventType.DELETE:
+                    log.debug("  Matched delete")
+                    self.matched_delete = True
+
+                # The after_predicates may match after the miner-fs creates
+                # the new resource, or they may only match once the extractor
+                # processes the resource. The latter will be an UPDATE event.
+                elif self.matched_delete and event.get_event_type() in [Tracker.NotifierEventType.CREATE, Tracker.NotifierEventType.UPDATE]:
+                    cursor = self.conn.query(query_after)
+
+                    if cursor.next():
+                        self.result.urn = cursor.get_string(0)[0]
+                        self.result.id = cursor.get_integer(1)
+                        log.debug("Query matched! Got new urn %s", self.result.urn)
+
+                        self.matched = True
+                        self.loop.quit()
+
+        def timeout_cb():
+            log.info("Timeout fired after %s seconds", self.timeout)
+            raise AwaitTimeoutException(
+                f"Timeout awaiting update of resource {self.before_resource_id} "
+                f"with URL {file_url} matching: {self.after_predicates}")
+
+        self.signal_id = self.notifier.connect('events', match_cb)
+        self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+        return self.result
+
     def __exit__(self, etype, evalue, etraceback):
         if etype is not None:
             return False
@@ -360,11 +445,17 @@ class StoreHelper():
         """Context manager that blocks until a resource is inserted."""
         return await_insert(self.conn, graph, predicates, timeout)
 
-    def await_update(self, graph, resource_id, before_predicates, after_predicates,
+    def await_property_update(self, graph, resource_id, before_predicates, after_predicates,
+                     timeout=DEFAULT_TIMEOUT):
+        """Context manager that blocks until a resource property is updated."""
+        return await_property_update(self.conn, graph, resource_id, before_predicates,
+                                     after_predicates, timeout)
+
+    def await_content_update(self, graph, before_resource_id, before_predicates, after_predicates,
                      timeout=DEFAULT_TIMEOUT):
-        """Context manager that blocks until a resource is updated."""
-        return await_update(self.conn, graph, resource_id, before_predicates,
-                            after_predicates, timeout)
+        """Context manager that blocks until a resource is deleted and recreated."""
+        return await_content_update(self.conn, graph, before_resource_id, before_predicates,
+                                    after_predicates, timeout)
 
     def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
         """Context manager that blocks until a resource is deleted."""
@@ -388,7 +479,6 @@ class StoreHelper():
                 '}'
             ])
 
-            log.debug("Running: %s", query_initial)
             cursor = self.conn.query(query_initial)
             if cursor.next():
                 resource.urn = cursor.get_string(0)[0]
@@ -435,13 +525,17 @@ class StoreHelper():
         else:
             raise Exception("Multiple entries for resource %s" % uri)
 
-    # FIXME: rename to get_resource_id_by_nepomuk_url !!
-    def get_resource_id(self, url):
+    def get_content_resource_id(self, url):
         """
-        Get the internal ID for a given resource, identified by URL.
+        Gets the internal ID for an nie:InformationElement resource.
+
+        The InformationElement represents data stored in a file, not
+        the file itself. The 'url' parameter is the URL of the file
+        that stores the given content.
+
         """
         result = self.query(
-            'SELECT tracker:id(?r) WHERE { ?r nie:isStoredAs "%s" }' % url)
+            'SELECT tracker:id(?r) WHERE { ?r a nie:InformationElement; nie:isStoredAs "%s" }' % url)
         if len(result) == 1:
             return int(result[0][0])
         elif len(result) == 0:


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]