Tristan Van Berkom pushed to branch bst-1.2 at BuildStream / buildstream
Commits:
-
8a378557
by Josh Smith at 2018-09-19T12:26:24Z
-
93ab82d5
by Josh Smith at 2018-09-19T12:26:24Z
-
0a314bb8
by Josh Smith at 2018-09-19T12:26:24Z
-
bb66744d
by Josh Smith at 2018-09-20T08:30:23Z
-
283ff3fc
by Josh Smith at 2018-09-20T08:30:30Z
-
c6322a41
by Tristan Van Berkom at 2018-09-20T09:41:50Z
13 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_exceptions.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/pushqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/queues/trackqueue.py
- buildstream/element.py
- tests/frontend/pull.py
- tests/frontend/push.py
- tests/testutils/runcli.py
Changes:
... | ... | @@ -225,8 +225,8 @@ class CASCache(ArtifactCache): |
225 | 225 |
for remote in self._remotes[project]:
|
226 | 226 |
try:
|
227 | 227 |
remote.init()
|
228 |
- |
|
229 |
- element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url))
|
|
228 |
+ display_key = element._get_brief_display_key()
|
|
229 |
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
230 | 230 |
|
231 | 231 |
request = buildstream_pb2.GetReferenceRequest()
|
232 | 232 |
request.key = ref
|
... | ... | @@ -240,6 +240,7 @@ class CASCache(ArtifactCache): |
240 | 240 |
|
241 | 241 |
self.set_ref(ref, tree)
|
242 | 242 |
|
243 |
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
243 | 244 |
# no need to pull from additional remotes
|
244 | 245 |
return True
|
245 | 246 |
|
... | ... | @@ -248,11 +249,8 @@ class CASCache(ArtifactCache): |
248 | 249 |
raise ArtifactError("Failed to pull artifact {}: {}".format(
|
249 | 250 |
element._get_brief_display_key(), e)) from e
|
250 | 251 |
else:
|
251 |
- self.context.message(Message(
|
|
252 |
- None,
|
|
253 |
- MessageType.SKIPPED,
|
|
254 |
- "Remote ({}) does not have {} cached".format(
|
|
255 |
- remote.spec.url, element._get_brief_display_key())
|
|
252 |
+ element.info("Remote ({}) does not have {} cached".format(
|
|
253 |
+ remote.spec.url, element._get_brief_display_key()
|
|
256 | 254 |
))
|
257 | 255 |
|
258 | 256 |
return False
|
... | ... | @@ -273,11 +271,11 @@ class CASCache(ArtifactCache): |
273 | 271 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
274 | 272 |
|
275 | 273 |
pushed = False
|
276 |
- |
|
274 |
+ display_key = element._get_brief_display_key()
|
|
277 | 275 |
for remote in push_remotes:
|
278 | 276 |
remote.init()
|
279 | 277 |
skipped_remote = True
|
280 |
- element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
|
|
278 |
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
281 | 279 |
|
282 | 280 |
try:
|
283 | 281 |
for ref in refs:
|
... | ... | @@ -354,6 +352,9 @@ class CASCache(ArtifactCache): |
354 | 352 |
|
355 | 353 |
pushed = True
|
356 | 354 |
|
355 |
+ if not skipped_remote:
|
|
356 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
357 |
+ |
|
357 | 358 |
except grpc.RpcError as e:
|
358 | 359 |
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
359 | 360 |
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
... | ... | @@ -361,7 +362,7 @@ class CASCache(ArtifactCache): |
361 | 362 |
if skipped_remote:
|
362 | 363 |
self.context.message(Message(
|
363 | 364 |
None,
|
364 |
- MessageType.SKIPPED,
|
|
365 |
+ MessageType.INFO,
|
|
365 | 366 |
"Remote ({}) already has {} cached".format(
|
366 | 367 |
remote.spec.url, element._get_brief_display_key())
|
367 | 368 |
))
|
... | ... | @@ -311,3 +311,12 @@ class StreamError(BstError): |
311 | 311 |
class AppError(BstError):
|
312 | 312 |
def __init__(self, message, detail=None, reason=None):
|
313 | 313 |
super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
|
314 |
+ |
|
315 |
+ |
|
316 |
+# SkipJob
|
|
317 |
+#
|
|
318 |
+# Raised from a child process within a job when the job should be
|
|
319 |
+# considered skipped by the parent process.
|
|
320 |
+#
|
|
321 |
+class SkipJob(Exception):
|
|
322 |
+ pass
|
... | ... | @@ -31,7 +31,7 @@ import multiprocessing |
31 | 31 |
import psutil
|
32 | 32 |
|
33 | 33 |
# BuildStream toplevel imports
|
34 |
-from ..._exceptions import ImplError, BstError, set_last_task_error
|
|
34 |
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
|
|
35 | 35 |
from ..._message import Message, MessageType, unconditional_messages
|
36 | 36 |
from ... import _signals, utils
|
37 | 37 |
|
... | ... | @@ -40,6 +40,7 @@ from ... import _signals, utils |
40 | 40 |
RC_OK = 0
|
41 | 41 |
RC_FAIL = 1
|
42 | 42 |
RC_PERM_FAIL = 2
|
43 |
+RC_SKIPPED = 3
|
|
43 | 44 |
|
44 | 45 |
|
45 | 46 |
# Used to distinguish between status messages and return values
|
... | ... | @@ -117,7 +118,7 @@ class Job(): |
117 | 118 |
self._max_retries = max_retries # Maximum number of automatic retries
|
118 | 119 |
self._result = None # Return value of child action in the parent
|
119 | 120 |
self._tries = 0 # Try count, for retryable jobs
|
120 |
- |
|
121 |
+ self._skipped_flag = False # Indicate whether the job was skipped.
|
|
121 | 122 |
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
|
122 | 123 |
#
|
123 | 124 |
self._retry_flag = True
|
... | ... | @@ -275,6 +276,14 @@ class Job(): |
275 | 276 |
def set_task_id(self, task_id):
|
276 | 277 |
self._task_id = task_id
|
277 | 278 |
|
279 |
+ # skipped
|
|
280 |
+ #
|
|
281 |
+ # Returns:
|
|
282 |
+ # bool: True if the job was skipped while processing.
|
|
283 |
+ @property
|
|
284 |
+ def skipped(self):
|
|
285 |
+ return self._skipped_flag
|
|
286 |
+ |
|
278 | 287 |
#######################################################
|
279 | 288 |
# Abstract Methods #
|
280 | 289 |
#######################################################
|
... | ... | @@ -396,6 +405,13 @@ class Job(): |
396 | 405 |
try:
|
397 | 406 |
# Try the task action
|
398 | 407 |
result = self.child_process()
|
408 |
+ except SkipJob as e:
|
|
409 |
+ elapsed = datetime.datetime.now() - starttime
|
|
410 |
+ self.message(MessageType.SKIPPED, str(e),
|
|
411 |
+ elapsed=elapsed, logfile=filename)
|
|
412 |
+ |
|
413 |
+ # Alert parent of skip by return code
|
|
414 |
+ self._child_shutdown(RC_SKIPPED)
|
|
399 | 415 |
except BstError as e:
|
400 | 416 |
elapsed = datetime.datetime.now() - starttime
|
401 | 417 |
self._retry_flag = e.temporary
|
... | ... | @@ -543,14 +559,18 @@ class Job(): |
543 | 559 |
# We don't want to retry if we got OK or a permanent fail.
|
544 | 560 |
# This is set in _child_action but must also be set for the parent.
|
545 | 561 |
#
|
546 |
- self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
|
|
562 |
+ self._retry_flag = returncode == RC_FAIL
|
|
563 |
+ |
|
564 |
+ # Set the flag to alert Queue that this job skipped.
|
|
565 |
+ self._skipped_flag = returncode == RC_SKIPPED
|
|
547 | 566 |
|
548 | 567 |
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
|
549 | 568 |
self.spawn()
|
550 | 569 |
return
|
551 | 570 |
|
552 |
- self.parent_complete(returncode == RC_OK, self._result)
|
|
553 |
- self._scheduler.job_completed(self, returncode == RC_OK)
|
|
571 |
+ success = returncode in (RC_OK, RC_SKIPPED)
|
|
572 |
+ self.parent_complete(success, self._result)
|
|
573 |
+ self._scheduler.job_completed(self, success)
|
|
554 | 574 |
|
555 | 575 |
# _parent_process_envelope()
|
556 | 576 |
#
|
... | ... | @@ -77,5 +77,3 @@ class BuildQueue(Queue): |
77 | 77 |
# This has to be done after _assemble_done, such that the
|
78 | 78 |
# element may register its cache key as required
|
79 | 79 |
self._check_cache_size(job, element, result)
|
80 |
- |
|
81 |
- return True
|
... | ... | @@ -70,11 +70,9 @@ class FetchQueue(Queue): |
70 | 70 |
def done(self, _, element, result, success):
|
71 | 71 |
|
72 | 72 |
if not success:
|
73 |
- return False
|
|
73 |
+ return
|
|
74 | 74 |
|
75 | 75 |
element._update_state()
|
76 | 76 |
|
77 | 77 |
# Successful fetch, we must be CACHED now
|
78 | 78 |
assert element._get_consistency() == Consistency.CACHED
|
79 |
- |
|
80 |
- return True
|
... | ... | @@ -21,6 +21,7 @@ |
21 | 21 |
# Local imports
|
22 | 22 |
from . import Queue, QueueStatus
|
23 | 23 |
from ..resources import ResourceType
|
24 |
+from ..._exceptions import SkipJob
|
|
24 | 25 |
|
25 | 26 |
|
26 | 27 |
# A queue which pulls element artifacts
|
... | ... | @@ -33,7 +34,8 @@ class PullQueue(Queue): |
33 | 34 |
|
34 | 35 |
def process(self, element):
|
35 | 36 |
# returns whether an artifact was downloaded or not
|
36 |
- return element._pull()
|
|
37 |
+ if not element._pull():
|
|
38 |
+ raise SkipJob(self.action_name)
|
|
37 | 39 |
|
38 | 40 |
def status(self, element):
|
39 | 41 |
# state of dependencies may have changed, recalculate element state
|
... | ... | @@ -63,7 +65,3 @@ class PullQueue(Queue): |
63 | 65 |
# do not get an artifact size from pull jobs, we have to
|
64 | 66 |
# actually check the cache size.
|
65 | 67 |
self._scheduler.check_cache_size()
|
66 |
- |
|
67 |
- # Element._pull() returns True if it downloaded an artifact,
|
|
68 |
- # here we want to appear skipped if we did not download.
|
|
69 |
- return result
|
... | ... | @@ -21,6 +21,7 @@ |
21 | 21 |
# Local imports
|
22 | 22 |
from . import Queue, QueueStatus
|
23 | 23 |
from ..resources import ResourceType
|
24 |
+from ..._exceptions import SkipJob
|
|
24 | 25 |
|
25 | 26 |
|
26 | 27 |
# A queue which pushes element artifacts
|
... | ... | @@ -33,20 +34,11 @@ class PushQueue(Queue): |
33 | 34 |
|
34 | 35 |
def process(self, element):
|
35 | 36 |
# returns whether an artifact was uploaded or not
|
36 |
- return element._push()
|
|
37 |
+ if not element._push():
|
|
38 |
+ raise SkipJob(self.action_name)
|
|
37 | 39 |
|
38 | 40 |
def status(self, element):
|
39 | 41 |
if element._skip_push():
|
40 | 42 |
return QueueStatus.SKIP
|
41 | 43 |
|
42 | 44 |
return QueueStatus.READY
|
43 |
- |
|
44 |
- def done(self, _, element, result, success):
|
|
45 |
- |
|
46 |
- if not success:
|
|
47 |
- return False
|
|
48 |
- |
|
49 |
- # Element._push() returns True if it uploaded an artifact,
|
|
50 |
- # here we want to appear skipped if the remote already had
|
|
51 |
- # the artifact.
|
|
52 |
- return result
|
... | ... | @@ -136,10 +136,6 @@ class Queue(): |
136 | 136 |
# success (bool): True if the process() implementation did not
|
137 | 137 |
# raise any exception
|
138 | 138 |
#
|
139 |
- # Returns:
|
|
140 |
- # (bool): True if the element should appear to be processed,
|
|
141 |
- # Otherwise False will count the element as "skipped"
|
|
142 |
- #
|
|
143 | 139 |
def done(self, job, element, result, success):
|
144 | 140 |
pass
|
145 | 141 |
|
... | ... | @@ -305,8 +301,7 @@ class Queue(): |
305 | 301 |
# and determine if it should be considered as processed
|
306 | 302 |
# or skipped.
|
307 | 303 |
try:
|
308 |
- processed = self.done(job, element, result, success)
|
|
309 |
- |
|
304 |
+ self.done(job, element, result, success)
|
|
310 | 305 |
except BstError as e:
|
311 | 306 |
|
312 | 307 |
# Report error and mark as failed
|
... | ... | @@ -335,7 +330,7 @@ class Queue(): |
335 | 330 |
#
|
336 | 331 |
if success:
|
337 | 332 |
self._done_queue.append(job)
|
338 |
- if processed:
|
|
333 |
+ if not job.skipped:
|
|
339 | 334 |
self.processed_elements.append(element)
|
340 | 335 |
else:
|
341 | 336 |
self.skipped_elements.append(element)
|
... | ... | @@ -51,18 +51,11 @@ class TrackQueue(Queue): |
51 | 51 |
def done(self, _, element, result, success):
|
52 | 52 |
|
53 | 53 |
if not success:
|
54 |
- return False
|
|
55 |
- |
|
56 |
- changed = False
|
|
54 |
+ return
|
|
57 | 55 |
|
58 | 56 |
# Set the new refs in the main process one by one as they complete
|
59 | 57 |
for unique_id, new_ref in result:
|
60 | 58 |
source = _plugin_lookup(unique_id)
|
61 |
- # We appear processed if at least one source has changed
|
|
62 |
- if source._save_ref(new_ref):
|
|
63 |
- changed = True
|
|
59 |
+ source._save_ref(new_ref)
|
|
64 | 60 |
|
65 | 61 |
element._tracking_done()
|
66 |
- |
|
67 |
- # We'll appear as a skipped element if tracking resulted in no change
|
|
68 |
- return changed
|
... | ... | @@ -1672,8 +1672,6 @@ class Element(Plugin): |
1672 | 1672 |
return False
|
1673 | 1673 |
|
1674 | 1674 |
# Notify successfull download
|
1675 |
- display_key = self._get_brief_display_key()
|
|
1676 |
- self.info("Downloaded artifact {}".format(display_key))
|
|
1677 | 1675 |
return True
|
1678 | 1676 |
|
1679 | 1677 |
# _skip_push():
|
... | ... | @@ -1712,16 +1710,13 @@ class Element(Plugin): |
1712 | 1710 |
self.warn("Not pushing tainted artifact.")
|
1713 | 1711 |
return False
|
1714 | 1712 |
|
1715 |
- display_key = self._get_brief_display_key()
|
|
1716 |
- with self.timed_activity("Pushing artifact {}".format(display_key)):
|
|
1717 |
- # Push all keys used for local commit
|
|
1718 |
- pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
|
|
1719 |
- if not pushed:
|
|
1720 |
- return False
|
|
1713 |
+ # Push all keys used for local commit
|
|
1714 |
+ pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
|
|
1715 |
+ if not pushed:
|
|
1716 |
+ return False
|
|
1721 | 1717 |
|
1722 |
- # Notify successful upload
|
|
1723 |
- self.info("Pushed artifact {}".format(display_key))
|
|
1724 |
- return True
|
|
1718 |
+ # Notify successful upload
|
|
1719 |
+ return True
|
|
1725 | 1720 |
|
1726 | 1721 |
# _shell():
|
1727 | 1722 |
#
|
... | ... | @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles): |
356 | 356 |
assert not result.get_pulled_elements(), \
|
357 | 357 |
"No elements should have been pulled since the cache was empty"
|
358 | 358 |
|
359 |
- assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
|
|
359 |
+ assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr
|
|
360 |
+ assert "SKIPPED Pull" in result.stderr
|
... | ... | @@ -362,3 +362,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles): |
362 | 362 |
|
363 | 363 |
cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
|
364 | 364 |
assert share.has_artifact('subtest', 'import-etc.bst', cache_key)
|
365 |
+ |
|
366 |
+ |
|
367 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
368 |
+def test_push_already_cached(caplog, cli, tmpdir, datafiles):
|
|
369 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
370 |
+ caplog.set_level(1)
|
|
371 |
+ |
|
372 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
|
|
373 |
+ |
|
374 |
+ cli.configure({
|
|
375 |
+ 'artifacts': {'url': share.repo, 'push': True}
|
|
376 |
+ })
|
|
377 |
+ result = cli.run(project=project, args=['build', 'target.bst'])
|
|
378 |
+ |
|
379 |
+ result.assert_success()
|
|
380 |
+ assert "SKIPPED Push" not in result.stderr
|
|
381 |
+ |
|
382 |
+ result = cli.run(project=project, args=['push', 'target.bst'])
|
|
383 |
+ |
|
384 |
+ result.assert_success()
|
|
385 |
+ assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated"
|
|
386 |
+ assert "INFO Remote ({}) already has ".format(share.repo) in result.stderr
|
|
387 |
+ assert "SKIPPED Push" in result.stderr
|
... | ... | @@ -178,7 +178,7 @@ class Result(): |
178 | 178 |
return list(pushed)
|
179 | 179 |
|
180 | 180 |
def get_pulled_elements(self):
|
181 |
- pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr)
|
|
181 |
+ pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
|
|
182 | 182 |
if pulled is None:
|
183 | 183 |
return []
|
184 | 184 |
|