Qinusty pushed to branch Qinusty/skipped-rework at BuildStream / buildstream
Commits:
- 2b76df33 by Josh Smith at 2018-09-08T16:40:16Z
- 9c6c66b2 by Josh Smith at 2018-09-08T16:40:21Z
- 0a703cdc by Josh Smith at 2018-09-08T16:40:21Z
- 80b265cf by Josh Smith at 2018-09-08T16:40:21Z
13 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_exceptions.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/pushqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/queues/trackqueue.py
- buildstream/element.py
- tests/frontend/pull.py
- tests/frontend/push.py
- tests/testutils/runcli.py
Changes:

buildstream/_artifactcache/cascache.py:

@@ -252,7 +252,7 @@ class CASCache(ArtifactCache):
             else:
                 self.context.message(Message(
                     None,
-                    MessageType.SKIPPED,
+                    MessageType.INFO,
                     "Remote ({}) does not have {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))

@@ -363,7 +363,7 @@ class CASCache(ArtifactCache):
             if skipped_remote:
                 self.context.message(Message(
                     None,
-                    MessageType.SKIPPED,
+                    MessageType.INFO,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))

buildstream/_exceptions.py:

@@ -309,3 +309,17 @@ class StreamError(BstError):
 class AppError(BstError):
     def __init__(self, message, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
+
+
+# SkipJob
+#
+# Raised from a child process within a job when the job should be
+# considered skipped by the parent process.
+#
+class SkipJob(Exception):
+    def __init__(self, *, detail=""):
+        super().__init__()
+        self._detail = detail
+
+    def __str__(self):
+        return self._detail
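
The new exception is deliberately minimal: a child process raises it, __str__ carries the detail into the SKIPPED log line, and everything else rides on the job machinery changed below. A self-contained sketch of the intended round trip (the process() helper and its arguments are hypothetical stand-ins, not BuildStream API):

    # Sketch only: SkipJob as introduced above; process() is a
    # hypothetical stand-in for a queue's process() implementation.
    class SkipJob(Exception):
        def __init__(self, *, detail=""):
            super().__init__()
            self._detail = detail

        def __str__(self):
            return self._detail

    def process(action_name, pull):
        # pull() stands in for Element._pull(); it returns True only
        # when an artifact was actually downloaded.
        if not pull():
            raise SkipJob(detail=action_name)

    try:
        process("Pull", lambda: False)
    except SkipJob as skip:
        print("SKIPPED", skip)   # prints: SKIPPED Pull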

buildstream/_scheduler/jobs/job.py:

@@ -31,7 +31,7 @@ import multiprocessing
 import psutil
 
 # BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ... import _signals, utils
 
@@ -40,6 +40,7 @@ from ... import _signals, utils
 RC_OK = 0
 RC_FAIL = 1
 RC_PERM_FAIL = 2
+RC_SKIPPED = 3
 
 
 # Used to distinguish between status messages and return values

@@ -117,7 +118,7 @@ class Job():
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
-
+        self._skipped = False                  # Indicate whether the job was skipped.
         # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
         #
         self._retry_flag = True

@@ -275,6 +276,10 @@ class Job():
     def set_task_id(self, task_id):
         self._task_id = task_id
 
+    @property
+    def skipped(self):
+        return self._skipped
+
     #######################################################
     #                  Abstract Methods                   #
     #######################################################

@@ -396,6 +401,13 @@ class Job():
         try:
             # Try the task action
             result = self.child_process()
+        except SkipJob as e:
+            elapsed = datetime.datetime.now() - starttime
+            self.message(MessageType.SKIPPED, str(e),
+                         elapsed=elapsed, logfile=filename)
+
+            # Alert parent of skip by return code
+            self._child_shutdown(RC_SKIPPED)
         except BstError as e:
             elapsed = datetime.datetime.now() - starttime
             self._retry_flag = e.temporary

@@ -441,6 +453,7 @@ class Job():
             self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
                          logfile=filename)
 
+        # XXX Verify below.
         # Shutdown needs to stay outside of the above context manager,
         # make sure we dont try to handle SIGTERM while the process
         # is already busy in sys.exit()

@@ -547,14 +560,19 @@ class Job():
         # We don't want to retry if we got OK or a permanent fail.
         # This is set in _child_action but must also be set for the parent.
         #
-        self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
+        self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL, RC_SKIPPED)
 
         if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
             self.spawn()
             return
 
-        self.parent_complete(returncode == RC_OK, self._result)
-        self._scheduler.job_completed(self, returncode == RC_OK)
+        if returncode == RC_SKIPPED:
+            # Set _skipped for parent process.
+            self._skipped = True
+
+        success = (returncode in (RC_OK, RC_SKIPPED))
+        self.parent_complete(success, self._result)
+        self._scheduler.job_completed(self, success)
 
     # _parent_process_envelope()
     #
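
Together with RC_SKIPPED above, the parent now derives retry, success, and skip status purely from the child's exit code. A compressed, runnable sketch of that decision logic (illustrative; the authoritative code is the hunk above):

    RC_OK, RC_FAIL, RC_PERM_FAIL, RC_SKIPPED = 0, 1, 2, 3

    def classify(returncode):
        # Mirrors the hunk above: a skipped job is never retried and
        # still counts as a successful completion, but is flagged so
        # the queue can report it separately.
        retry = returncode not in (RC_OK, RC_PERM_FAIL, RC_SKIPPED)
        success = returncode in (RC_OK, RC_SKIPPED)
        skipped = (returncode == RC_SKIPPED)
        return retry, success, skipped

    assert classify(RC_OK) == (False, True, False)
    assert classify(RC_FAIL) == (True, False, False)
    assert classify(RC_SKIPPED) == (False, True, True)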

buildstream/_scheduler/queues/buildqueue.py:

@@ -54,12 +54,7 @@ class BuildQueue(Queue):
                      detail=detail, action_name=self.action_name,
                      elapsed=timedelta(seconds=0),
                      logfile=logfile)
-        job = ElementJob(self._scheduler, self.action_name,
-                         logfile, element=element, queue=self,
-                         resources=self.resources,
-                         action_cb=self.process,
-                         complete_cb=self._job_done,
-                         max_retries=self._max_retries)
+        job = self._create_job(element)
         self._done_queue.append(job)
         self.failed_elements.append(element)
         self._scheduler._job_complete_callback(job, False)

@@ -109,5 +104,3 @@ class BuildQueue(Queue):
         # This has to be done after _assemble_done, such that the
         # element may register its cache key as required
         self._check_cache_size(job, element)
-
-        return True

buildstream/_scheduler/queues/fetchqueue.py:

@@ -70,13 +70,8 @@ class FetchQueue(Queue):
         return QueueStatus.READY
 
     def done(self, _, element, result, success):
+        if success:
+            element._update_state()
 
-        if not success:
-            return False
-
-        element._update_state()
-
-        # Successful fetch, we must be CACHED now
-        assert element._get_consistency() == Consistency.CACHED
-
-        return True
+            # Successful fetch, we must be CACHED now
+            assert element._get_consistency() == Consistency.CACHED

buildstream/_scheduler/queues/pullqueue.py:

@@ -21,6 +21,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..._exceptions import SkipJob
 
 
 # A queue which pulls element artifacts

@@ -33,7 +34,12 @@ class PullQueue(Queue):
 
     def process(self, element):
         # returns whether an artifact was downloaded or not
-        return element._pull()
+        pulled = element._pull()
+
+        if not pulled:
+            raise SkipJob(detail=self.action_name)
+
+        return pulled
 
     def status(self, element):
         # state of dependencies may have changed, recalculate element state

@@ -53,17 +59,10 @@ class PullQueue(Queue):
             return QueueStatus.SKIP
 
     def done(self, _, element, result, success):
+        if success:
+            element._pull_done()
 
-        if not success:
-            return False
-
-        element._pull_done()
-
-        # Build jobs will check the "approximate" size first. Since we
-        # do not get an artifact size from pull jobs, we have to
-        # actually check the cache size.
-        self._scheduler._check_cache_size_real()
-
-        # Element._pull() returns True if it downloaded an artifact,
-        # here we want to appear skipped if we did not download.
-        return result
+        # Build jobs will check the "approximate" size first. Since we
+        # do not get an artifact size from pull jobs, we have to
+        # actually check the cache size.
+        self._scheduler._check_cache_size_real()
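
End to end, a pull that downloads nothing now surfaces as a SKIPPED job rather than a False result threaded through done(). A small simulation of that path (all names hypothetical except SkipJob and the return codes):

    RC_OK, RC_SKIPPED = 0, 3

    class SkipJob(Exception):
        def __init__(self, *, detail=""):
            super().__init__()
            self._detail = detail

        def __str__(self):
            return self._detail

    def child(action_name, pulled):
        # Condensed stand-in for a job child running PullQueue.process()
        # where Element._pull() returned 'pulled'.
        try:
            if not pulled:
                raise SkipJob(detail=action_name)
            return RC_OK
        except SkipJob as e:
            print("[pull] SKIPPED", e)   # what the child logs
            return RC_SKIPPED            # what _child_shutdown() reports

    assert child("Pull", pulled=False) == RC_SKIPPED   # parent sets job.skipped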

buildstream/_scheduler/queues/pushqueue.py:

@@ -21,6 +21,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..._exceptions import SkipJob
 
 
 # A queue which pushes element artifacts

@@ -33,20 +34,15 @@ class PushQueue(Queue):
 
     def process(self, element):
         # returns whether an artifact was uploaded or not
-        return element._push()
+        pushed = element._push()
+
+        if not pushed:
+            raise SkipJob(detail=self.action_name)
+
+        return pushed
 
     def status(self, element):
         if element._skip_push():
             return QueueStatus.SKIP
 
         return QueueStatus.READY
-
-    def done(self, _, element, result, success):
-
-        if not success:
-            return False
-
-        # Element._push() returns True if it uploaded an artifact,
-        # here we want to appear skipped if the remote already had
-        # the artifact.
-        return result
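
Note that the push queue loses its done() override entirely: with the skip decision now made inside process(), the override was dead code, and PushQueue falls back to the no-op Queue.done() shown in the next file.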

buildstream/_scheduler/queues/queue.py:

@@ -136,10 +136,6 @@ class Queue():
     #    success (bool): True if the process() implementation did not
     #                    raise any exception
     #
-    # Returns:
-    #    (bool): True if the element should appear to be processsed,
-    #            Otherwise False will count the element as "skipped"
-    #
     def done(self, job, element, result, success):
         pass
 

@@ -158,20 +154,8 @@ class Queue():
         if not elts:
             return
 
-        # Note: The internal lists work with jobs. This is not
-        #       reflected in any external methods (except
-        #       pop/peek_ready_jobs).
-        def create_job(element):
-            logfile = self._element_log_path(element)
-            return ElementJob(self._scheduler, self.action_name,
-                              logfile, element=element, queue=self,
-                              resources=self.resources,
-                              action_cb=self.process,
-                              complete_cb=self._job_done,
-                              max_retries=self._max_retries)
-
         # Place skipped elements directly on the done queue
-        jobs = [create_job(elt) for elt in elts]
+        jobs = [self._create_job(elt) for elt in elts]
         skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
         wait = [job for job in jobs if job not in skip]
 

@@ -308,8 +292,7 @@ class Queue():
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            processed = self.done(job, element, result, success)
-
+            self.done(job, element, result, success)
         except BstError as e:
 
             # Report error and mark as failed

@@ -339,7 +322,7 @@ class Queue():
             self._done_queue.append(job)
 
             if success:
-                if processed:
+                if not job.skipped:
                     self.processed_elements.append(element)
                 else:
                     self.skipped_elements.append(element)
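
The queue no longer interprets a return value from done(): job.skipped, the property added to Job earlier in this push, is true exactly when the child exited with RC_SKIPPED, and is now the single source of truth for the processed/skipped split.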

@@ -360,3 +343,15 @@ class Queue():
         logfile = "{key}-{action}".format(key=key, action=action)
 
         return os.path.join(project.name, element.normal_name, logfile)
+
+    # Note: The internal lists work with jobs. This is not
+    #       reflected in any external methods (except
+    #       pop/peek_ready_jobs).
+    def _create_job(self, element):
+        logfile = self._element_log_path(element)
+        return ElementJob(self._scheduler, self.action_name,
+                          logfile, element=element, queue=self,
+                          resources=self.resources,
+                          action_cb=self.process,
+                          complete_cb=self._job_done,
+                          max_retries=self._max_retries)

buildstream/_scheduler/queues/trackqueue.py:

@@ -49,20 +49,10 @@ class TrackQueue(Queue):
         return QueueStatus.READY
 
     def done(self, _, element, result, success):
+        if success:
+            # Set the new refs in the main process one by one as they complete
+            for unique_id, new_ref in result:
+                source = _plugin_lookup(unique_id)
+                source._save_ref(new_ref)
 
-        if not success:
-            return False
-
-        changed = False
-
-        # Set the new refs in the main process one by one as they complete
-        for unique_id, new_ref in result:
-            source = _plugin_lookup(unique_id)
-            # We appear processed if at least one source has changed
-            if source._save_ref(new_ref):
-                changed = True
-
-        element._tracking_done()
-
-        # We'll appear as a skipped element if tracking resulted in no change
-        return changed
+            element._tracking_done()

buildstream/element.py:

@@ -1746,7 +1746,7 @@ class Element(Plugin):
 
         # Notify successfull download
         display_key = self._get_brief_display_key()
-        self.info("Downloaded artifact {}".format(display_key))
+        self.info("Pulled artifact {}".format(display_key))
         return True
 
     # _skip_push():

@@ -1785,16 +1785,15 @@ class Element(Plugin):
             self.warn("Not pushing tainted artifact.")
             return False
 
-        display_key = self._get_brief_display_key()
-        with self.timed_activity("Pushing artifact {}".format(display_key)):
-            # Push all keys used for local commit
-            pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
-            if not pushed:
-                return False
+        # Push all keys used for local commit
+        pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
+        if not pushed:
+            return False
 
-            # Notify successful upload
-            self.info("Pushed artifact {}".format(display_key))
-            return True
+        # Notify successful upload
+        display_key = self._get_brief_display_key()
+        self.info("Pushed artifact {}".format(display_key))
+        return True
 
     # _shell():
     #

tests/frontend/pull.py:

@@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
     assert not result.get_pulled_elements(), \
         "No elements should have been pulled since the cache was empty"
 
-    assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
+    assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr
+    assert "SKIPPED Pull" in result.stderr

tests/frontend/push.py:

@@ -386,3 +386,25 @@ def test_push_cross_junction(cli, tmpdir, datafiles):
 
     cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
     assert share.has_artifact('subtest', 'import-etc.bst', cache_key)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_already_cached(caplog, cli, tmpdir, datafiles):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    caplog.set_level(1)
+
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+
+        cli.configure({
+            'artifacts': {'url': share.repo, 'push': True}
+        })
+        result = cli.run(project=project, args=['build', 'target.bst'])
+
+        result.assert_success()
+        assert "SKIPPED Push" not in result.stderr
+
+        result = cli.run(project=project, args=['push', 'target.bst'])
+
+        assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated"
+        assert "INFO Remote ({}) already has ".format(share.repo) in result.stderr
+        assert "SKIPPED Push" in result.stderr

tests/testutils/runcli.py:

@@ -178,7 +178,7 @@ class Result():
         return list(pushed)
 
     def get_pulled_elements(self):
-        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr)
+        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
         if pulled is None:
             return []
 