[Notes] [Git][BuildStream/buildstream][tpollard/566] 12 commits: Fix stack traces discovered with ^C forceful termination.




Tom Pollard pushed to branch tpollard/566 at BuildStream / buildstream

Commits:

20 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py
    @@ -74,6 +74,7 @@ class ArtifactCache():

             self._has_fetch_remotes = False
             self._has_push_remotes = False
    +        self._has_partial_push_remotes = False

             os.makedirs(self.extractdir, exist_ok=True)

    @@ -398,6 +399,8 @@ class ArtifactCache():
                     self._has_fetch_remotes = True
                     if remote_spec.push:
                         self._has_push_remotes = True
    +                    if remote_spec.partial_push:
    +                        self._has_partial_push_remotes = True

                     remotes[remote_spec.url] = CASRemote(remote_spec)

    @@ -596,6 +599,31 @@ class ArtifactCache():
                 remotes_for_project = self._remotes[element._get_project()]
                 return any(remote.spec.push for remote in remotes_for_project)

    +    # has_partial_push_remotes():
    +    #
    +    # Check whether any remote repositories are available for pushing
    +    # non-complete artifacts
    +    #
    +    # Args:
    +    #     element (Element): The Element to check
    +    #
    +    # Returns:
    +    #   (bool): True if any remote repository is configured for optional
    +    #           partial pushes, False otherwise
    +    #
    +    def has_partial_push_remotes(self, *, element=None):
    +        # If there are no partial push remotes available, we can't partial push at all
    +        if not self._has_partial_push_remotes:
    +            return False
    +        elif element is None:
    +            # At least one remote is set to allow partial pushes
    +            return True
    +        else:
    +            # Check whether the specified element's project has push remotes configured
    +            # to accept partial artifact pushes
    +            remotes_for_project = self._remotes[element._get_project()]
    +            return any(remote.spec.partial_push for remote in remotes_for_project)
    +
         # push():
         #
         # Push committed artifact to remote repository.

    @@ -603,6 +631,8 @@ class ArtifactCache():
         # Args:
         #     element (Element): The Element whose artifact is to be pushed
         #     keys (list): The cache keys to use
    +    #     partial (bool): If the artifact is cached in a partial state
    +    #     subdir (string): Optional subdir to not push
         #
         # Returns:
         #   (bool): True if any remote was updated, False if no pushes were required

    @@ -610,12 +640,25 @@ class ArtifactCache():
         # Raises:
         #   (ArtifactError): if there was an error
         #
    -    def push(self, element, keys):
    +    def push(self, element, keys, partial=False, subdir=None):
             refs = [self.get_artifact_fullname(element, key) for key in list(keys)]

             project = element._get_project()

    -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +        push_remotes = []
    +        partial_remotes = []
    +
    +        # Create list of remotes to push to, given current element and partial push config
    +        if not partial:
    +            push_remotes = [r for r in self._remotes[project] if (r.spec.push and not r.spec.partial_push)]
    +
    +        if self._has_partial_push_remotes:
    +            # Create a specific list of the remotes expecting the artifact to be pushed in a
    +            # partial state. This list needs to be pushed in a partial state, without the
    +            # optional subdir if it exists locally. No need to attempt pushing a partial
    +            # artifact to a remote that is queued to also receive a full artifact
    +            partial_remotes = [r for r in self._remotes[project] if (r.spec.partial_push and r.spec.push) and
    +                               r not in push_remotes]

             pushed = False

    @@ -624,7 +667,7 @@ class ArtifactCache():
                 display_key = element._get_brief_display_key()
                 element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))

    -            if self.cas.push(refs, remote):
    +            if self.cas.push(refs, remote, subdir=subdir):
                     element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
                     pushed = True
                 else:

    @@ -632,6 +675,19 @@ class ArtifactCache():
                         remote.spec.url, element._get_brief_display_key()
                     ))

    +        for remote in partial_remotes:
    +            remote.init()
    +            display_key = element._get_brief_display_key()
    +            element.status("Pushing partial artifact {} -> {}".format(display_key, remote.spec.url))
    +
    +            if self.cas.push(refs, remote, excluded_subdirs=subdir):
    +                element.info("Pushed partial artifact {} -> {}".format(display_key, remote.spec.url))
    +                pushed = True
    +            else:
    +                element.info("Remote ({}) already has {} partial cached".format(
    +                    remote.spec.url, element._get_brief_display_key()
    +                ))
    +
             return pushed

         # pull():

    @@ -659,14 +715,23 @@ class ArtifactCache():
                     element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))

                     if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
    -                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
                         if subdir:
    -                        # Attempt to extract subdir into artifact extract dir if it already exists
    -                        # without containing the subdir. If the respective artifact extract dir does not
    -                        # exist a complete extraction will complete.
    -                        self.extract(element, key, subdir)
    -                    # no need to pull from additional remotes
    -                    return True
    +                        if not self.contains_subdir_artifact(element, key, subdir):
    +                            # The pull was expecting the specific subdir to be present, attempt
    +                            # to find it in other available remotes
    +                            element.info("Pulled partial artifact {} <- {}. Attempting to retrieve {} from remotes"
    +                                         .format(display_key, remote.spec.url, subdir))
    +                        else:
    +                            element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    +                            # Attempt to extract subdir into artifact extract dir if it already exists
    +                            # without containing the subdir. If the respective artifact extract dir does not
    +                            # exist a complete extraction will complete.
    +                            self.extract(element, key, subdir)
    +                            # no need to pull from additional remotes
    +                            return True
    +                    else:
    +                        element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    +                        return True
                     else:
                         element.info("Remote ({}) does not have {} cached".format(
                             remote.spec.url, element._get_brief_display_key()
    

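An aside on configuration: the `partial_push` flag on a remote spec is driven by a new `allow-partial-push` key on the artifact remote config node (parsed in the cascache.py changes below), defaulting to False when omitted. A sketch of how a user might opt in, written in the dict style the test suite passes to `cli.configure()`; the URLs and the exact config shape here are illustrative assumptions:

    # Hypothetical user config: one remote that requires complete
    # artifacts, and one that also accepts buildtree-less pushes.
    cli.configure({
        'artifacts': [
            {'url': 'https://cache.example.com:11002', 'push': True},
            {'url': 'https://partial.example.com:11002', 'push': True,
             'allow-partial-push': True},
        ],
    })
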
  • buildstream/_artifactcache/cascache.py
    @@ -45,7 +45,8 @@ from .. import _yaml
     _MAX_PAYLOAD_BYTES = 1024 * 1024


    -class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
    +class CASRemoteSpec(namedtuple('CASRemoteSpec',
    +                               'url push partial_push server_cert client_key client_cert instance_name')):

         # _new_from_config_node
         #

    @@ -53,9 +54,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
         #
         @staticmethod
         def _new_from_config_node(spec_node, basedir=None):
    -        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
    +        _yaml.node_validate(spec_node,
    +                            ['url', 'push', 'allow-partial-push', 'server-cert', 'client-key',
    +                             'client-cert', 'instance_name'])
             url = _yaml.node_get(spec_node, str, 'url')
             push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    +        partial_push = _yaml.node_get(spec_node, bool, 'allow-partial-push', default_value=False)
    +
             if not url:
                 provenance = _yaml.node_get_provenance(spec_node, 'url')
                 raise LoadError(LoadErrorReason.INVALID_DATA,

    @@ -85,10 +90,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
                 raise LoadError(LoadErrorReason.INVALID_DATA,
                                 "{}: 'client-cert' was specified without 'client-key'".format(provenance))

    -        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
    +        return CASRemoteSpec(url, push, partial_push, server_cert, client_key, client_cert, instance_name)


    -CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
    +CASRemoteSpec.__new__.__defaults__ = (False, None, None, None, None)


     class BlobNotFound(CASError):

    @@ -283,34 +288,47 @@ class CASCache():
         #   (bool): True if pull was successful, False if ref was not available
         #
         def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
    -        try:
    -            remote.init()

    -            request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
    -            request.key = ref
    -            response = remote.ref_storage.GetReference(request)
    +        tree_found = False

    -            tree = remote_execution_pb2.Digest()
    -            tree.hash = response.digest.hash
    -            tree.size_bytes = response.digest.size_bytes
    +        while True:
    +            try:
    +                if not tree_found:
    +                    remote.init()

    -            # Check if the element artifact is present, if so just fetch the subdir.
    -            if subdir and os.path.exists(self.objpath(tree)):
    -                self._fetch_subdir(remote, tree, subdir)
    -            else:
    -                # Fetch artifact, excluded_subdirs determined in pullqueue
    -                self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
    +                    request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
    +                    request.key = ref
    +                    response = remote.ref_storage.GetReference(request)

    -            self.set_ref(ref, tree)
    +                    tree = remote_execution_pb2.Digest()
    +                    tree.hash = response.digest.hash
    +                    tree.size_bytes = response.digest.size_bytes

    -            return True
    -        except grpc.RpcError as e:
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    -                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    -            else:
    -                return False
    -        except BlobNotFound as e:
    -            return False
    +                # Check if the element artifact is present, if so just fetch the subdir.
    +                if subdir and os.path.exists(self.objpath(tree)):
    +                    self._fetch_subdir(remote, tree, subdir)
    +                else:
    +                    # Fetch artifact, excluded_subdirs determined in pullqueue
    +                    self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
    +
    +                self.set_ref(ref, tree)
    +
    +                return True
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.NOT_FOUND:
    +                    raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    +                else:
    +                    return False
    +            except BlobNotFound:
    +                if not excluded_subdirs and subdir:
    +                    # The remote has the top level digest but could not complete a full pull,
    +                    # attempt partial without the need to initialise and check for the artifact
    +                    # digest. This default behaviour of dropping back to partial pulls could
    +                    # be made a configurable warning given at artifactcache level.
    +                    tree_found = True
    +                    excluded_subdirs, subdir = subdir, excluded_subdirs
    +                else:
    +                    return False

         # pull_tree():
         #

    @@ -355,6 +373,8 @@ class CASCache():
         # Args:
         #     refs (list): The refs to push
         #     remote (CASRemote): The remote to push to
    +    #     subdir (string): Optional specific subdir to include in the push
    +    #     excluded_subdirs (list): The optional list of subdirs to not push
         #
         # Returns:
         #   (bool): True if any remote was updated, False if no pushes were required

    @@ -362,7 +382,7 @@ class CASCache():
         # Raises:
         #   (CASError): if there was an error
         #
    -    def push(self, refs, remote):
    +    def push(self, refs, remote, *, subdir=None, excluded_subdirs=None):
             skipped_remote = True
             try:
                 for ref in refs:

    @@ -376,15 +396,18 @@ class CASCache():
                         response = remote.ref_storage.GetReference(request)

                         if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                        # ref is already on the server with the same tree
    -                        continue
    +                        # ref is already on the server with the same tree, however it might be partially cached.
    +                        # If artifact is not set to be pushed partially attempt to 'complete' the remote artifact if
    +                        # needed, else continue.
    +                        if excluded_subdirs or self.verify_digest_on_remote(remote, self._get_subdir(tree, subdir)):
    +                            continue

                     except grpc.RpcError as e:
                         if e.code() != grpc.StatusCode.NOT_FOUND:
                             # Intentionally re-raise RpcError for outer except block.
                             raise

    -                self._send_directory(remote, tree)
    +                self._send_directory(remote, tree, excluded_dir=excluded_subdirs)

                     request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
                     request.keys.append(ref)

    @@ -866,10 +889,17 @@ class CASCache():
                     a += 1
                     b += 1

    -    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
    +    def _reachable_refs_dir(self, reachable, tree, update_mtime=False, subdir=False):
             if tree.hash in reachable:
                 return

    +        # If looping through subdir digests, skip processing if
    +        # ref path does not exist, allowing for partial objects
    +        if subdir and not os.path.exists(self.objpath(tree)):
    +            return
    +
    +        # Raises FileNotFound exception if path does not exist,
    +        # which should only be entered on the top level digest
             if update_mtime:
                 os.utime(self.objpath(tree))

    @@ -886,9 +916,9 @@ class CASCache():
                 reachable.add(filenode.digest.hash)

             for dirnode in directory.directories:
    -            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
    +            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, subdir=True)

    -    def _required_blobs(self, directory_digest):
    +    def _required_blobs(self, directory_digest, excluded_dir=None):
             # parse directory, and recursively add blobs
             d = remote_execution_pb2.Digest()
             d.hash = directory_digest.hash

    @@ -907,7 +937,8 @@ class CASCache():
                 yield d

             for dirnode in directory.directories:
    -            yield from self._required_blobs(dirnode.digest)
    +            if dirnode.name != excluded_dir:
    +                yield from self._required_blobs(dirnode.digest)

         def _fetch_blob(self, remote, digest, stream):
             resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]

    @@ -1029,6 +1060,7 @@ class CASCache():
                 objpath = self._ensure_blob(remote, dir_digest)

                 directory = remote_execution_pb2.Directory()
    +
                 with open(objpath, 'rb') as f:
                     directory.ParseFromString(f.read())

    @@ -1104,9 +1136,8 @@ class CASCache():

             assert response.committed_size == digest.size_bytes

    -    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    -        required_blobs = self._required_blobs(digest)
    -
    +    def _send_directory(self, remote, digest, u_uid=uuid.uuid4(), excluded_dir=None):
    +        required_blobs = self._required_blobs(digest, excluded_dir=excluded_dir)
             missing_blobs = dict()
             # Limit size of FindMissingBlobs request
             for required_blobs_group in _grouper(required_blobs, 512):
    

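The reworked `CASCache.pull()` above retries at most once: when the remote has the top-level ref but is missing blobs (`BlobNotFound`), it swaps `subdir` and `excluded_subdirs` and goes round the loop again as a partial pull. A minimal standalone sketch of just that control flow, with a hypothetical `fetch()` callable standing in for the real gRPC fetch:

    class BlobNotFound(Exception):
        """Remote is missing a blob referenced by the top-level digest."""

    def pull_with_fallback(fetch, subdir=None, excluded_subdirs=None):
        # Sketch: retry once as a partial pull when a full pull fails.
        tree_found = False
        while True:
            try:
                if not tree_found:
                    pass  # resolve the top-level digest from the remote here
                fetch(subdir=subdir, excluded_subdirs=excluded_subdirs)
                return True
            except BlobNotFound:
                if not excluded_subdirs and subdir:
                    # Remote has the ref but not every blob; drop back to a
                    # partial pull without re-resolving the digest
                    tree_found = True
                    excluded_subdirs, subdir = subdir, excluded_subdirs
                else:
                    return False
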
  • buildstream/_frontend/app.py
    @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
     from .._stream import Stream
     from .._versions import BST_FORMAT_VERSION
     from .. import _yaml
    -from .._scheduler import ElementJob
    +from .._scheduler import ElementJob, JobStatus

     # Import frontend assets
     from . import Profile, LogLine, Status

    @@ -515,13 +515,13 @@ class App():
             self._status.add_job(job)
             self._maybe_render_status()

    -    def _job_completed(self, job, success):
    +    def _job_completed(self, job, status):
             self._status.remove_job(job)
             self._maybe_render_status()

             # Don't attempt to handle a failure if the user has already opted to
             # terminate
    -        if not success and not self.stream.terminated:
    +        if status == JobStatus.FAIL and not self.stream.terminated:

                 if isinstance(job, ElementJob):
                     element = job.element

  • buildstream/_scheduler/__init__.py
    @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
     from .queues.pullqueue import PullQueue

     from .scheduler import Scheduler, SchedStatus
    -from .jobs import ElementJob
    +from .jobs import ElementJob, JobStatus

  • buildstream/_scheduler/jobs/__init__.py
    @@ -20,3 +20,4 @@
     from .elementjob import ElementJob
     from .cachesizejob import CacheSizeJob
     from .cleanupjob import CleanupJob
    +from .job import JobStatus

  • buildstream/_scheduler/jobs/cachesizejob.py
    @@ -16,7 +16,7 @@
     #  Author:
     #        Tristan Daniël Maat <tristan maat codethink co uk>
     #
    -from .job import Job
    +from .job import Job, JobStatus


     class CacheSizeJob(Job):

    @@ -30,8 +30,8 @@ class CacheSizeJob(Job):
         def child_process(self):
             return self._artifacts.compute_cache_size()

    -    def parent_complete(self, success, result):
    -        if success:
    +    def parent_complete(self, status, result):
    +        if status == JobStatus.OK:
                 self._artifacts.set_cache_size(result)

                 if self._complete_cb:

  • buildstream/_scheduler/jobs/cleanupjob.py
    @@ -16,7 +16,7 @@
     #  Author:
     #        Tristan Daniël Maat <tristan maat codethink co uk>
     #
    -from .job import Job
    +from .job import Job, JobStatus


     class CleanupJob(Job):

    @@ -29,6 +29,6 @@ class CleanupJob(Job):
         def child_process(self):
             return self._artifacts.clean()

    -    def parent_complete(self, success, result):
    -        if success:
    +    def parent_complete(self, status, result):
    +        if status == JobStatus.OK:
                 self._artifacts.set_cache_size(result)

  • buildstream/_scheduler/jobs/elementjob.py
    @@ -60,7 +60,7 @@ from .job import Job
     #     Args:
     #        job (Job): The job object which completed
     #        element (Element): The element passed to the Job() constructor
    -#        success (bool): True if the action_cb did not raise an exception
    +#        status (JobStatus): The job completion status
     #        result (object): The deserialized object returned by the `action_cb`, or None
     #                         if `success` is False
     #

    @@ -93,8 +93,8 @@ class ElementJob(Job):
             # Run the action
             return self._action_cb(self._element)

    -    def parent_complete(self, success, result):
    -        self._complete_cb(self, self._element, success, self._result)
    +    def parent_complete(self, status, result):
    +        self._complete_cb(self, self._element, status, self._result)

         def message(self, message_type, message, **kwargs):
             args = dict(kwargs)
    

  • buildstream/_scheduler/jobs/job.py
    @@ -28,8 +28,6 @@ import traceback
     import asyncio
     import multiprocessing

    -import psutil
    -
     # BuildStream toplevel imports
     from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
     from ..._message import Message, MessageType, unconditional_messages

    @@ -43,6 +41,22 @@ RC_PERM_FAIL = 2
     RC_SKIPPED = 3


    +# JobStatus:
    +#
    +# The job completion status, passed back through the
    +# complete callbacks.
    +#
    +class JobStatus():
    +    # Job succeeded
    +    OK = 0
    +
    +    # A temporary BstError was raised
    +    FAIL = 1
    +
    +    # A SkipJob was raised
    +    SKIPPED = 3
    +
    +
     # Used to distinguish between status messages and return values
     class Envelope():
         def __init__(self, message_type, message):

    @@ -118,7 +132,6 @@ class Job():
             self._max_retries = max_retries        # Maximum number of automatic retries
             self._result = None                    # Return value of child action in the parent
             self._tries = 0                        # Try count, for retryable jobs
    -        self._skipped_flag = False             # Indicate whether the job was skipped.
             self._terminated = False               # Whether this job has been explicitly terminated

             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.

    @@ -215,17 +228,10 @@
         # Forcefully kill the process, and any children it might have.
         #
         def kill(self):
    -
             # Force kill
             self.message(MessageType.WARN,
                          "{} did not terminate gracefully, killing".format(self.action_name))
    -
    -        try:
    -            utils._kill_process_tree(self._process.pid)
    -        # This can happen if the process died of its own accord before
    -        # we try to kill it
    -        except psutil.NoSuchProcess:
    -            return
    +        utils._kill_process_tree(self._process.pid)

         # suspend()
         #

    @@ -282,18 +288,6 @@
         def set_task_id(self, task_id):
             self._task_id = task_id

    -    # skipped
    -    #
    -    # This will evaluate to True if the job was skipped
    -    # during processing, or if it was forcefully terminated.
    -    #
    -    # Returns:
    -    #    (bool): Whether the job should appear as skipped
    -    #
    -    @property
    -    def skipped(self):
    -        return self._skipped_flag or self._terminated
    -
         #######################################################
         #                  Abstract Methods                   #
         #######################################################

    @@ -304,10 +298,10 @@
         # pass the result to the main thread.
         #
         # Args:
    -    #    success (bool): Whether the job was successful.
    +    #    status (JobStatus): The job exit status
         #    result (any): The result returned by child_process().
         #
    -    def parent_complete(self, success, result):
    +    def parent_complete(self, status, result):
             raise ImplError("Job '{kind}' does not implement parent_complete()"
                             .format(kind=type(self).__name__))

    @@ -571,16 +565,23 @@
             #
             self._retry_flag = returncode == RC_FAIL

    -        # Set the flag to alert Queue that this job skipped.
    -        self._skipped_flag = returncode == RC_SKIPPED
    -
             if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
                 self.spawn()
                 return

    -        success = returncode in (RC_OK, RC_SKIPPED)
    -        self.parent_complete(success, self._result)
    -        self._scheduler.job_completed(self, success)
    +        # Resolve the outward facing overall job completion status
    +        #
    +        if returncode == RC_OK:
    +            status = JobStatus.OK
    +        elif returncode == RC_SKIPPED:
    +            status = JobStatus.SKIPPED
    +        elif returncode in (RC_FAIL, RC_PERM_FAIL):
    +            status = JobStatus.FAIL
    +        else:
    +            status = JobStatus.FAIL
    +
    +        self.parent_complete(status, self._result)
    +        self._scheduler.job_completed(self, status)

             # Force the deletion of the queue and process objects to try and clean up FDs
             self._queue = self._process = None

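The heart of the job.py change is the parent-side mapping from child return codes to the new public JobStatus: RC_FAIL and RC_PERM_FAIL both collapse to FAIL, and any unrecognised code is conservatively treated as a failure too. A self-contained sketch of that mapping, reusing the constants from the diff:

    RC_OK = 0
    RC_FAIL = 1
    RC_PERM_FAIL = 2
    RC_SKIPPED = 3

    class JobStatus():
        OK = 0
        FAIL = 1
        SKIPPED = 3

    def resolve_status(returncode):
        # Unknown return codes are reported as failures as well
        if returncode == RC_OK:
            return JobStatus.OK
        elif returncode == RC_SKIPPED:
            return JobStatus.SKIPPED
        return JobStatus.FAIL

    assert resolve_status(RC_PERM_FAIL) == JobStatus.FAIL
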
  • buildstream/_scheduler/queues/buildqueue.py
    @@ -21,7 +21,7 @@
     from datetime import timedelta

     from . import Queue, QueueStatus
    -from ..jobs import ElementJob
    +from ..jobs import ElementJob, JobStatus
     from ..resources import ResourceType
     from ..._message import MessageType

    @@ -104,7 +104,7 @@ class BuildQueue(Queue):
             if artifacts.has_quota_exceeded():
                 self._scheduler.check_cache_size()

    -    def done(self, job, element, result, success):
    +    def done(self, job, element, result, status):

             # Inform element in main process that assembly is done
             element._assemble_done()

    @@ -117,5 +117,5 @@ class BuildQueue(Queue):
             #        artifact cache size for a successful build even though we know a
             #        failed build also grows the artifact cache size.
             #
    -        if success:
    +        if status == JobStatus.OK:
                 self._check_cache_size(job, element, result)

  • buildstream/_scheduler/queues/fetchqueue.py
    @@ -24,6 +24,7 @@ from ... import Consistency
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..jobs import JobStatus


     # A queue which fetches element sources

    @@ -66,9 +67,9 @@ class FetchQueue(Queue):

             return QueueStatus.READY

    -    def done(self, _, element, result, success):
    +    def done(self, _, element, result, status):

    -        if not success:
    +        if status == JobStatus.FAIL:
                 return

             element._update_state()
    

  • buildstream/_scheduler/queues/pullqueue.py
    @@ -21,6 +21,7 @@
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..jobs import JobStatus
     from ..._exceptions import SkipJob


    @@ -54,9 +55,9 @@ class PullQueue(Queue):
             else:
                 return QueueStatus.SKIP

    -    def done(self, _, element, result, success):
    +    def done(self, _, element, result, status):

    -        if not success:
    +        if status == JobStatus.FAIL:
                 return

             element._pull_done()

    @@ -64,4 +65,5 @@ class PullQueue(Queue):
             # Build jobs will check the "approximate" size first. Since we
             # do not get an artifact size from pull jobs, we have to
             # actually check the cache size.
    -        self._scheduler.check_cache_size()
    +        if status == JobStatus.OK:
    +            self._scheduler.check_cache_size()

  • buildstream/_scheduler/queues/queue.py
    @@ -25,7 +25,7 @@ from enum import Enum
     import traceback

     # Local imports
    -from ..jobs import ElementJob
    +from ..jobs import ElementJob, JobStatus
     from ..resources import ResourceType

     # BuildStream toplevel imports

    @@ -133,10 +133,9 @@ class Queue():
         #    job (Job): The job which completed processing
         #    element (Element): The element which completed processing
         #    result (any): The return value of the process() implementation
    -    #    success (bool): True if the process() implementation did not
    -    #                    raise any exception
    +    #    status (JobStatus): The return status of the Job
         #
    -    def done(self, job, element, result, success):
    +    def done(self, job, element, result, status):
             pass

         #####################################################

    @@ -291,7 +290,7 @@ class Queue():
         #
         # See the Job object for an explanation of the call signature
         #
    -    def _job_done(self, job, element, success, result):
    +    def _job_done(self, job, element, status, result):

             # Update values that need to be synchronized in the main task
             # before calling any queue implementation

    @@ -301,7 +300,7 @@ class Queue():
             # and determine if it should be considered as processed
             # or skipped.
             try:
    -            self.done(job, element, result, success)
    +            self.done(job, element, result, status)
             except BstError as e:

                 # Report error and mark as failed

    @@ -332,12 +331,10 @@ class Queue():
                 # All jobs get placed on the done queue for later processing.
                 self._done_queue.append(job)

    -            # A Job can be skipped whether or not it has failed,
    -            # we want to only bookkeep them as processed or failed
    -            # if they are not skipped.
    -            if job.skipped:
    +            # These lists are for bookkeeping purposes for the UI and logging.
    +            if status == JobStatus.SKIPPED:
                     self.skipped_elements.append(element)
    -            elif success:
    +            elif status == JobStatus.OK:
                     self.processed_elements.append(element)
                 else:
                     self.failed_elements.append(element)
    

  • buildstream/_scheduler/queues/trackqueue.py
    @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..jobs import JobStatus


     # A queue which tracks sources

    @@ -47,9 +48,9 @@ class TrackQueue(Queue):

             return QueueStatus.READY

    -    def done(self, _, element, result, success):
    +    def done(self, _, element, result, status):

    -        if not success:
    +        if status == JobStatus.FAIL:
                 return

             # Set the new refs in the main process one by one as they complete
    

  • buildstream/_scheduler/scheduler.py
    @@ -38,6 +38,16 @@ class SchedStatus():
         TERMINATED = 1


    +# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
    +# which we launch dynamically: it is meaningless to queue one
    +# if another is already queued, and it also doesn't make sense
    +# to run them in parallel
    +#
    +_ACTION_NAME_CLEANUP = 'cleanup'
    +_ACTION_NAME_CACHE_SIZE = 'cache_size'
    +_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
    +
    +
     # Scheduler()
     #
     # The scheduler operates on a list queues, each of which is meant to accomplish

    @@ -94,6 +104,15 @@ class Scheduler():
             self._suspendtime = None
             self._queue_jobs = True      # Whether we should continue to queue jobs

    +        # Whether our exclusive jobs, like 'cleanup', are currently already
    +        # waiting or active.
    +        #
    +        # This is just a bit quicker than scanning the wait queue and active
    +        # queue and comparing job action names.
    +        #
    +        self._exclusive_waiting = set()
    +        self._exclusive_active = set()
    +
             self._resources = Resources(context.sched_builders,
                                         context.sched_fetchers,
                                         context.sched_pushers)

    @@ -211,19 +230,6 @@ class Scheduler():
                 starttime = timenow
             return timenow - starttime

    -    # schedule_jobs()
    -    #
    -    # Args:
    -    #     jobs ([Job]): A list of jobs to schedule
    -    #
    -    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
    -    # run as soon any other queueing jobs finish, provided sufficient
    -    # resources are available for them to run
    -    #
    -    def schedule_jobs(self, jobs):
    -        for job in jobs:
    -            self.waiting_jobs.append(job)
    -
         # job_completed():
         #
         # Called when a Job completes

    @@ -231,12 +237,14 @@ class Scheduler():
         # Args:
         #    queue (Queue): The Queue holding a complete job
         #    job (Job): The completed Job
    -    #    success (bool): Whether the Job completed with a success status
    +    #    status (JobStatus): The status of the completed job
         #
    -    def job_completed(self, job, success):
    +    def job_completed(self, job, status):
             self._resources.clear_job_resources(job)
             self.active_jobs.remove(job)
    -        self._job_complete_callback(job, success)
    +        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    +            self._exclusive_active.remove(job.action_name)
    +        self._job_complete_callback(job, status)
             self._schedule_queue_jobs()
             self._sched()

    @@ -246,18 +254,13 @@ class Scheduler():
         # size is calculated, a cleanup job will be run automatically
         # if needed.
         #
    -    # FIXME: This should ensure that only one cache size job
    -    #        is ever pending at a given time. If a cache size
    -    #        job is already running, it is correct to queue
    -    #        a new one, it is incorrect to have more than one
    -    #        of these jobs pending at a given time, though.
    -    #
         def check_cache_size(self):
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    +        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    +                           'cache_size/cache_size',
                                resources=[ResourceType.CACHE,
                                           ResourceType.PROCESS],
                                complete_cb=self._run_cleanup)
    -        self.schedule_jobs([job])
    +        self._schedule_jobs([job])

         #######################################################
         #                  Local Private Methods              #

    @@ -276,10 +279,19 @@ class Scheduler():
                 if not self._resources.reserve_job_resources(job):
                     continue

    +            # Postpone these jobs if one is already running
    +            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
    +               job.action_name in self._exclusive_active:
    +                continue
    +
                 job.spawn()
                 self.waiting_jobs.remove(job)
                 self.active_jobs.append(job)

    +            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    +                self._exclusive_waiting.remove(job.action_name)
    +                self._exclusive_active.add(job.action_name)
    +
                 if self._job_start_callback:
                     self._job_start_callback(job)

    @@ -287,6 +299,33 @@ class Scheduler():
             if not self.active_jobs and not self.waiting_jobs:
                 self.loop.stop()

    +    # _schedule_jobs()
    +    #
    +    # The main entry point for jobs to be scheduled.
    +    #
    +    # This is called either as a result of scanning the queues
    +    # in _schedule_queue_jobs(), or directly by the Scheduler
    +    # to insert special jobs like cleanups.
    +    #
    +    # Args:
    +    #     jobs ([Job]): A list of jobs to schedule
    +    #
    +    def _schedule_jobs(self, jobs):
    +        for job in jobs:
    +
    +            # Special treatment of our redundant exclusive jobs
    +            #
    +            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    +
    +                # Drop the job if one is already queued
    +                if job.action_name in self._exclusive_waiting:
    +                    continue
    +
    +                # Mark this action type as queued
    +                self._exclusive_waiting.add(job.action_name)
    +
    +            self.waiting_jobs.append(job)
    +
         # _schedule_queue_jobs()
         #
         # Ask the queues what jobs they want to schedule and schedule

    @@ -331,7 +370,7 @@ class Scheduler():
                 # the next queue and process them.
                 process_queues = any(q.dequeue_ready() for q in self.queues)

    -        self.schedule_jobs(ready)
    +        self._schedule_jobs(ready)
             self._sched()

         # _run_cleanup()

    @@ -353,11 +392,11 @@ class Scheduler():
             if not artifacts.has_quota_exceeded():
                 return

    -        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    +        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
                              resources=[ResourceType.CACHE,
                                         ResourceType.PROCESS],
                              exclusive_resources=[ResourceType.CACHE])
    -        self.schedule_jobs([job])
    +        self._schedule_jobs([job])

         # _suspend_jobs()
         #
    

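The `_exclusive_waiting` / `_exclusive_active` sets implement a dedup-and-serialise policy for the dynamically launched 'cleanup' and 'cache_size' jobs: at most one of each may wait, and at most one of each may run. A toy model of that policy (the MiniScheduler class is hypothetical, but the rules mirror the diff):

    _REDUNDANT_EXCLUSIVE_ACTIONS = ['cleanup', 'cache_size']

    class MiniScheduler():
        def __init__(self):
            self.waiting = []
            self._exclusive_waiting = set()
            self._exclusive_active = set()

        def schedule(self, action_name):
            # Drop the job if an identical exclusive action is already queued
            if action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
                if action_name in self._exclusive_waiting:
                    return False
                self._exclusive_waiting.add(action_name)
            self.waiting.append(action_name)
            return True

        def start(self, action_name):
            # Postpone if the same exclusive action is already running
            if action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
               action_name in self._exclusive_active:
                return False
            self.waiting.remove(action_name)
            if action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
                self._exclusive_waiting.discard(action_name)
                self._exclusive_active.add(action_name)
            return True

    s = MiniScheduler()
    assert s.schedule('cache_size')
    assert not s.schedule('cache_size')   # duplicate is dropped while queued
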
  • buildstream/element.py
    @@ -65,7 +65,7 @@ Miscellaneous abstract methods also exist:

     * :func:`Element.generate_script() <buildstream.element.Element.generate_script>`

    -  For the purpose of ``bst source bundle``, an Element may optionally implement this.
    +  For the purpose of ``bst source checkout --include-build-scripts``, an Element may optionally implement this.


     Class Reference

    @@ -1800,13 +1800,19 @@ class Element(Plugin):
         #   (bool): True if this element does not need a push job to be created
         #
         def _skip_push(self):
    +
             if not self.__artifacts.has_push_remotes(element=self):
                 # No push remotes for this element's project
                 return True

             # Do not push elements that aren't cached, or that are cached with a dangling buildtree
    -        # artifact unless element type is expected to have an an empty buildtree directory
    -        if not self._cached_buildtree():
    +        # artifact unless element type is expected to have an empty buildtree directory. Check
    +        # that this default behaviour is not overridden via a remote configured to allow pushing
    +        # artifacts without their corresponding buildtree.
    +        if not self._cached():
    +            return True
    +
    +        if not self._cached_buildtree() and not self.__artifacts.has_partial_push_remotes(element=self):
                 return True

             # Do not push tainted artifact

    @@ -1817,7 +1823,8 @@ class Element(Plugin):

         # _push():
         #
    -    # Push locally cached artifact to remote artifact repository.
    +    # Push locally cached artifact to remote artifact repository. An attempt
    +    # will be made to push partial artifacts given current config
         #
         # Returns:
         #   (bool): True if the remote was updated, False if it already existed

    @@ -1830,8 +1837,19 @@ class Element(Plugin):
                 self.warn("Not pushing tainted artifact.")
                 return False

    -        # Push all keys used for local commit
    -        pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
    +        # Push all keys used for local commit, this could be full or partial,
    +        # given previous _skip_push() logic. If buildtree isn't cached, then
    +        # set partial push
    +
    +        partial = False
    +        subdir = 'buildtree'
    +        if not self._cached_buildtree():
    +            partial = True
    +
    +        pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit(), partial=partial, subdir=subdir)
    +
    +        # Artifact might be cached in the server partially with the top level ref existing.
    +        # Check if we need to attempt a push of a locally cached buildtree given current config
             if not pushed:
                 return False
     
    

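Taken together, `_skip_push()` and `_push()` now encode a small decision table: nothing cached, skip; buildtree missing and no partial-push remote, skip; otherwise push, partially when the buildtree is absent. A sketch of that table as a pure function (the taint check is omitted, and the boolean inputs stand in for the real cache queries):

    def push_plan(cached, cached_buildtree, has_partial_push_remotes):
        # Returns None to skip the push, else the kwargs for ArtifactCache.push()
        if not cached:
            return None
        if not cached_buildtree and not has_partial_push_remotes:
            return None
        return {'partial': not cached_buildtree, 'subdir': 'buildtree'}

    assert push_plan(False, False, True) is None
    assert push_plan(True, False, False) is None
    assert push_plan(True, False, True) == {'partial': True, 'subdir': 'buildtree'}
    assert push_plan(True, True, False) == {'partial': False, 'subdir': 'buildtree'}
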
  • buildstream/sandbox/sandbox.py
    @@ -592,7 +592,7 @@ class _SandboxBatch():
             if command.label:
                 context = self.sandbox._get_context()
                 message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS,
    -                              'Running {}'.format(command.label))
    +                              'Running command', detail=command.label)
                 context.message(message)

             exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
    

  • buildstream/utils.py
    ... ... @@ -1050,6 +1050,11 @@ def _kill_process_tree(pid):
    1050 1050
                 # Ignore this error, it can happen with
    
    1051 1051
                 # some setuid bwrap processes.
    
    1052 1052
                 pass
    
    1053
    +        except psutil.NoSuchProcess:
    
    1054
+            # The process has already been sent
    
    1055
    +            # SIGTERM, so there is a window where the process
    
    1056
    +            # could have exited already.
    
    1057
    +            pass
    
    1053 1058
     
    
    1054 1059
         # Bloody Murder
    
    1055 1060
         for child in children:
    

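The new except clause in _kill_process_tree() tolerates a benign race: every process in the tree has already been sent SIGTERM by this point, so any of them may exit between being enumerated and being killed. A minimal standalone sketch of the same pattern, assuming only the documented psutil API:

        import psutil

        def kill_tree(pid):
            try:
                parent = psutil.Process(pid)
                children = parent.children(recursive=True)
            except psutil.NoSuchProcess:
                return  # the whole tree is already gone

            for proc in children + [parent]:
                try:
                    proc.kill()
                except psutil.AccessDenied:
                    pass  # e.g. some setuid bwrap processes
                except psutil.NoSuchProcess:
                    pass  # exited after SIGTERM, before we got here
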
  • tests/integration/pushbuildtrees.py
    1
    +import os
    
    2
    +import shutil
    
    3
    +import pytest
    
    4
    +
    
    5
    +from tests.testutils import cli_integration as cli, create_artifact_share
    
    6
    +from tests.testutils.integration import assert_contains
    
    7
    +from tests.testutils.site import HAVE_BWRAP, IS_LINUX
    
    8
    +from buildstream._exceptions import ErrorDomain, LoadErrorReason
    
    9
    +
    
    10
    +
    
    11
    +DATA_DIR = os.path.join(
    
    12
    +    os.path.dirname(os.path.realpath(__file__)),
    
    13
    +    "project"
    
    14
    +)
    
    15
    +
    
    16
    +
    
    17
    +# Remove artifact cache & set cli.config value of pull-buildtrees
    
    18
    +# to false, which is the default user context. The cache has to be
    
    19
    +# cleared as just forcefully removing the refpath leaves dangling objects.
    
    20
    +def default_state(cli, tmpdir, share):
    
    21
    +    shutil.rmtree(os.path.join(str(tmpdir), 'artifacts'))
    
    22
    +    cli.configure({
    
    23
    +        'artifacts': {'url': share.repo, 'push': False},
    
    24
    +        'artifactdir': os.path.join(str(tmpdir), 'artifacts'),
    
    25
    +        'cache': {'pull-buildtrees': False},
    
    26
    +    })
    
    27
    +
    
    28
    +
    
    29
+# Tests to capture the integration of the optional push of buildtrees.
    
    30
    +# The behaviour should encompass pushing artifacts that are already cached
    
    31
    +# without a buildtree as well as artifacts that are cached with their buildtree.
    
    32
+# This option is handled via 'allow-partial-push' on a per-artifact-remote config
    
    33
    +# node basis. Multiple remote config nodes can point to the same url and as such can
    
    34
+# have different 'allow-partial-push' options; the tests need to cover this using project
    
    35
    +# confs.
    
    36
    +@pytest.mark.integration
    
    37
    +@pytest.mark.datafiles(DATA_DIR)
    
    38
    +@pytest.mark.skipif(IS_LINUX and not HAVE_BWRAP, reason='Only available with bubblewrap on Linux')
    
    39
    +def test_pushbuildtrees(cli, tmpdir, datafiles, integration_cache):
    
    40
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    41
    +    element_name = 'autotools/amhello.bst'
    
    42
    +
    
    43
    +    # Create artifact shares for pull & push testing
    
    44
    +    with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
    
    45
    +        create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2,\
    
    46
    +        create_artifact_share(os.path.join(str(tmpdir), 'share3')) as share3,\
    
    47
    +        create_artifact_share(os.path.join(str(tmpdir), 'share4')) as share4:
    
    48
    +
    
    49
    +        cli.configure({
    
    50
    +            'artifacts': {'url': share1.repo, 'push': True},
    
    51
    +            'artifactdir': os.path.join(str(tmpdir), 'artifacts')
    
    52
    +        })
    
    53
    +
    
    54
    +        cli.configure({'artifacts': [{'url': share1.repo, 'push': True},
    
    55
    +                                     {'url': share2.repo, 'push': True, 'allow-partial-push': True}]})
    
    56
    +
    
    57
+        # Build the autotools element, check it was pushed, delete the local copy.
    
    58
+        # As share 2 has push & allow-partial-push set to true, it
    
    59
    +        # should have pushed the artifacts, without the cached buildtrees,
    
    60
    +        # to it.
    
    61
    +        result = cli.run(project=project, args=['build', element_name])
    
    62
    +        assert result.exit_code == 0
    
    63
    +        assert cli.get_element_state(project, element_name) == 'cached'
    
    64
    +        elementdigest = share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    65
    +        buildtreedir = os.path.join(str(tmpdir), 'artifacts', 'extract', 'test', 'autotools-amhello',
    
    66
    +                                    elementdigest.hash, 'buildtree')
    
    67
    +        assert os.path.isdir(buildtreedir)
    
    68
    +        assert element_name in result.get_partial_pushed_elements()
    
    69
    +        assert element_name in result.get_pushed_elements()
    
    70
    +        assert share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    71
    +        assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    72
    +        default_state(cli, tmpdir, share1)
    
    73
    +
    
    74
+        # Check that after explicitly pulling an artifact without its buildtree,
    
    75
    +        # we can push it to another remote that is configured to accept the partial
    
    76
    +        # artifact
    
    77
    +        result = cli.run(project=project, args=['pull', element_name])
    
    78
    +        assert element_name in result.get_pulled_elements()
    
    79
    +        cli.configure({'artifacts': {'url': share3.repo, 'push': True, 'allow-partial-push': True}})
    
    80
    +        assert cli.get_element_state(project, element_name) == 'cached'
    
    81
    +        assert not os.path.isdir(buildtreedir)
    
    82
    +        result = cli.run(project=project, args=['push', element_name])
    
    83
    +        assert result.exit_code == 0
    
    84
    +        assert element_name in result.get_partial_pushed_elements()
    
    85
    +        assert element_name not in result.get_pushed_elements()
    
    86
    +        assert share3.has_artifact('test', element_name, cli.get_element_key(project, element_name))
    
    87
    +        default_state(cli, tmpdir, share3)
    
    88
    +
    
    89
    +        # Delete the local cache and pull the partial artifact from share 3,
    
    90
    +        # this should not include the buildtree when extracted locally, even when
    
    91
+        # pull-buildtrees is given as a cli parameter, as none of the available remotes
    
    92
    +        # contain the buildtree
    
    93
    +        assert not os.path.isdir(buildtreedir)
    
    94
    +        assert cli.get_element_state(project, element_name) != 'cached'
    
    95
    +        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
    
    96
    +        assert element_name in result.get_partial_pulled_elements()
    
    97
    +        assert not os.path.isdir(buildtreedir)
    
    98
    +        default_state(cli, tmpdir, share3)
    
    99
    +
    
    100
    +        # Delete the local cache and attempt to pull a 'full' artifact, including its
    
    101
+        # buildtree. As before, share3 being the first listed remote will not have
    
    102
    +        # the buildtree available and should spawn a partial pull. Having share1 as the
    
    103
+        # second available remote should allow the buildtree to be pulled, thus 'completing'
    
    104
    +        # the artifact
    
    105
    +        cli.configure({'artifacts': [{'url': share3.repo, 'push': True, 'allow-partial-push': True},
    
    106
    +                                     {'url': share1.repo, 'push': True}]})
    
    107
    +        assert cli.get_element_state(project, element_name) != 'cached'
    
    108
    +        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
    
    109
    +        assert element_name in result.get_partial_pulled_elements()
    
    110
    +        assert element_name in result.get_pulled_elements()
    
    111
    +        assert "Attempting to retrieve buildtree from remotes" in result.stderr
    
    112
    +        assert os.path.isdir(buildtreedir)
    
    113
    +        assert cli.get_element_state(project, element_name) == 'cached'
    
    114
    +
    
    115
    +        # Test that we are able to 'complete' an artifact on a server which is cached partially,
    
    116
    +        # but has now been configured for full artifact pushing. This should require only pushing
    
    117
+        # the missing blobs, which should be just those of the buildtree. In this case, changing
    
    118
    +        # share3 to full pushes should exercise this
    
    119
    +        cli.configure({'artifacts': {'url': share3.repo, 'push': True}})
    
    120
    +        result = cli.run(project=project, args=['push', element_name])
    
    121
    +        assert element_name in result.get_pushed_elements()

  • tests/testutils/runcli.py
    ... ... @@ -191,6 +191,13 @@ class Result():
    191 191
     
    
    192 192
             return list(pushed)
    
    193 193
     
    
    194
    +    def get_partial_pushed_elements(self):
    
    195
    +        pushed = re.findall(r'\[\s*push:(\S+)\s*\]\s*INFO\s*Pushed partial artifact', self.stderr)
    
    196
    +        if pushed is None:
    
    197
    +            return []
    
    198
    +
    
    199
    +        return list(pushed)
    
    200
    +
    
    194 201
         def get_pulled_elements(self):
    
    195 202
             pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
    
    196 203
             if pulled is None:
    
    ... ... @@ -198,6 +205,13 @@ class Result():
    198 205
     
    
    199 206
             return list(pulled)
    
    200 207
     
    
    208
    +    def get_partial_pulled_elements(self):
    
    209
    +        pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled partial artifact', self.stderr)
    
    210
    +        if pulled is None:
    
    211
    +            return []
    
    212
    +
    
    213
    +        return list(pulled)
    
    214
    +
    
    201 215
     
    
    202 216
     class Cli():
    
    203 217
     
    

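As a side note, re.findall() always returns a list, so the 'is None' guards in the new helpers are defensive rather than reachable; they simply mirror the existing get_pushed_elements()/get_pulled_elements() convention. A quick illustration of what the new partial-push regex matches (the log line is hand-written for the example, not captured output):

        import re

        stderr = "[push:autotools/amhello.bst] INFO    Pushed partial artifact 0abc12/34\n"
        found = re.findall(r'\[\s*push:(\S+)\s*\]\s*INFO\s*Pushed partial artifact', stderr)
        assert found == ['autotools/amhello.bst']
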

