[tracker/sam/test-fixes: 7/7] trackertestutils: Separate await_update() into two functions
- From: Sam Thursfield <sthursfield src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/sam/test-fixes: 7/7] trackertestutils: Separate await_update() into two functions
- Date: Fri, 1 May 2020 23:24:17 +0000 (UTC)
commit 44cef25e90fe40366cc65e5aced4b1ba8fabdd4e
Author: Sam Thursfield <sam afuera me uk>
Date: Fri May 1 22:01:06 2020 +0200
trackertestutils: Separate await_update() into two functions
There are now two kinds of updates: property updates, and whole-information
updates, where a resource is deleted and a new one is inserted in its place.
Let's treat them separately.
utils/trackertestutils/helpers.py | 162 ++++++++++++++++++++++++++++++--------
1 file changed, 128 insertions(+), 34 deletions(-)
---
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index 4de36d522..0e69dd197 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -151,7 +151,6 @@ class await_insert():
#stmt.bind_int('~id', event.get_id())
#cursor = stmt.execute(None)
stmt = query_filtered % (self.graph, event.get_id())
- log.debug("Running %s", stmt)
cursor = self.conn.query(stmt)
if cursor.next():
@@ -185,14 +184,14 @@ class await_insert():
return True
-class await_update():
+class await_property_update():
"""Context manager to await data updates by Tracker miners & extractors.
Use like this:
before = 'nie:isStoredAs <test://url1>'
after = 'nie:isStoredAs <test://url2>'
- with self.tracker.await_update(resource_id, before, after):
+ with self.tracker.await_property_update(resource_id, before, after):
# Trigger an update of the data.
"""
@@ -213,47 +212,34 @@ class await_update():
log.info("Awaiting update of resource id %s", self.resource_id)
query_before = ' '.join([
- 'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+ 'SELECT ?urn tracker:id(?urn) ',
' FROM NAMED <%s> ',
- ' WHERE { '
+ ' WHERE { ',
' ?urn a rdfs:Resource ; ',
self.before_predicates,
' . FILTER (tracker:id(?urn) = %s) '
'}'
]) % (self.graph, self.resource_id)
+
cursor = self.conn.query(query_before)
if not cursor.next():
raise AwaitException("Expected data is not present in the store.")
- self.stored_as = cursor.get_string(0)[0]
-
- query_on_create = 'SELECT FROM NAMED <%s> nie:isStoredAs(tracker:uri(%s)) { }'
query_after = ' '.join([
'SELECT ?urn tracker:id(?urn) '
' FROM NAMED <%s> ',
' WHERE { '
- ' ?urn a rdfs:Resource ; ',
+ ' ?urn a rdfs:Resource ; ',
self.after_predicates,
- ' . FILTER (tracker:id(?urn) = %s) ',
+ ' . FILTER (tracker:id(?urn) = %s) '
'}'
- ])
+ ]) % (self.graph, self.resource_id)
def match_cb(notifier, service, graph, events):
for event in events:
- log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
- if event.get_event_type() == Tracker.NotifierEventType.DELETE and event.get_id() ==
self.resource_id:
- self.resource_deleted = True
- elif event.get_event_type() in [Tracker.NotifierEventType.CREATE,
- Tracker.NotifierEventType.UPDATE]:
- if event.get_event_type() == Tracker.NotifierEventType.CREATE:
- if not self.resource_deleted:
- raise AwaitException("Received insert with no prior delete")
- cursor = self.conn.query(query_on_create % (self.graph, event.get_id()))
- if cursor.next() and cursor.get_string(0)[0] == self.stored_as:
- self.resource_id = event.get_id()
-
- log.debug("Running %s", query_after % (self.graph, event.get_id()))
- cursor = self.conn.query(query_after % (self.graph, event.get_id()))
+ if event.get_event_type() == Tracker.NotifierEventType.UPDATE and event.get_id() ==
self.resource_id:
+ log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+ cursor = self.conn.query(query_after)
if cursor.next():
log.debug("Query matched!")
@@ -269,6 +255,105 @@ class await_update():
self.signal_id = self.notifier.connect('events', match_cb)
self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+ def __exit__(self, etype, evalue, etraceback):
+ if etype is not None:
+ return False
+ while not self.matched:
+ self.loop.run_checked()
+
+ GLib.source_remove(self.timeout_id)
+ GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+ return True
+
+
+class await_content_update():
+ """Context manager to await updates to file contents.
+
+ When a file is updated, the old information it contained is deleted from
+ the store, and the new information is inserted as a new resource.
+
+ """
+ def __init__(self, conn, graph, before_resource_id, before_predicates, after_predicates,
+ timeout=DEFAULT_TIMEOUT):
+ self.conn = conn
+ self.graph = graph
+ self.before_resource_id = before_resource_id
+ self.before_predicates = before_predicates
+ self.after_predicates = after_predicates
+ self.timeout = timeout
+
+ self.loop = mainloop.MainLoop()
+ self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+ self.matched = False
+
+ self.result = InsertedResource(None, 0)
+
+ def __enter__(self):
+ log.info("Awaiting delete of resource id %s and creation of a new one", self.before_resource_id)
+
+ query_before = ' '.join([
+ 'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
+ ' WHERE { '
+ ' ?urn a rdfs:Resource ; ',
+ self.before_predicates,
+ ' . FILTER (tracker:id(?urn) = %s) '
+ '}'
+ ]) % (self.graph, self.before_resource_id)
+ cursor = self.conn.query(query_before)
+ if not cursor.next():
+ raise AwaitException("Expected data is not present in the store.")
+ file_url = cursor.get_string(0)[0]
+
+ query_after = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
+ ' WHERE { '
+ ' ?urn a rdfs:Resource ; ',
+ ' nie:isStoredAs <%s> ; ',
+ self.after_predicates,
+ '}'
+ ]) % (self.graph, file_url)
+
+ # When a file is updated, the DataObject representing the file gets
+ # an UPDATE notification. The InformationElement representing the
+ # content gets a DELETE and CREATE notification, because it is
+ # deleted and recreated. We detect the latter situation.
+
+ self.matched_delete = False
+ def match_cb(notifier, service, graph, events):
+ for event in events:
+ log.debug("Received %s event for id %s", event.get_event_type().value_nick, event.get_id())
+ if event.get_id() == self.before_resource_id and event.get_event_type() ==
Tracker.NotifierEventType.DELETE:
+ log.debug(" Matched delete")
+ self.matched_delete = True
+
+ # The after_predicates may match after the miner-fs creates
+ # the new resource, or they may only match once the extractor
+ # processes the resource. The latter will be an UPDATE event.
+ elif self.matched_delete and event.get_event_type() in [Tracker.NotifierEventType.CREATE,
Tracker.NotifierEventType.UPDATE]:
+ cursor = self.conn.query(query_after)
+
+ if cursor.next():
+ self.result.urn = cursor.get_string(0)[0]
+ self.result.id = cursor.get_integer(1)
+ log.debug("Query matched! Got new urn %s", self.result.urn)
+
+ self.matched = True
+ self.loop.quit()
+
+ def timeout_cb():
+ log.info("Timeout fired after %s seconds", self.timeout)
+ raise AwaitTimeoutException(
+ f"Timeout awaiting update of resource {self.before_resource_id} "
+ f"with URL {file_url} matching: {self.after_predicates}")
+
+ self.signal_id = self.notifier.connect('events', match_cb)
+ self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+ return self.result
+
def __exit__(self, etype, evalue, etraceback):
if etype is not None:
return False
@@ -360,11 +445,17 @@ class StoreHelper():
"""Context manager that blocks until a resource is inserted."""
return await_insert(self.conn, graph, predicates, timeout)
- def await_update(self, graph, resource_id, before_predicates, after_predicates,
+ def await_property_update(self, graph, resource_id, before_predicates, after_predicates,
+ timeout=DEFAULT_TIMEOUT):
+ """Context manager that blocks until a resource property is updated."""
+ return await_property_update(self.conn, graph, resource_id, before_predicates,
+ after_predicates, timeout)
+
+ def await_content_update(self, graph, before_resource_id, before_predicates, after_predicates,
timeout=DEFAULT_TIMEOUT):
- """Context manager that blocks until a resource is updated."""
- return await_update(self.conn, graph, resource_id, before_predicates,
- after_predicates, timeout)
+ """Context manager that blocks until a resource is deleted and recreated."""
+ return await_content_update(self.conn, graph, before_resource_id, before_predicates,
+ after_predicates, timeout)
def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
"""Context manager that blocks until a resource is deleted."""
@@ -388,7 +479,6 @@ class StoreHelper():
'}'
])
- log.debug("Running: %s", query_initial)
cursor = self.conn.query(query_initial)
if cursor.next():
resource.urn = cursor.get_string(0)[0]
@@ -435,13 +525,17 @@ class StoreHelper():
else:
raise Exception("Multiple entries for resource %s" % uri)
- # FIXME: rename to get_resource_id_by_nepomuk_url !!
- def get_resource_id(self, url):
+ def get_content_resource_id(self, url):
"""
- Get the internal ID for a given resource, identified by URL.
+ Gets the internal ID for an nie:InformationElement resource.
+
+ The InformationElement represents data stored in a file, not
+ the file itself. The 'url' parameter is the URL of the file
+ that stores the given content.
+
"""
result = self.query(
- 'SELECT tracker:id(?r) WHERE { ?r nie:isStoredAs "%s" }' % url)
+ 'SELECT tracker:id(?r) WHERE { ?r a nie:InformationElement; nie:isStoredAs "%s" }' % url)
if len(result) == 1:
return int(result[0][0])
elif len(result) == 0:
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]