Tom Pollard pushed to branch tpollard/566 at BuildStream / buildstream
Commits:
- 891fcb0e by Tristan Van Berkom at 2019-01-07T16:47:01Z
- 5de42d43 by Tristan Van Berkom at 2019-01-07T18:00:37Z
- 059035b9 by Tristan Van Berkom at 2019-01-07T18:02:00Z
- b83d1b1f by Tristan Van Berkom at 2019-01-07T18:02:00Z
- 16a8816f by Tristan Van Berkom at 2019-01-07T18:02:00Z
- c2fc2a5e by Tristan Van Berkom at 2019-01-07T18:02:00Z
- 3e3984ad by Tristan Van Berkom at 2019-01-07T18:50:23Z
- 512c726e by Tristan Van Berkom at 2019-01-08T03:38:11Z
- 01171988 by Tristan Van Berkom at 2019-01-08T04:20:14Z
- 6c1d06d6 by Phil Dawson at 2019-01-08T10:24:32Z
- 914ecb72 by Jürg Billeter at 2019-01-08T10:54:02Z
- 75b0c186 by Tom Pollard at 2019-01-08T12:12:37Z
20 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_frontend/app.py
- buildstream/_scheduler/__init__.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/queues/trackqueue.py
- buildstream/_scheduler/scheduler.py
- buildstream/element.py
- buildstream/sandbox/sandbox.py
- buildstream/utils.py
- + tests/integration/pushbuildtrees.py
- tests/testutils/runcli.py
Changes:
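buildstream/_artifactcache/artifactcache.py: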
| ... | ... | @@ -74,6 +74,7 @@ class ArtifactCache(): |
| 74 | 74 |
|
| 75 | 75 |
self._has_fetch_remotes = False
|
| 76 | 76 |
self._has_push_remotes = False
|
| 77 |
+ self._has_partial_push_remotes = False
|
|
| 77 | 78 |
|
| 78 | 79 |
os.makedirs(self.extractdir, exist_ok=True)
|
| 79 | 80 |
|
| ... | ... | @@ -398,6 +399,8 @@ class ArtifactCache(): |
| 398 | 399 |
self._has_fetch_remotes = True
|
| 399 | 400 |
if remote_spec.push:
|
| 400 | 401 |
self._has_push_remotes = True
|
| 402 |
+ if remote_spec.partial_push:
|
|
| 403 |
+ self._has_partial_push_remotes = True
|
|
| 401 | 404 |
|
| 402 | 405 |
remotes[remote_spec.url] = CASRemote(remote_spec)
|
| 403 | 406 |
|
| ... | ... | @@ -596,6 +599,31 @@ class ArtifactCache(): |
| 596 | 599 |
remotes_for_project = self._remotes[element._get_project()]
|
| 597 | 600 |
return any(remote.spec.push for remote in remotes_for_project)
|
| 598 | 601 |
|
| 602 |
+ # has_partial_push_remotes():
|
|
| 603 |
+ #
|
|
| 604 |
+ # Check whether any remote repositories are available for pushing
|
|
| 605 |
+ # non-complete artifacts
|
|
| 606 |
+ #
|
|
| 607 |
+ # Args:
|
|
| 608 |
+ # element (Element): The Element to check
|
|
| 609 |
+ #
|
|
| 610 |
+ # Returns:
|
|
| 611 |
+ # (bool): True if any remote repository is configured for optional
|
|
| 612 |
+ # partial pushes, False otherwise
|
|
| 613 |
+ #
|
|
| 614 |
+ def has_partial_push_remotes(self, *, element=None):
|
|
| 615 |
+ # If there are no partial push remotes available, we can't partial push at all
|
|
| 616 |
+ if not self._has_partial_push_remotes:
|
|
| 617 |
+ return False
|
|
| 618 |
+ elif element is None:
|
|
| 619 |
+ # At least one remote is set to allow partial pushes
|
|
| 620 |
+ return True
|
|
| 621 |
+ else:
|
|
| 622 |
+ # Check whether the specified element's project has push remotes configured
|
|
| 623 |
+ # to accept partial artifact pushes
|
|
| 624 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
| 625 |
+ return any(remote.spec.partial_push for remote in remotes_for_project)
|
|
| 626 |
+ |
|
| 599 | 627 |
# push():
|
| 600 | 628 |
#
|
| 601 | 629 |
# Push committed artifact to remote repository.
|
| ... | ... | @@ -603,6 +631,8 @@ class ArtifactCache(): |
| 603 | 631 |
# Args:
|
| 604 | 632 |
# element (Element): The Element whose artifact is to be pushed
|
| 605 | 633 |
# keys (list): The cache keys to use
|
| 634 |
+ # partial(bool): Whether the artifact is cached in a partial state
|
|
| 635 |
+ # subdir(string): Optional subdir to exclude from partial pushes
|
|
| 606 | 636 |
#
|
| 607 | 637 |
# Returns:
|
| 608 | 638 |
# (bool): True if any remote was updated, False if no pushes were required
|
| ... | ... | @@ -610,12 +640,25 @@ class ArtifactCache(): |
| 610 | 640 |
# Raises:
|
| 611 | 641 |
# (ArtifactError): if there was an error
|
| 612 | 642 |
#
|
| 613 |
- def push(self, element, keys):
|
|
| 643 |
+ def push(self, element, keys, partial=False, subdir=None):
|
|
| 614 | 644 |
refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
| 615 | 645 |
|
| 616 | 646 |
project = element._get_project()
|
| 617 | 647 |
|
| 618 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
| 648 |
+ push_remotes = []
|
|
| 649 |
+ partial_remotes = []
|
|
| 650 |
+ |
|
| 651 |
+ # Create list of remotes to push to, given current element and partial push config
|
|
| 652 |
+ if not partial:
|
|
| 653 |
+ push_remotes = [r for r in self._remotes[project] if (r.spec.push and not r.spec.partial_push)]
|
|
| 654 |
+ |
|
| 655 |
+ if self._has_partial_push_remotes:
|
|
| 656 |
+ # Create a specific list of the remotes expecting the artifact to be pushed in a partial
|
|
| 657 |
+ # state. This list needs to be pushed in a partial state, without the optional subdir, if it
|
|
| 658 |
+ # exists locally. No need to attempt pushing a partial artifact to a remote that is queued to
|
|
| 659 |
+ # also receive a full artifact
|
|
| 660 |
+ partial_remotes = [r for r in self._remotes[project] if (r.spec.partial_push and r.spec.push) and
|
|
| 661 |
+ r not in push_remotes]
|
|
| 619 | 662 |
|
| 620 | 663 |
pushed = False
|
| 621 | 664 |
|
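The selection rules above are easier to see with plain data. A minimal runnable sketch, using a hypothetical Spec tuple in place of the real remote specs (this is an illustration, not BuildStream code):

    from collections import namedtuple

    Spec = namedtuple('Spec', 'url push partial_push')

    remotes = [Spec('https://full', True, False),
               Spec('https://partial', True, True),
               Spec('https://readonly', False, True)]   # push is False: never pushed to

    partial = False   # True when the local artifact lacks the buildtree

    # Remotes that must receive the complete artifact
    push_remotes = [] if partial else [r for r in remotes if r.push and not r.partial_push]

    # Remotes content with a partial artifact, skipping any already queued for a full push
    partial_remotes = [r for r in remotes
                       if r.push and r.partial_push and r not in push_remotes]

    assert [r.url for r in push_remotes] == ['https://full']
    assert [r.url for r in partial_remotes] == ['https://partial']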
| ... | ... | @@ -624,7 +667,7 @@ class ArtifactCache(): |
| 624 | 667 |
display_key = element._get_brief_display_key()
|
| 625 | 668 |
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
| 626 | 669 |
|
| 627 |
- if self.cas.push(refs, remote):
|
|
| 670 |
+ if self.cas.push(refs, remote, subdir=subdir):
|
|
| 628 | 671 |
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
| 629 | 672 |
pushed = True
|
| 630 | 673 |
else:
|
| ... | ... | @@ -632,6 +675,19 @@ class ArtifactCache(): |
| 632 | 675 |
remote.spec.url, element._get_brief_display_key()
|
| 633 | 676 |
))
|
| 634 | 677 |
|
| 678 |
+ for remote in partial_remotes:
|
|
| 679 |
+ remote.init()
|
|
| 680 |
+ display_key = element._get_brief_display_key()
|
|
| 681 |
+ element.status("Pushing partial artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 682 |
+ |
|
| 683 |
+ if self.cas.push(refs, remote, excluded_subdirs=subdir):
|
|
| 684 |
+ element.info("Pushed partial artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 685 |
+ pushed = True
|
|
| 686 |
+ else:
|
|
| 687 |
+ element.info("Remote ({}) already has {} partial cached".format(
|
|
| 688 |
+ remote.spec.url, element._get_brief_display_key()
|
|
| 689 |
+ ))
|
|
| 690 |
+ |
|
| 635 | 691 |
return pushed
|
| 636 | 692 |
|
| 637 | 693 |
# pull():
|
| ... | ... | @@ -659,14 +715,23 @@ class ArtifactCache(): |
| 659 | 715 |
element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
| 660 | 716 |
|
| 661 | 717 |
if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
|
| 662 |
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 663 | 718 |
if subdir:
|
| 664 |
- # Attempt to extract subdir into artifact extract dir if it already exists
|
|
| 665 |
- # without containing the subdir. If the respective artifact extract dir does not
|
|
| 666 |
- # exist a complete extraction will complete.
|
|
| 667 |
- self.extract(element, key, subdir)
|
|
| 668 |
- # no need to pull from additional remotes
|
|
| 669 |
- return True
|
|
| 719 |
+ if not self.contains_subdir_artifact(element, key, subdir):
|
|
| 720 |
+ # The pull was expecting the specific subdir to be present, attempt
|
|
| 721 |
+ # to find it in other available remotes
|
|
| 722 |
+ element.info("Pulled partial artifact {} <- {}. Attempting to retrieve {} from remotes"
|
|
| 723 |
+ .format(display_key, remote.spec.url, subdir))
|
|
| 724 |
+ else:
|
|
| 725 |
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 726 |
+ # Attempt to extract subdir into artifact extract dir if it already exists
|
|
| 727 |
+ # without containing the subdir. If the respective artifact extract dir does not
|
|
| 728 |
+ # exist a complete extraction will complete.
|
|
| 729 |
+ self.extract(element, key, subdir)
|
|
| 730 |
+ # no need to pull from additional remotes
|
|
| 731 |
+ return True
|
|
| 732 |
+ else:
|
|
| 733 |
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
| 734 |
+ return True
|
|
| 670 | 735 |
else:
|
| 671 | 736 |
element.info("Remote ({}) does not have {} cached".format(
|
| 672 | 737 |
remote.spec.url, element._get_brief_display_key()
|
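Across remotes, the loop above keeps iterating when only a partial artifact was obtained. A compressed sketch of that control flow; pull_one and has_subdir are assumed stand-ins injected for the real cache calls, not the actual API:

    def pull_from_remotes(remotes, pull_one, has_subdir, subdir=None):
        for remote in remotes:
            if not pull_one(remote, subdir=subdir):
                continue   # remote did not have the artifact at all
            if subdir and not has_subdir(subdir):
                continue   # partial pull; a later remote may hold the subdir
            return True    # complete enough, stop pulling
        return False

buildstream/_artifactcache/cascache.py: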
| ... | ... | @@ -45,7 +45,8 @@ from .. import _yaml |
| 45 | 45 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
| 46 | 46 |
|
| 47 | 47 |
|
| 48 |
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
|
|
| 48 |
+class CASRemoteSpec(namedtuple('CASRemoteSpec',
|
|
| 49 |
+ 'url push partial_push server_cert client_key client_cert instance_name')):
|
|
| 49 | 50 |
|
| 50 | 51 |
# _new_from_config_node
|
| 51 | 52 |
#
|
| ... | ... | @@ -53,9 +54,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
| 53 | 54 |
#
|
| 54 | 55 |
@staticmethod
|
| 55 | 56 |
def _new_from_config_node(spec_node, basedir=None):
|
| 56 |
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
|
|
| 57 |
+ _yaml.node_validate(spec_node,
|
|
| 58 |
+ ['url', 'push', 'allow-partial-push', 'server-cert', 'client-key',
|
|
| 59 |
+ 'client-cert', 'instance_name'])
|
|
| 57 | 60 |
url = _yaml.node_get(spec_node, str, 'url')
|
| 58 | 61 |
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
|
| 62 |
+ partial_push = _yaml.node_get(spec_node, bool, 'allow-partial-push', default_value=False)
|
|
| 63 |
+ |
|
| 59 | 64 |
if not url:
|
| 60 | 65 |
provenance = _yaml.node_get_provenance(spec_node, 'url')
|
| 61 | 66 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
| ... | ... | @@ -85,10 +90,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
| 85 | 90 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
| 86 | 91 |
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
|
| 87 | 92 |
|
| 88 |
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
|
|
| 93 |
+ return CASRemoteSpec(url, push, partial_push, server_cert, client_key, client_cert, instance_name)
|
|
| 89 | 94 |
|
| 90 | 95 |
|
| 91 |
-CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
|
|
| 96 |
+CASRemoteSpec.__new__.__defaults__ = (False, None, None, None, None)
|
|
| 92 | 97 |
|
| 93 | 98 |
|
| 94 | 99 |
class BlobNotFound(CASError):
|
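Since 'allow-partial-push' is validated per config node, two nodes may point at the same URL with different settings. For illustration, in the dict form the test suite later passes to cli.configure() (the URLs are placeholders):

    config = {
        'artifacts': [
            # Only complete artifacts: a missing buildtree blocks the push
            {'url': 'https://cache.example.com', 'push': True},
            # Same server, but buildtree-less (partial) pushes are accepted
            {'url': 'https://cache.example.com', 'push': True, 'allow-partial-push': True},
        ]
    }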
| ... | ... | @@ -283,34 +288,47 @@ class CASCache(): |
| 283 | 288 |
# (bool): True if pull was successful, False if ref was not available
|
| 284 | 289 |
#
|
| 285 | 290 |
def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
|
| 286 |
- try:
|
|
| 287 |
- remote.init()
|
|
| 288 | 291 |
|
| 289 |
- request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
|
|
| 290 |
- request.key = ref
|
|
| 291 |
- response = remote.ref_storage.GetReference(request)
|
|
| 292 |
+ tree_found = False
|
|
| 292 | 293 |
|
| 293 |
- tree = remote_execution_pb2.Digest()
|
|
| 294 |
- tree.hash = response.digest.hash
|
|
| 295 |
- tree.size_bytes = response.digest.size_bytes
|
|
| 294 |
+ while True:
|
|
| 295 |
+ try:
|
|
| 296 |
+ if not tree_found:
|
|
| 297 |
+ remote.init()
|
|
| 296 | 298 |
|
| 297 |
- # Check if the element artifact is present, if so just fetch the subdir.
|
|
| 298 |
- if subdir and os.path.exists(self.objpath(tree)):
|
|
| 299 |
- self._fetch_subdir(remote, tree, subdir)
|
|
| 300 |
- else:
|
|
| 301 |
- # Fetch artifact, excluded_subdirs determined in pullqueue
|
|
| 302 |
- self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
|
|
| 299 |
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
|
|
| 300 |
+ request.key = ref
|
|
| 301 |
+ response = remote.ref_storage.GetReference(request)
|
|
| 303 | 302 |
|
| 304 |
- self.set_ref(ref, tree)
|
|
| 303 |
+ tree = remote_execution_pb2.Digest()
|
|
| 304 |
+ tree.hash = response.digest.hash
|
|
| 305 |
+ tree.size_bytes = response.digest.size_bytes
|
|
| 305 | 306 |
|
| 306 |
- return True
|
|
| 307 |
- except grpc.RpcError as e:
|
|
| 308 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 309 |
- raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
|
|
| 310 |
- else:
|
|
| 311 |
- return False
|
|
| 312 |
- except BlobNotFound as e:
|
|
| 313 |
- return False
|
|
| 307 |
+ # Check if the element artifact is present, if so just fetch the subdir.
|
|
| 308 |
+ if subdir and os.path.exists(self.objpath(tree)):
|
|
| 309 |
+ self._fetch_subdir(remote, tree, subdir)
|
|
| 310 |
+ else:
|
|
| 311 |
+ # Fetch artifact, excluded_subdirs determined in pullqueue
|
|
| 312 |
+ self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
|
|
| 313 |
+ |
|
| 314 |
+ self.set_ref(ref, tree)
|
|
| 315 |
+ |
|
| 316 |
+ return True
|
|
| 317 |
+ except grpc.RpcError as e:
|
|
| 318 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 319 |
+ raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
|
|
| 320 |
+ else:
|
|
| 321 |
+ return False
|
|
| 322 |
+ except BlobNotFound as e:
|
|
| 323 |
+ if not excluded_subdirs and subdir:
|
|
| 324 |
+ # The remote has the top level digest but could not complete a full pull,
|
|
| 325 |
+ # attempt partial without the need to initialise and check for the artifact
|
|
| 326 |
+ # digest. This default behaviour of dropping back to partial pulls could
|
|
| 327 |
+ # be made a configurable warning given at artifactcache level.
|
|
| 328 |
+ tree_found = True
|
|
| 329 |
+ excluded_subdirs, subdir = subdir, excluded_subdirs
|
|
| 330 |
+ else:
|
|
| 331 |
+ return False
|
|
| 314 | 332 |
|
| 315 | 333 |
# pull_tree():
|
| 316 | 334 |
#
|
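The while loop above retries exactly once: on BlobNotFound during a full pull it swaps the requested subdir into the excluded set and pulls partially. The same control flow in isolation, with fetch standing in for the GetReference/fetch sequence (a sketch, not the real method):

    class BlobNotFound(Exception):
        pass

    def pull_with_fallback(fetch, subdir=None, excluded_subdirs=None):
        while True:
            try:
                fetch(subdir=subdir, excluded_subdirs=excluded_subdirs)
                return True
            except BlobNotFound:
                if not excluded_subdirs and subdir:
                    # Top level ref exists but a blob is missing: retry as a
                    # partial pull, excluding the subdir instead of fetching it
                    excluded_subdirs, subdir = subdir, excluded_subdirs
                else:
                    return False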
| ... | ... | @@ -355,6 +373,8 @@ class CASCache(): |
| 355 | 373 |
# Args:
|
| 356 | 374 |
# refs (list): The refs to push
|
| 357 | 375 |
# remote (CASRemote): The remote to push to
|
| 376 |
+ # subdir (string): Optional specific subdir to include in the push
|
|
| 377 |
+ # excluded_subdirs (string): Optional subdir to exclude from the push
|
|
| 358 | 378 |
#
|
| 359 | 379 |
# Returns:
|
| 360 | 380 |
# (bool): True if any remote was updated, False if no pushes were required
|
| ... | ... | @@ -362,7 +382,7 @@ class CASCache(): |
| 362 | 382 |
# Raises:
|
| 363 | 383 |
# (CASError): if there was an error
|
| 364 | 384 |
#
|
| 365 |
- def push(self, refs, remote):
|
|
| 385 |
+ def push(self, refs, remote, *, subdir=None, excluded_subdirs=None):
|
|
| 366 | 386 |
skipped_remote = True
|
| 367 | 387 |
try:
|
| 368 | 388 |
for ref in refs:
|
| ... | ... | @@ -376,15 +396,18 @@ class CASCache(): |
| 376 | 396 |
response = remote.ref_storage.GetReference(request)
|
| 377 | 397 |
|
| 378 | 398 |
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
| 379 |
- # ref is already on the server with the same tree
|
|
| 380 |
- continue
|
|
| 399 |
+ # ref is already on the server with the same tree, however it might be partially cached.
|
|
| 400 |
+ # If artifact is not set to be pushed partially attempt to 'complete' the remote artifact if
|
|
| 401 |
+ # needed, else continue.
|
|
| 402 |
+ if excluded_subdirs or self.verify_digest_on_remote(remote, self._get_subdir(tree, subdir)):
|
|
| 403 |
+ continue
|
|
| 381 | 404 |
|
| 382 | 405 |
except grpc.RpcError as e:
|
| 383 | 406 |
if e.code() != grpc.StatusCode.NOT_FOUND:
|
| 384 | 407 |
# Intentionally re-raise RpcError for outer except block.
|
| 385 | 408 |
raise
|
| 386 | 409 |
|
| 387 |
- self._send_directory(remote, tree)
|
|
| 410 |
+ self._send_directory(remote, tree, excluded_dir=excluded_subdirs)
|
|
| 388 | 411 |
|
| 389 | 412 |
request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
|
| 390 | 413 |
request.keys.append(ref)
|
| ... | ... | @@ -866,10 +889,17 @@ class CASCache(): |
| 866 | 889 |
a += 1
|
| 867 | 890 |
b += 1
|
| 868 | 891 |
|
| 869 |
- def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
|
|
| 892 |
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False, subdir=False):
|
|
| 870 | 893 |
if tree.hash in reachable:
|
| 871 | 894 |
return
|
| 872 | 895 |
|
| 896 |
+ # If looping through subdir digests, skip processing if
|
|
| 897 |
+ # ref path does not exist, allowing for partial objects
|
|
| 898 |
+ if subdir and not os.path.exists(self.objpath(tree)):
|
|
| 899 |
+ return
|
|
| 900 |
+ |
|
| 901 |
+ # Raises FileNotFound exception if path does not exist,
|
|
| 902 |
+ # which should only be entered on the top level digest
|
|
| 873 | 903 |
if update_mtime:
|
| 874 | 904 |
os.utime(self.objpath(tree))
|
| 875 | 905 |
|
| ... | ... | @@ -886,9 +916,9 @@ class CASCache(): |
| 886 | 916 |
reachable.add(filenode.digest.hash)
|
| 887 | 917 |
|
| 888 | 918 |
for dirnode in directory.directories:
|
| 889 |
- self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
|
|
| 919 |
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, subdir=True)
|
|
| 890 | 920 |
|
| 891 |
- def _required_blobs(self, directory_digest):
|
|
| 921 |
+ def _required_blobs(self, directory_digest, excluded_dir=None):
|
|
| 892 | 922 |
# parse directory, and recursively add blobs
|
| 893 | 923 |
d = remote_execution_pb2.Digest()
|
| 894 | 924 |
d.hash = directory_digest.hash
|
| ... | ... | @@ -907,7 +937,8 @@ class CASCache(): |
| 907 | 937 |
yield d
|
| 908 | 938 |
|
| 909 | 939 |
for dirnode in directory.directories:
|
| 910 |
- yield from self._required_blobs(dirnode.digest)
|
|
| 940 |
+ if dirnode.name != excluded_dir:
|
|
| 941 |
+ yield from self._required_blobs(dirnode.digest)
|
|
| 911 | 942 |
|
| 912 | 943 |
def _fetch_blob(self, remote, digest, stream):
|
| 913 | 944 |
resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
|
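Note that the exclusion applies only at the top level: the recursive call drops excluded_dir. A toy model over nested dicts makes this visible (the real walk parses protobuf Directory messages):

    def required_blobs(tree, excluded_dir=None):
        # tree: {'files': [...], 'dirs': {name: subtree}} - a toy stand-in
        yield from tree['files']
        for name, subtree in tree['dirs'].items():
            if name != excluded_dir:
                yield from required_blobs(subtree)

    artifact = {'files': ['meta'], 'dirs': {
        'buildtree': {'files': ['log.o'], 'dirs': {}},
        'usr':       {'files': ['bin/hello'], 'dirs': {}},
    }}
    assert list(required_blobs(artifact, excluded_dir='buildtree')) == ['meta', 'bin/hello']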
| ... | ... | @@ -1029,6 +1060,7 @@ class CASCache(): |
| 1029 | 1060 |
objpath = self._ensure_blob(remote, dir_digest)
|
| 1030 | 1061 |
|
| 1031 | 1062 |
directory = remote_execution_pb2.Directory()
|
| 1063 |
+ |
|
| 1032 | 1064 |
with open(objpath, 'rb') as f:
|
| 1033 | 1065 |
directory.ParseFromString(f.read())
|
| 1034 | 1066 |
|
| ... | ... | @@ -1104,9 +1136,8 @@ class CASCache(): |
| 1104 | 1136 |
|
| 1105 | 1137 |
assert response.committed_size == digest.size_bytes
|
| 1106 | 1138 |
|
| 1107 |
- def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
|
| 1108 |
- required_blobs = self._required_blobs(digest)
|
|
| 1109 |
- |
|
| 1139 |
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4(), excluded_dir=None):
|
|
| 1140 |
+ required_blobs = self._required_blobs(digest, excluded_dir=excluded_dir)
|
|
| 1110 | 1141 |
missing_blobs = dict()
|
| 1111 | 1142 |
# Limit size of FindMissingBlobs request
|
| 1112 | 1143 |
for required_blobs_group in _grouper(required_blobs, 512):
|
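buildstream/_frontend/app.py: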
| ... | ... | @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages |
| 38 | 38 |
from .._stream import Stream
|
| 39 | 39 |
from .._versions import BST_FORMAT_VERSION
|
| 40 | 40 |
from .. import _yaml
|
| 41 |
-from .._scheduler import ElementJob
|
|
| 41 |
+from .._scheduler import ElementJob, JobStatus
|
|
| 42 | 42 |
|
| 43 | 43 |
# Import frontend assets
|
| 44 | 44 |
from . import Profile, LogLine, Status
|
| ... | ... | @@ -515,13 +515,13 @@ class App(): |
| 515 | 515 |
self._status.add_job(job)
|
| 516 | 516 |
self._maybe_render_status()
|
| 517 | 517 |
|
| 518 |
- def _job_completed(self, job, success):
|
|
| 518 |
+ def _job_completed(self, job, status):
|
|
| 519 | 519 |
self._status.remove_job(job)
|
| 520 | 520 |
self._maybe_render_status()
|
| 521 | 521 |
|
| 522 | 522 |
# Dont attempt to handle a failure if the user has already opted to
|
| 523 | 523 |
# terminate
|
| 524 |
- if not success and not self.stream.terminated:
|
|
| 524 |
+ if status == JobStatus.FAIL and not self.stream.terminated:
|
|
| 525 | 525 |
|
| 526 | 526 |
if isinstance(job, ElementJob):
|
| 527 | 527 |
element = job.element
|
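buildstream/_scheduler/__init__.py: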
| ... | ... | @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue |
| 26 | 26 |
from .queues.pullqueue import PullQueue
|
| 27 | 27 |
|
| 28 | 28 |
from .scheduler import Scheduler, SchedStatus
|
| 29 |
-from .jobs import ElementJob
|
|
| 29 |
+from .jobs import ElementJob, JobStatus
|
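buildstream/_scheduler/jobs/__init__.py: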
| ... | ... | @@ -20,3 +20,4 @@ |
| 20 | 20 |
from .elementjob import ElementJob
|
| 21 | 21 |
from .cachesizejob import CacheSizeJob
|
| 22 | 22 |
from .cleanupjob import CleanupJob
|
| 23 |
+from .job import JobStatus
|
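buildstream/_scheduler/jobs/cachesizejob.py: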
| ... | ... | @@ -16,7 +16,7 @@ |
| 16 | 16 |
# Author:
|
| 17 | 17 |
# Tristan Daniël Maat <tristan maat codethink co uk>
|
| 18 | 18 |
#
|
| 19 |
-from .job import Job
|
|
| 19 |
+from .job import Job, JobStatus
|
|
| 20 | 20 |
|
| 21 | 21 |
|
| 22 | 22 |
class CacheSizeJob(Job):
|
| ... | ... | @@ -30,8 +30,8 @@ class CacheSizeJob(Job): |
| 30 | 30 |
def child_process(self):
|
| 31 | 31 |
return self._artifacts.compute_cache_size()
|
| 32 | 32 |
|
| 33 |
- def parent_complete(self, success, result):
|
|
| 34 |
- if success:
|
|
| 33 |
+ def parent_complete(self, status, result):
|
|
| 34 |
+ if status == JobStatus.OK:
|
|
| 35 | 35 |
self._artifacts.set_cache_size(result)
|
| 36 | 36 |
|
| 37 | 37 |
if self._complete_cb:
|
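buildstream/_scheduler/jobs/cleanupjob.py: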
| ... | ... | @@ -16,7 +16,7 @@ |
| 16 | 16 |
# Author:
|
| 17 | 17 |
# Tristan Daniël Maat <tristan maat codethink co uk>
|
| 18 | 18 |
#
|
| 19 |
-from .job import Job
|
|
| 19 |
+from .job import Job, JobStatus
|
|
| 20 | 20 |
|
| 21 | 21 |
|
| 22 | 22 |
class CleanupJob(Job):
|
| ... | ... | @@ -29,6 +29,6 @@ class CleanupJob(Job): |
| 29 | 29 |
def child_process(self):
|
| 30 | 30 |
return self._artifacts.clean()
|
| 31 | 31 |
|
| 32 |
- def parent_complete(self, success, result):
|
|
| 33 |
- if success:
|
|
| 32 |
+ def parent_complete(self, status, result):
|
|
| 33 |
+ if status == JobStatus.OK:
|
|
| 34 | 34 |
self._artifacts.set_cache_size(result)
|
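buildstream/_scheduler/jobs/elementjob.py: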
| ... | ... | @@ -60,7 +60,7 @@ from .job import Job |
| 60 | 60 |
# Args:
|
| 61 | 61 |
# job (Job): The job object which completed
|
| 62 | 62 |
# element (Element): The element passed to the Job() constructor
|
| 63 |
-# success (bool): True if the action_cb did not raise an exception
|
|
| 63 |
+# status (JobStatus): The status of the completed job
|
|
| 64 | 64 |
# result (object): The deserialized object returned by the `action_cb`, or None
|
| 65 | 65 |
# if `success` is False
|
| 66 | 66 |
#
|
| ... | ... | @@ -93,8 +93,8 @@ class ElementJob(Job): |
| 93 | 93 |
# Run the action
|
| 94 | 94 |
return self._action_cb(self._element)
|
| 95 | 95 |
|
| 96 |
- def parent_complete(self, success, result):
|
|
| 97 |
- self._complete_cb(self, self._element, success, self._result)
|
|
| 96 |
+ def parent_complete(self, status, result):
|
|
| 97 |
+ self._complete_cb(self, self._element, status, self._result)
|
|
| 98 | 98 |
|
| 99 | 99 |
def message(self, message_type, message, **kwargs):
|
| 100 | 100 |
args = dict(kwargs)
|
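buildstream/_scheduler/jobs/job.py: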
| ... | ... | @@ -28,8 +28,6 @@ import traceback |
| 28 | 28 |
import asyncio
|
| 29 | 29 |
import multiprocessing
|
| 30 | 30 |
|
| 31 |
-import psutil
|
|
| 32 |
- |
|
| 33 | 31 |
# BuildStream toplevel imports
|
| 34 | 32 |
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
|
| 35 | 33 |
from ..._message import Message, MessageType, unconditional_messages
|
| ... | ... | @@ -43,6 +41,22 @@ RC_PERM_FAIL = 2 |
| 43 | 41 |
RC_SKIPPED = 3
|
| 44 | 42 |
|
| 45 | 43 |
|
| 44 |
+# JobStatus:
|
|
| 45 |
+#
|
|
| 46 |
+# The job completion status, passed back through the
|
|
| 47 |
+# complete callbacks.
|
|
| 48 |
+#
|
|
| 49 |
+class JobStatus():
|
|
| 50 |
+ # Job succeeded
|
|
| 51 |
+ OK = 0
|
|
| 52 |
+ |
|
| 53 |
+ # A temporary BstError was raised
|
|
| 54 |
+ FAIL = 1
|
|
| 55 |
+ |
|
| 56 |
+ # A SkipJob was raised
|
|
| 57 |
+ SKIPPED = 3
|
|
| 58 |
+ |
|
| 59 |
+ |
|
| 46 | 60 |
# Used to distinguish between status messages and return values
|
| 47 | 61 |
class Envelope():
|
| 48 | 62 |
def __init__(self, message_type, message):
|
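With three values instead of a boolean, callers can now tell skips apart from failures. A minimal sketch of the consumer side, modeled on the Queue.done() implementations changed later in this push (the print calls are placeholders for real bookkeeping):

    class JobStatus():
        OK, FAIL, SKIPPED = 0, 1, 3

    def done(element, result, status):
        if status == JobStatus.FAIL:
            return                           # leave the element in its failed state
        if status == JobStatus.SKIPPED:
            print('skipped', element)        # bookkeeping only, no state change
        else:
            print('processed', element, result)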
| ... | ... | @@ -118,7 +132,6 @@ class Job(): |
| 118 | 132 |
self._max_retries = max_retries # Maximum number of automatic retries
|
| 119 | 133 |
self._result = None # Return value of child action in the parent
|
| 120 | 134 |
self._tries = 0 # Try count, for retryable jobs
|
| 121 |
- self._skipped_flag = False # Indicate whether the job was skipped.
|
|
| 122 | 135 |
self._terminated = False # Whether this job has been explicitly terminated
|
| 123 | 136 |
|
| 124 | 137 |
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
|
| ... | ... | @@ -215,17 +228,10 @@ class Job(): |
| 215 | 228 |
# Forcefully kill the process, and any children it might have.
|
| 216 | 229 |
#
|
| 217 | 230 |
def kill(self):
|
| 218 |
- |
|
| 219 | 231 |
# Force kill
|
| 220 | 232 |
self.message(MessageType.WARN,
|
| 221 | 233 |
"{} did not terminate gracefully, killing".format(self.action_name))
|
| 222 |
- |
|
| 223 |
- try:
|
|
| 224 |
- utils._kill_process_tree(self._process.pid)
|
|
| 225 |
- # This can happen if the process died of its own accord before
|
|
| 226 |
- # we try to kill it
|
|
| 227 |
- except psutil.NoSuchProcess:
|
|
| 228 |
- return
|
|
| 234 |
+ utils._kill_process_tree(self._process.pid)
|
|
| 229 | 235 |
|
| 230 | 236 |
# suspend()
|
| 231 | 237 |
#
|
| ... | ... | @@ -282,18 +288,6 @@ class Job(): |
| 282 | 288 |
def set_task_id(self, task_id):
|
| 283 | 289 |
self._task_id = task_id
|
| 284 | 290 |
|
| 285 |
- # skipped
|
|
| 286 |
- #
|
|
| 287 |
- # This will evaluate to True if the job was skipped
|
|
| 288 |
- # during processing, or if it was forcefully terminated.
|
|
| 289 |
- #
|
|
| 290 |
- # Returns:
|
|
| 291 |
- # (bool): Whether the job should appear as skipped
|
|
| 292 |
- #
|
|
| 293 |
- @property
|
|
| 294 |
- def skipped(self):
|
|
| 295 |
- return self._skipped_flag or self._terminated
|
|
| 296 |
- |
|
| 297 | 291 |
#######################################################
|
| 298 | 292 |
# Abstract Methods #
|
| 299 | 293 |
#######################################################
|
| ... | ... | @@ -304,10 +298,10 @@ class Job(): |
| 304 | 298 |
# pass the result to the main thread.
|
| 305 | 299 |
#
|
| 306 | 300 |
# Args:
|
| 307 |
- # success (bool): Whether the job was successful.
|
|
| 301 |
+ # status (JobStatus): The job exit status
|
|
| 308 | 302 |
# result (any): The result returned by child_process().
|
| 309 | 303 |
#
|
| 310 |
- def parent_complete(self, success, result):
|
|
| 304 |
+ def parent_complete(self, status, result):
|
|
| 311 | 305 |
raise ImplError("Job '{kind}' does not implement parent_complete()"
|
| 312 | 306 |
.format(kind=type(self).__name__))
|
| 313 | 307 |
|
| ... | ... | @@ -571,16 +565,23 @@ class Job(): |
| 571 | 565 |
#
|
| 572 | 566 |
self._retry_flag = returncode == RC_FAIL
|
| 573 | 567 |
|
| 574 |
- # Set the flag to alert Queue that this job skipped.
|
|
| 575 |
- self._skipped_flag = returncode == RC_SKIPPED
|
|
| 576 |
- |
|
| 577 | 568 |
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
|
| 578 | 569 |
self.spawn()
|
| 579 | 570 |
return
|
| 580 | 571 |
|
| 581 |
- success = returncode in (RC_OK, RC_SKIPPED)
|
|
| 582 |
- self.parent_complete(success, self._result)
|
|
| 583 |
- self._scheduler.job_completed(self, success)
|
|
| 572 |
+ # Resolve the outward facing overall job completion status
|
|
| 573 |
+ #
|
|
| 574 |
+ if returncode == RC_OK:
|
|
| 575 |
+ status = JobStatus.OK
|
|
| 576 |
+ elif returncode == RC_SKIPPED:
|
|
| 577 |
+ status = JobStatus.SKIPPED
|
|
| 578 |
+ elif returncode in (RC_FAIL, RC_PERM_FAIL):
|
|
| 579 |
+ status = JobStatus.FAIL
|
|
| 580 |
+ else:
|
|
| 581 |
+ status = JobStatus.FAIL
|
|
| 582 |
+ |
|
| 583 |
+ self.parent_complete(status, self._result)
|
|
| 584 |
+ self._scheduler.job_completed(self, status)
|
|
| 584 | 585 |
|
| 585 | 586 |
# Force the deletion of the queue and process objects to try and clean up FDs
|
| 586 | 587 |
self._queue = self._process = None
|
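The branch ladder above reduces to a small mapping. A sketch assuming RC_OK is 0 and RC_FAIL is 1 (only RC_PERM_FAIL = 2 and RC_SKIPPED = 3 are visible in this hunk's context):

    class JobStatus():
        OK, FAIL, SKIPPED = 0, 1, 3

    RC_OK, RC_FAIL, RC_PERM_FAIL, RC_SKIPPED = 0, 1, 2, 3

    def resolve_status(returncode):
        # RC_FAIL, RC_PERM_FAIL and any unknown code all resolve to FAIL
        return {RC_OK: JobStatus.OK, RC_SKIPPED: JobStatus.SKIPPED}.get(returncode, JobStatus.FAIL)

buildstream/_scheduler/queues/buildqueue.py: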
| ... | ... | @@ -21,7 +21,7 @@ |
| 21 | 21 |
from datetime import timedelta
|
| 22 | 22 |
|
| 23 | 23 |
from . import Queue, QueueStatus
|
| 24 |
-from ..jobs import ElementJob
|
|
| 24 |
+from ..jobs import ElementJob, JobStatus
|
|
| 25 | 25 |
from ..resources import ResourceType
|
| 26 | 26 |
from ..._message import MessageType
|
| 27 | 27 |
|
| ... | ... | @@ -104,7 +104,7 @@ class BuildQueue(Queue): |
| 104 | 104 |
if artifacts.has_quota_exceeded():
|
| 105 | 105 |
self._scheduler.check_cache_size()
|
| 106 | 106 |
|
| 107 |
- def done(self, job, element, result, success):
|
|
| 107 |
+ def done(self, job, element, result, status):
|
|
| 108 | 108 |
|
| 109 | 109 |
# Inform element in main process that assembly is done
|
| 110 | 110 |
element._assemble_done()
|
| ... | ... | @@ -117,5 +117,5 @@ class BuildQueue(Queue): |
| 117 | 117 |
# artifact cache size for a successful build even though we know a
|
| 118 | 118 |
# failed build also grows the artifact cache size.
|
| 119 | 119 |
#
|
| 120 |
- if success:
|
|
| 120 |
+ if status == JobStatus.OK:
|
|
| 121 | 121 |
self._check_cache_size(job, element, result)
|
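buildstream/_scheduler/queues/fetchqueue.py: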
| ... | ... | @@ -24,6 +24,7 @@ from ... import Consistency |
| 24 | 24 |
# Local imports
|
| 25 | 25 |
from . import Queue, QueueStatus
|
| 26 | 26 |
from ..resources import ResourceType
|
| 27 |
+from ..jobs import JobStatus
|
|
| 27 | 28 |
|
| 28 | 29 |
|
| 29 | 30 |
# A queue which fetches element sources
|
| ... | ... | @@ -66,9 +67,9 @@ class FetchQueue(Queue): |
| 66 | 67 |
|
| 67 | 68 |
return QueueStatus.READY
|
| 68 | 69 |
|
| 69 |
- def done(self, _, element, result, success):
|
|
| 70 |
+ def done(self, _, element, result, status):
|
|
| 70 | 71 |
|
| 71 |
- if not success:
|
|
| 72 |
+ if status == JobStatus.FAIL:
|
|
| 72 | 73 |
return
|
| 73 | 74 |
|
| 74 | 75 |
element._update_state()
|
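buildstream/_scheduler/queues/pullqueue.py: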
| ... | ... | @@ -21,6 +21,7 @@ |
| 21 | 21 |
# Local imports
|
| 22 | 22 |
from . import Queue, QueueStatus
|
| 23 | 23 |
from ..resources import ResourceType
|
| 24 |
+from ..jobs import JobStatus
|
|
| 24 | 25 |
from ..._exceptions import SkipJob
|
| 25 | 26 |
|
| 26 | 27 |
|
| ... | ... | @@ -54,9 +55,9 @@ class PullQueue(Queue): |
| 54 | 55 |
else:
|
| 55 | 56 |
return QueueStatus.SKIP
|
| 56 | 57 |
|
| 57 |
- def done(self, _, element, result, success):
|
|
| 58 |
+ def done(self, _, element, result, status):
|
|
| 58 | 59 |
|
| 59 |
- if not success:
|
|
| 60 |
+ if status == JobStatus.FAIL:
|
|
| 60 | 61 |
return
|
| 61 | 62 |
|
| 62 | 63 |
element._pull_done()
|
| ... | ... | @@ -64,4 +65,5 @@ class PullQueue(Queue): |
| 64 | 65 |
# Build jobs will check the "approximate" size first. Since we
|
| 65 | 66 |
# do not get an artifact size from pull jobs, we have to
|
| 66 | 67 |
# actually check the cache size.
|
| 67 |
- self._scheduler.check_cache_size()
|
|
| 68 |
+ if status == JobStatus.OK:
|
|
| 69 |
+ self._scheduler.check_cache_size()
|
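buildstream/_scheduler/queues/queue.py: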
| ... | ... | @@ -25,7 +25,7 @@ from enum import Enum |
| 25 | 25 |
import traceback
|
| 26 | 26 |
|
| 27 | 27 |
# Local imports
|
| 28 |
-from ..jobs import ElementJob
|
|
| 28 |
+from ..jobs import ElementJob, JobStatus
|
|
| 29 | 29 |
from ..resources import ResourceType
|
| 30 | 30 |
|
| 31 | 31 |
# BuildStream toplevel imports
|
| ... | ... | @@ -133,10 +133,9 @@ class Queue(): |
| 133 | 133 |
# job (Job): The job which completed processing
|
| 134 | 134 |
# element (Element): The element which completed processing
|
| 135 | 135 |
# result (any): The return value of the process() implementation
|
| 136 |
- # success (bool): True if the process() implementation did not
|
|
| 137 |
- # raise any exception
|
|
| 136 |
+ # status (JobStatus): The return status of the Job
|
|
| 138 | 137 |
#
|
| 139 |
- def done(self, job, element, result, success):
|
|
| 138 |
+ def done(self, job, element, result, status):
|
|
| 140 | 139 |
pass
|
| 141 | 140 |
|
| 142 | 141 |
#####################################################
|
| ... | ... | @@ -291,7 +290,7 @@ class Queue(): |
| 291 | 290 |
#
|
| 292 | 291 |
# See the Job object for an explanation of the call signature
|
| 293 | 292 |
#
|
| 294 |
- def _job_done(self, job, element, success, result):
|
|
| 293 |
+ def _job_done(self, job, element, status, result):
|
|
| 295 | 294 |
|
| 296 | 295 |
# Update values that need to be synchronized in the main task
|
| 297 | 296 |
# before calling any queue implementation
|
| ... | ... | @@ -301,7 +300,7 @@ class Queue(): |
| 301 | 300 |
# and determine if it should be considered as processed
|
| 302 | 301 |
# or skipped.
|
| 303 | 302 |
try:
|
| 304 |
- self.done(job, element, result, success)
|
|
| 303 |
+ self.done(job, element, result, status)
|
|
| 305 | 304 |
except BstError as e:
|
| 306 | 305 |
|
| 307 | 306 |
# Report error and mark as failed
|
| ... | ... | @@ -332,12 +331,10 @@ class Queue(): |
| 332 | 331 |
# All jobs get placed on the done queue for later processing.
|
| 333 | 332 |
self._done_queue.append(job)
|
| 334 | 333 |
|
| 335 |
- # A Job can be skipped whether or not it has failed,
|
|
| 336 |
- # we want to only bookkeep them as processed or failed
|
|
| 337 |
- # if they are not skipped.
|
|
| 338 |
- if job.skipped:
|
|
| 334 |
+ # These lists are for bookkeeping purposes for the UI and logging.
|
|
| 335 |
+ if status == JobStatus.SKIPPED:
|
|
| 339 | 336 |
self.skipped_elements.append(element)
|
| 340 |
- elif success:
|
|
| 337 |
+ elif status == JobStatus.OK:
|
|
| 341 | 338 |
self.processed_elements.append(element)
|
| 342 | 339 |
else:
|
| 343 | 340 |
self.failed_elements.append(element)
|
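buildstream/_scheduler/queues/trackqueue.py: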
| ... | ... | @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup |
| 24 | 24 |
# Local imports
|
| 25 | 25 |
from . import Queue, QueueStatus
|
| 26 | 26 |
from ..resources import ResourceType
|
| 27 |
+from ..jobs import JobStatus
|
|
| 27 | 28 |
|
| 28 | 29 |
|
| 29 | 30 |
# A queue which tracks sources
|
| ... | ... | @@ -47,9 +48,9 @@ class TrackQueue(Queue): |
| 47 | 48 |
|
| 48 | 49 |
return QueueStatus.READY
|
| 49 | 50 |
|
| 50 |
- def done(self, _, element, result, success):
|
|
| 51 |
+ def done(self, _, element, result, status):
|
|
| 51 | 52 |
|
| 52 |
- if not success:
|
|
| 53 |
+ if status == JobStatus.FAIL:
|
|
| 53 | 54 |
return
|
| 54 | 55 |
|
| 55 | 56 |
# Set the new refs in the main process one by one as they complete
|
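buildstream/_scheduler/scheduler.py: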
| ... | ... | @@ -38,6 +38,16 @@ class SchedStatus(): |
| 38 | 38 |
TERMINATED = 1
|
| 39 | 39 |
|
| 40 | 40 |
|
| 41 |
+# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
|
|
| 42 |
+# which we launch dynamically; they have the property of being
|
|
| 43 |
+# meaningless to queue if one is already queued, and it also
|
|
| 44 |
+# doesn't make sense to run them in parallel
|
|
| 45 |
+#
|
|
| 46 |
+_ACTION_NAME_CLEANUP = 'cleanup'
|
|
| 47 |
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
|
|
| 48 |
+_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
|
|
| 49 |
+ |
|
| 50 |
+ |
|
| 41 | 51 |
# Scheduler()
|
| 42 | 52 |
#
|
| 43 | 53 |
# The scheduler operates on a list queues, each of which is meant to accomplish
|
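To see the bookkeeping these names drive in isolation, here is a runnable sketch with job names standing in for the Job objects the scheduler actually tracks:

    _REDUNDANT_EXCLUSIVE_ACTIONS = ['cleanup', 'cache_size']

    waiting, active = [], []
    exclusive_waiting, exclusive_active = set(), set()

    def schedule(name):
        # Drop a redundant exclusive job if one is already waiting
        if name in _REDUNDANT_EXCLUSIVE_ACTIONS:
            if name in exclusive_waiting:
                return
            exclusive_waiting.add(name)
        waiting.append(name)

    def try_start(name):
        # Postpone if the same exclusive action is already running
        if name in _REDUNDANT_EXCLUSIVE_ACTIONS and name in exclusive_active:
            return False
        waiting.remove(name)
        active.append(name)
        if name in _REDUNDANT_EXCLUSIVE_ACTIONS:
            exclusive_waiting.discard(name)
            exclusive_active.add(name)
        return True

    schedule('cache_size')
    schedule('cache_size')        # the second one is dropped
    assert waiting == ['cache_size']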
| ... | ... | @@ -94,6 +104,15 @@ class Scheduler(): |
| 94 | 104 |
self._suspendtime = None
|
| 95 | 105 |
self._queue_jobs = True # Whether we should continue to queue jobs
|
| 96 | 106 |
|
| 107 |
+ # Whether our exclusive jobs, like 'cleanup', are currently already
|
|
| 108 |
+ # waiting or active.
|
|
| 109 |
+ #
|
|
| 110 |
+ # This is just a bit quicker than scanning the wait queue and active
|
|
| 111 |
+ # queue and comparing job action names.
|
|
| 112 |
+ #
|
|
| 113 |
+ self._exclusive_waiting = set()
|
|
| 114 |
+ self._exclusive_active = set()
|
|
| 115 |
+ |
|
| 97 | 116 |
self._resources = Resources(context.sched_builders,
|
| 98 | 117 |
context.sched_fetchers,
|
| 99 | 118 |
context.sched_pushers)
|
| ... | ... | @@ -211,19 +230,6 @@ class Scheduler(): |
| 211 | 230 |
starttime = timenow
|
| 212 | 231 |
return timenow - starttime
|
| 213 | 232 |
|
| 214 |
- # schedule_jobs()
|
|
| 215 |
- #
|
|
| 216 |
- # Args:
|
|
| 217 |
- # jobs ([Job]): A list of jobs to schedule
|
|
| 218 |
- #
|
|
| 219 |
- # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
|
|
| 220 |
- # run as soon any other queueing jobs finish, provided sufficient
|
|
| 221 |
- # resources are available for them to run
|
|
| 222 |
- #
|
|
| 223 |
- def schedule_jobs(self, jobs):
|
|
| 224 |
- for job in jobs:
|
|
| 225 |
- self.waiting_jobs.append(job)
|
|
| 226 |
- |
|
| 227 | 233 |
# job_completed():
|
| 228 | 234 |
#
|
| 229 | 235 |
# Called when a Job completes
|
| ... | ... | @@ -231,12 +237,14 @@ class Scheduler(): |
| 231 | 237 |
# Args:
|
| 232 | 238 |
# queue (Queue): The Queue holding a complete job
|
| 233 | 239 |
# job (Job): The completed Job
|
| 234 |
- # success (bool): Whether the Job completed with a success status
|
|
| 240 |
+ # status (JobStatus): The status of the completed job
|
|
| 235 | 241 |
#
|
| 236 |
- def job_completed(self, job, success):
|
|
| 242 |
+ def job_completed(self, job, status):
|
|
| 237 | 243 |
self._resources.clear_job_resources(job)
|
| 238 | 244 |
self.active_jobs.remove(job)
|
| 239 |
- self._job_complete_callback(job, success)
|
|
| 245 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
| 246 |
+ self._exclusive_active.remove(job.action_name)
|
|
| 247 |
+ self._job_complete_callback(job, status)
|
|
| 240 | 248 |
self._schedule_queue_jobs()
|
| 241 | 249 |
self._sched()
|
| 242 | 250 |
|
| ... | ... | @@ -246,18 +254,13 @@ class Scheduler(): |
| 246 | 254 |
# size is calculated, a cleanup job will be run automatically
|
| 247 | 255 |
# if needed.
|
| 248 | 256 |
#
|
| 249 |
- # FIXME: This should ensure that only one cache size job
|
|
| 250 |
- # is ever pending at a given time. If a cache size
|
|
| 251 |
- # job is already running, it is correct to queue
|
|
| 252 |
- # a new one, it is incorrect to have more than one
|
|
| 253 |
- # of these jobs pending at a given time, though.
|
|
| 254 |
- #
|
|
| 255 | 257 |
def check_cache_size(self):
|
| 256 |
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
|
|
| 258 |
+ job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
|
| 259 |
+ 'cache_size/cache_size',
|
|
| 257 | 260 |
resources=[ResourceType.CACHE,
|
| 258 | 261 |
ResourceType.PROCESS],
|
| 259 | 262 |
complete_cb=self._run_cleanup)
|
| 260 |
- self.schedule_jobs([job])
|
|
| 263 |
+ self._schedule_jobs([job])
|
|
| 261 | 264 |
|
| 262 | 265 |
#######################################################
|
| 263 | 266 |
# Local Private Methods #
|
| ... | ... | @@ -276,10 +279,19 @@ class Scheduler(): |
| 276 | 279 |
if not self._resources.reserve_job_resources(job):
|
| 277 | 280 |
continue
|
| 278 | 281 |
|
| 282 |
+ # Postpone these jobs if one is already running
|
|
| 283 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
|
|
| 284 |
+ job.action_name in self._exclusive_active:
|
|
| 285 |
+ continue
|
|
| 286 |
+ |
|
| 279 | 287 |
job.spawn()
|
| 280 | 288 |
self.waiting_jobs.remove(job)
|
| 281 | 289 |
self.active_jobs.append(job)
|
| 282 | 290 |
|
| 291 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
| 292 |
+ self._exclusive_waiting.remove(job.action_name)
|
|
| 293 |
+ self._exclusive_active.add(job.action_name)
|
|
| 294 |
+ |
|
| 283 | 295 |
if self._job_start_callback:
|
| 284 | 296 |
self._job_start_callback(job)
|
| 285 | 297 |
|
| ... | ... | @@ -287,6 +299,33 @@ class Scheduler(): |
| 287 | 299 |
if not self.active_jobs and not self.waiting_jobs:
|
| 288 | 300 |
self.loop.stop()
|
| 289 | 301 |
|
| 302 |
+ # _schedule_jobs()
|
|
| 303 |
+ #
|
|
| 304 |
+ # The main entry point for jobs to be scheduled.
|
|
| 305 |
+ #
|
|
| 306 |
+ # This is called either as a result of scanning the queues
|
|
| 307 |
+ # in _schedule_queue_jobs(), or directly by the Scheduler
|
|
| 308 |
+ # to insert special jobs like cleanups.
|
|
| 309 |
+ #
|
|
| 310 |
+ # Args:
|
|
| 311 |
+ # jobs ([Job]): A list of jobs to schedule
|
|
| 312 |
+ #
|
|
| 313 |
+ def _schedule_jobs(self, jobs):
|
|
| 314 |
+ for job in jobs:
|
|
| 315 |
+ |
|
| 316 |
+ # Special treatment of our redundant exclusive jobs
|
|
| 317 |
+ #
|
|
| 318 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
| 319 |
+ |
|
| 320 |
+ # Drop the job if one is already queued
|
|
| 321 |
+ if job.action_name in self._exclusive_waiting:
|
|
| 322 |
+ continue
|
|
| 323 |
+ |
|
| 324 |
+ # Mark this action type as queued
|
|
| 325 |
+ self._exclusive_waiting.add(job.action_name)
|
|
| 326 |
+ |
|
| 327 |
+ self.waiting_jobs.append(job)
|
|
| 328 |
+ |
|
| 290 | 329 |
# _schedule_queue_jobs()
|
| 291 | 330 |
#
|
| 292 | 331 |
# Ask the queues what jobs they want to schedule and schedule
|
| ... | ... | @@ -331,7 +370,7 @@ class Scheduler(): |
| 331 | 370 |
# the next queue and process them.
|
| 332 | 371 |
process_queues = any(q.dequeue_ready() for q in self.queues)
|
| 333 | 372 |
|
| 334 |
- self.schedule_jobs(ready)
|
|
| 373 |
+ self._schedule_jobs(ready)
|
|
| 335 | 374 |
self._sched()
|
| 336 | 375 |
|
| 337 | 376 |
# _run_cleanup()
|
| ... | ... | @@ -353,11 +392,11 @@ class Scheduler(): |
| 353 | 392 |
if not artifacts.has_quota_exceeded():
|
| 354 | 393 |
return
|
| 355 | 394 |
|
| 356 |
- job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
|
| 395 |
+ job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
|
|
| 357 | 396 |
resources=[ResourceType.CACHE,
|
| 358 | 397 |
ResourceType.PROCESS],
|
| 359 | 398 |
exclusive_resources=[ResourceType.CACHE])
|
| 360 |
- self.schedule_jobs([job])
|
|
| 399 |
+ self._schedule_jobs([job])
|
|
| 361 | 400 |
|
| 362 | 401 |
# _suspend_jobs()
|
| 363 | 402 |
#
|
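buildstream/element.py: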
| ... | ... | @@ -65,7 +65,7 @@ Miscellaneous abstract methods also exist: |
| 65 | 65 |
|
| 66 | 66 |
* :func:`Element.generate_script() <buildstream.element.Element.generate_script>`
|
| 67 | 67 |
|
| 68 |
- For the purpose of ``bst source bundle``, an Element may optionally implement this.
|
|
| 68 |
+ For the purpose of ``bst source checkout --include-build-scripts``, an Element may optionally implement this.
|
|
| 69 | 69 |
|
| 70 | 70 |
|
| 71 | 71 |
Class Reference
|
| ... | ... | @@ -1800,13 +1800,19 @@ class Element(Plugin): |
| 1800 | 1800 |
# (bool): True if this element does not need a push job to be created
|
| 1801 | 1801 |
#
|
| 1802 | 1802 |
def _skip_push(self):
|
| 1803 |
+ |
|
| 1803 | 1804 |
if not self.__artifacts.has_push_remotes(element=self):
|
| 1804 | 1805 |
# No push remotes for this element's project
|
| 1805 | 1806 |
return True
|
| 1806 | 1807 |
|
| 1807 | 1808 |
# Do not push elements that aren't cached, or that are cached with a dangling buildtree
|
| 1808 |
- # artifact unless element type is expected to have an an empty buildtree directory
|
|
| 1809 |
- if not self._cached_buildtree():
|
|
| 1809 |
+ # artifact unless element type is expected to have an empty buildtree directory. Check
|
|
| 1810 |
+ # that this default behaviour is not overridden via a remote configured to allow pushing
|
|
| 1811 |
+ # artifacts without their corresponding buildtree.
|
|
| 1812 |
+ if not self._cached():
|
|
| 1813 |
+ return True
|
|
| 1814 |
+ |
|
| 1815 |
+ if not self._cached_buildtree() and not self.__artifacts.has_partial_push_remotes(element=self):
|
|
| 1810 | 1816 |
return True
|
| 1811 | 1817 |
|
| 1812 | 1818 |
# Do not push tainted artifact
|
| ... | ... | @@ -1817,7 +1823,8 @@ class Element(Plugin): |
| 1817 | 1823 |
|
| 1818 | 1824 |
# _push():
|
| 1819 | 1825 |
#
|
| 1820 |
- # Push locally cached artifact to remote artifact repository.
|
|
| 1826 |
+ # Push locally cached artifact to remote artifact repository. An attempt
|
|
| 1827 |
+ # will be made to push partial artifacts given current config
|
|
| 1821 | 1828 |
#
|
| 1822 | 1829 |
# Returns:
|
| 1823 | 1830 |
# (bool): True if the remote was updated, False if it already existed
|
| ... | ... | @@ -1830,8 +1837,19 @@ class Element(Plugin): |
| 1830 | 1837 |
self.warn("Not pushing tainted artifact.")
|
| 1831 | 1838 |
return False
|
| 1832 | 1839 |
|
| 1833 |
- # Push all keys used for local commit
|
|
| 1834 |
- pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
|
|
| 1840 |
+ # Push all keys used for local commit; this could be full or partial,
|
|
| 1841 |
+ # given previous _skip_push() logic. If buildtree isn't cached, then
|
|
| 1842 |
+ # set partial push
|
|
| 1843 |
+ |
|
| 1844 |
+ partial = False
|
|
| 1845 |
+ subdir = 'buildtree'
|
|
| 1846 |
+ if not self._cached_buildtree():
|
|
| 1847 |
+ partial = True
|
|
| 1848 |
+ |
|
| 1849 |
+ pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit(), partial=partial, subdir=subdir)
|
|
| 1850 |
+ |
|
| 1851 |
+ # Artifact might be cached on the server partially, with the top level ref existing.
|
|
| 1852 |
+ # Check if we need to attempt a push of a locally cached buildtree given current config
|
|
| 1835 | 1853 |
if not pushed:
|
| 1836 | 1854 |
return False
|
| 1837 | 1855 |
|
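Condensed, the decision _push() now makes is just this; cached_buildtree and push stand in for the real element/cache methods shown above (a sketch, not the actual signatures):

    def push_artifact(element, artifacts, keys):
        # Only the 'buildtree' subdir may be absent from the local cache
        partial = not element.cached_buildtree()   # hypothetical predicate
        return artifacts.push(element, keys, partial=partial, subdir='buildtree')

buildstream/sandbox/sandbox.py: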
| ... | ... | @@ -592,7 +592,7 @@ class _SandboxBatch(): |
| 592 | 592 |
if command.label:
|
| 593 | 593 |
context = self.sandbox._get_context()
|
| 594 | 594 |
message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS,
|
| 595 |
- 'Running {}'.format(command.label))
|
|
| 595 |
+ 'Running command', detail=command.label)
|
|
| 596 | 596 |
context.message(message)
|
| 597 | 597 |
|
| 598 | 598 |
exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
|
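buildstream/utils.py: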
| ... | ... | @@ -1050,6 +1050,11 @@ def _kill_process_tree(pid): |
| 1050 | 1050 |
# Ignore this error, it can happen with
|
| 1051 | 1051 |
# some setuid bwrap processes.
|
| 1052 | 1052 |
pass
|
| 1053 |
+ except psutil.NoSuchProcess:
|
|
| 1054 |
+ # It is certain that this has already been sent
|
|
| 1055 |
+ # SIGTERM, so there is a window where the process
|
|
| 1056 |
+ # could have exited already.
|
|
| 1057 |
+ pass
|
|
| 1053 | 1058 |
|
| 1054 | 1059 |
# Bloody Murder
|
| 1055 | 1060 |
for child in children:
|
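tests/integration/pushbuildtrees.py (new file):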
| 1 |
+import os
|
|
| 2 |
+import shutil
|
|
| 3 |
+import pytest
|
|
| 4 |
+ |
|
| 5 |
+from tests.testutils import cli_integration as cli, create_artifact_share
|
|
| 6 |
+from tests.testutils.integration import assert_contains
|
|
| 7 |
+from tests.testutils.site import HAVE_BWRAP, IS_LINUX
|
|
| 8 |
+from buildstream._exceptions import ErrorDomain, LoadErrorReason
|
|
| 9 |
+ |
|
| 10 |
+ |
|
| 11 |
+DATA_DIR = os.path.join(
|
|
| 12 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
| 13 |
+ "project"
|
|
| 14 |
+)
|
|
| 15 |
+ |
|
| 16 |
+ |
|
| 17 |
+# Remove artifact cache & set cli.config value of pull-buildtrees
|
|
| 18 |
+# to false, which is the default user context. The cache has to be
|
|
| 19 |
+# cleared as just forcefully removing the refpath leaves dangling objects.
|
|
| 20 |
+def default_state(cli, tmpdir, share):
|
|
| 21 |
+ shutil.rmtree(os.path.join(str(tmpdir), 'artifacts'))
|
|
| 22 |
+ cli.configure({
|
|
| 23 |
+ 'artifacts': {'url': share.repo, 'push': False},
|
|
| 24 |
+ 'artifactdir': os.path.join(str(tmpdir), 'artifacts'),
|
|
| 25 |
+ 'cache': {'pull-buildtrees': False},
|
|
| 26 |
+ })
|
|
| 27 |
+ |
|
| 28 |
+ |
|
| 29 |
+# Tests to capture the integration of the optional push of buildtrees.
|
|
| 30 |
+# The behaviour should encompass pushing artifacts that are already cached
|
|
| 31 |
+# without a buildtree as well as artifacts that are cached with their buildtree.
|
|
| 32 |
+# This option is handled via 'allow-partial-push' on a per artifact remote config
|
|
| 33 |
+# node basis. Multiple remote config nodes can point to the same url and as such can
|
|
| 34 |
+# have different 'allow-partial-push' options, tests need to cover this using project
|
|
| 35 |
+# confs.
|
|
| 36 |
+@pytest.mark.integration
|
|
| 37 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
| 38 |
+@pytest.mark.skipif(IS_LINUX and not HAVE_BWRAP, reason='Only available with bubblewrap on Linux')
|
|
| 39 |
+def test_pushbuildtrees(cli, tmpdir, datafiles, integration_cache):
|
|
| 40 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
| 41 |
+ element_name = 'autotools/amhello.bst'
|
|
| 42 |
+ |
|
| 43 |
+ # Create artifact shares for pull & push testing
|
|
| 44 |
+ with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
|
|
| 45 |
+ create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2,\
|
|
| 46 |
+ create_artifact_share(os.path.join(str(tmpdir), 'share3')) as share3,\
|
|
| 47 |
+ create_artifact_share(os.path.join(str(tmpdir), 'share4')) as share4:
|
|
| 48 |
+ |
|
| 49 |
+ cli.configure({
|
|
| 50 |
+ 'artifacts': {'url': share1.repo, 'push': True},
|
|
| 51 |
+ 'artifactdir': os.path.join(str(tmpdir), 'artifacts')
|
|
| 52 |
+ })
|
|
| 53 |
+ |
|
| 54 |
+ cli.configure({'artifacts': [{'url': share1.repo, 'push': True},
|
|
| 55 |
+ {'url': share2.repo, 'push': True, 'allow-partial-push': True}]})
|
|
| 56 |
+ |
|
| 57 |
+ # Build autotools element, check it was pushed, delete local.
|
|
| 58 |
+ # As share 2 has push & allow-partial-push set as true, it
|
|
| 59 |
+ # should have pushed the artifacts, without the cached buildtrees,
|
|
| 60 |
+ # to it.
|
|
| 61 |
+ result = cli.run(project=project, args=['build', element_name])
|
|
| 62 |
+ assert result.exit_code == 0
|
|
| 63 |
+ assert cli.get_element_state(project, element_name) == 'cached'
|
|
| 64 |
+ elementdigest = share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
| 65 |
+ buildtreedir = os.path.join(str(tmpdir), 'artifacts', 'extract', 'test', 'autotools-amhello',
|
|
| 66 |
+ elementdigest.hash, 'buildtree')
|
|
| 67 |
+ assert os.path.isdir(buildtreedir)
|
|
| 68 |
+ assert element_name in result.get_partial_pushed_elements()
|
|
| 69 |
+ assert element_name in result.get_pushed_elements()
|
|
| 70 |
+ assert share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
| 71 |
+ assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
| 72 |
+ default_state(cli, tmpdir, share1)
|
|
| 73 |
+ |
|
| 74 |
+ # Check that after explicitly pulling an artifact without its buildtree,
|
|
| 75 |
+ # we can push it to another remote that is configured to accept the partial
|
|
| 76 |
+ # artifact
|
|
| 77 |
+ result = cli.run(project=project, args=['pull', element_name])
|
|
| 78 |
+ assert element_name in result.get_pulled_elements()
|
|
| 79 |
+ cli.configure({'artifacts': {'url': share3.repo, 'push': True, 'allow-partial-push': True}})
|
|
| 80 |
+ assert cli.get_element_state(project, element_name) == 'cached'
|
|
| 81 |
+ assert not os.path.isdir(buildtreedir)
|
|
| 82 |
+ result = cli.run(project=project, args=['push', element_name])
|
|
| 83 |
+ assert result.exit_code == 0
|
|
| 84 |
+ assert element_name in result.get_partial_pushed_elements()
|
|
| 85 |
+ assert element_name not in result.get_pushed_elements()
|
|
| 86 |
+ assert share3.has_artifact('test', element_name, cli.get_element_key(project, element_name))
|
|
| 87 |
+ default_state(cli, tmpdir, share3)
|
|
| 88 |
+ |
|
| 89 |
+ # Delete the local cache and pull the partial artifact from share 3,
|
|
| 90 |
+ # this should not include the buildtree when extracted locally, even when
|
|
| 91 |
+ # pull-buildtrees is given as a cli parameter as no available remotes will
|
|
| 92 |
+ # contain the buildtree
|
|
| 93 |
+ assert not os.path.isdir(buildtreedir)
|
|
| 94 |
+ assert cli.get_element_state(project, element_name) != 'cached'
|
|
| 95 |
+ result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
|
|
| 96 |
+ assert element_name in result.get_partial_pulled_elements()
|
|
| 97 |
+ assert not os.path.isdir(buildtreedir)
|
|
| 98 |
+ default_state(cli, tmpdir, share3)
|
|
| 99 |
+ |
|
| 100 |
+ # Delete the local cache and attempt to pull a 'full' artifact, including its
|
|
| 101 |
+ # buildtree. As before, share3, being the first listed remote, will not have
|
|
| 102 |
+ # the buildtree available and should spawn a partial pull. Having share1 as the
|
|
| 103 |
+ # second available remote should allow the buildtree to be pulled, thus 'completing'
|
|
| 104 |
+ # the artifact
|
|
| 105 |
+ cli.configure({'artifacts': [{'url': share3.repo, 'push': True, 'allow-partial-push': True},
|
|
| 106 |
+ {'url': share1.repo, 'push': True}]})
|
|
| 107 |
+ assert cli.get_element_state(project, element_name) != 'cached'
|
|
| 108 |
+ result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
|
|
| 109 |
+ assert element_name in result.get_partial_pulled_elements()
|
|
| 110 |
+ assert element_name in result.get_pulled_elements()
|
|
| 111 |
+ assert "Attempting to retrieve buildtree from remotes" in result.stderr
|
|
| 112 |
+ assert os.path.isdir(buildtreedir)
|
|
| 113 |
+ assert cli.get_element_state(project, element_name) == 'cached'
|
|
| 114 |
+ |
|
| 115 |
+ # Test that we are able to 'complete' an artifact on a server which is cached partially,
|
|
| 116 |
+ # but has now been configured for full artifact pushing. This should require only pushing
|
|
| 117 |
+ # the missing blobs, which should be those of just the buildtree. In this case changing
|
|
| 118 |
+ # share3 to full pushes should exercise this
|
|
| 119 |
+ cli.configure({'artifacts': {'url': share3.repo, 'push': True}})
|
|
| 120 |
+ result = cli.run(project=project, args=['push', element_name])
|
|
| 121 |
+ assert element_name in result.get_pushed_elements()
|
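tests/testutils/runcli.py: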
| ... | ... | @@ -191,6 +191,13 @@ class Result(): |
| 191 | 191 |
|
| 192 | 192 |
return list(pushed)
|
| 193 | 193 |
|
| 194 |
+ def get_partial_pushed_elements(self):
|
|
| 195 |
+ pushed = re.findall(r'\[\s*push:(\S+)\s*\]\s*INFO\s*Pushed partial artifact', self.stderr)
|
|
| 196 |
+ if pushed is None:
|
|
| 197 |
+ return []
|
|
| 198 |
+ |
|
| 199 |
+ return list(pushed)
|
|
| 200 |
+ |
|
| 194 | 201 |
def get_pulled_elements(self):
|
| 195 | 202 |
pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
|
| 196 | 203 |
if pulled is None:
|
| ... | ... | @@ -198,6 +205,13 @@ class Result(): |
| 198 | 205 |
|
| 199 | 206 |
return list(pulled)
|
| 200 | 207 |
|
| 208 |
+ def get_partial_pulled_elements(self):
|
|
| 209 |
+ pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled partial artifact', self.stderr)
|
|
| 210 |
+ if pulled is None:
|
|
| 211 |
+ return []
|
|
| 212 |
+ |
|
| 213 |
+ return list(pulled)
|
|
| 214 |
+ |
|
| 201 | 215 |
|
| 202 | 216 |
class Cli():
|
| 203 | 217 |
|
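To see what the new Result helpers match, a quick check of the pattern against a fabricated log line (the line itself is illustrative, not captured from a real run):

    import re

    line = "[ push:autotools/amhello.bst ] INFO    Pushed partial artifact 0f3c42 -> https://cache"
    found = re.findall(r'\[\s*push:(\S+)\s*\]\s*INFO\s*Pushed partial artifact', line)
    assert found == ['autotools/amhello.bst']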