[Notes] [Git][BuildStream/buildstream][chandan/fix-checkout-none-1.2] 31 commits: ci: update freedesktop-sdk ref




Chandan Singh pushed to branch chandan/fix-checkout-none-1.2 at BuildStream / buildstream

Commits:

18 changed files:

Changes:

  • .gitlab-ci.yml

    @@ -168,7 +168,7 @@ docs:
       variables:
         bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
         bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
    -    fd_sdk_ref: 718ea88089644a1ea5b488de0b90c2c565cb75f8 # 18.08.12
    +    fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
       before_script:
       - (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
       - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext

  • NEWS

    +=================
    +buildstream 1.2.3
    +=================
    +
    + o Fixed an unhandled exception when cleaning up a build sandbox (#153)
    +
    + o Fixed race condition when calculating cache size and commiting artifacts
    +
    + o Fixed regression where terminating with `^C` results in a double user interrogation (#693)
    +
    + o Fixed regression in summary when builds are terminated (#479)
    +
    + o Fixed regression where irrelevant status messages appear from git sources
    +
    + o Improve performance of artifact uploads by batching file transfers (#676/#677)
    +
    + o Fixed performance of artifact downloads by batching file transfers (#554)
    +
    + o Fixed checks for paths which escape the project directory (#673)
    +
     =================
     buildstream 1.2.2
     =================

  • buildstream/_artifactcache/artifactcache.py

    @@ -277,7 +277,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))

    -                if self.get_quota_exceeded():
    +                if self.has_quota_exceeded():
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")
    @@ -364,14 +364,14 @@ class ArtifactCache():
             self._cache_size = cache_size
             self._write_cache_size(self._cache_size)

    -    # get_quota_exceeded()
    +    # has_quota_exceeded()
         #
         # Checks if the current artifact cache size exceeds the quota.
         #
         # Returns:
         #    (bool): True of the quota is exceeded
         #
    -    def get_quota_exceeded(self):
    +    def has_quota_exceeded(self):
             return self.get_cache_size() > self._cache_quota

         ################################################

  • buildstream/_artifactcache/cascache.py

    @@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
     from . import ArtifactCache


    +# The default limit for gRPC messages is 4 MiB.
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    +
    +
     # A CASCache manages artifacts in a CAS repository as specified in the
     # Remote Execution API.
     #
    @@ -76,6 +81,7 @@ class CASCache(ArtifactCache):
         ################################################
         #     Implementation of abstract methods       #
         ################################################
    +
         def contains(self, element, key):
             refpath = self._refpath(self.get_artifact_fullname(element, key))

    @@ -115,7 +121,7 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]

    -        tree = self._create_tree(content)
    +        tree = self._commit_directory(content)

             for ref in refs:
                 self.set_ref(ref, tree)
    @@ -151,6 +157,7 @@ class CASCache(ArtifactCache):
             q = multiprocessing.Queue()
             for remote_spec in remote_specs:
                 # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    +            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
                 p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))

                 try:
    @@ -263,109 +270,69 @@ class CASCache(ArtifactCache):

             self.set_ref(newref, tree)

    +    def _push_refs_to_remote(self, refs, remote):
    +        skipped_remote = True
    +        try:
    +            for ref in refs:
    +                tree = self.resolve_ref(ref)
    +
    +                # Check whether ref is already on the server in which case
    +                # there is no need to push the artifact
    +                try:
    +                    request = buildstream_pb2.GetReferenceRequest()
    +                    request.key = ref
    +                    response = remote.ref_storage.GetReference(request)
    +
    +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    +                        # ref is already on the server with the same tree
    +                        continue
    +
    +                except grpc.RpcError as e:
    +                    if e.code() != grpc.StatusCode.NOT_FOUND:
    +                        # Intentionally re-raise RpcError for outer except block.
    +                        raise
    +
    +                self._send_directory(remote, tree)
    +
    +                request = buildstream_pb2.UpdateReferenceRequest()
    +                request.keys.append(ref)
    +                request.digest.hash = tree.hash
    +                request.digest.size_bytes = tree.size_bytes
    +                remote.ref_storage.UpdateReference(request)
    +
    +                skipped_remote = False
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +
    +        return not skipped_remote
    +
         def push(self, element, keys):
    -        refs = [self.get_artifact_fullname(element, key) for key in keys]
    +
    +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]

             project = element._get_project()

             push_remotes = [r for r in self._remotes[project] if r.spec.push]

             pushed = False
    -        display_key = element._get_brief_display_key()
    +
             for remote in push_remotes:
                 remote.init()
    -            skipped_remote = True
    +            display_key = element._get_brief_display_key()
                 element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))

    -            try:
    -                for ref in refs:
    -                    tree = self.resolve_ref(ref)
    -
    -                    # Check whether ref is already on the server in which case
    -                    # there is no need to push the artifact
    -                    try:
    -                        request = buildstream_pb2.GetReferenceRequest()
    -                        request.key = ref
    -                        response = remote.ref_storage.GetReference(request)
    -
    -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                            # ref is already on the server with the same tree
    -                            continue
    -
    -                    except grpc.RpcError as e:
    -                        if e.code() != grpc.StatusCode.NOT_FOUND:
    -                            # Intentionally re-raise RpcError for outer except block.
    -                            raise
    -
    -                    missing_blobs = {}
    -                    required_blobs = self._required_blobs(tree)
    -
    -                    # Limit size of FindMissingBlobs request
    -                    for required_blobs_group in _grouper(required_blobs, 512):
    -                        request = remote_execution_pb2.FindMissingBlobsRequest()
    -
    -                        for required_digest in required_blobs_group:
    -                            d = request.blob_digests.add()
    -                            d.hash = required_digest.hash
    -                            d.size_bytes = required_digest.size_bytes
    -
    -                        response = remote.cas.FindMissingBlobs(request)
    -                        for digest in response.missing_blob_digests:
    -                            d = remote_execution_pb2.Digest()
    -                            d.hash = digest.hash
    -                            d.size_bytes = digest.size_bytes
    -                            missing_blobs[d.hash] = d
    -
    -                    # Upload any blobs missing on the server
    -                    skipped_remote = False
    -                    for digest in missing_blobs.values():
    -                        uuid_ = uuid.uuid4()
    -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    -                                                  digest.hash, str(digest.size_bytes)])
    -
    -                        def request_stream(resname):
    -                            with open(self.objpath(digest), 'rb') as f:
    -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    -                                offset = 0
    -                                finished = False
    -                                remaining = digest.size_bytes
    -                                while not finished:
    -                                    chunk_size = min(remaining, 64 * 1024)
    -                                    remaining -= chunk_size
    -
    -                                    request = bytestream_pb2.WriteRequest()
    -                                    request.write_offset = offset
    -                                    # max. 64 kB chunks
    -                                    request.data = f.read(chunk_size)
    -                                    request.resource_name = resname
    -                                    request.finish_write = remaining <= 0
    -                                    yield request
    -                                    offset += chunk_size
    -                                    finished = request.finish_write
    -                        response = remote.bytestream.Write(request_stream(resource_name))
    -
    -                    request = buildstream_pb2.UpdateReferenceRequest()
    -                    request.keys.append(ref)
    -                    request.digest.hash = tree.hash
    -                    request.digest.size_bytes = tree.size_bytes
    -                    remote.ref_storage.UpdateReference(request)
    -
    -                    pushed = True
    -
    -                if not skipped_remote:
    -                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    -
    -            if skipped_remote:
    +            if self._push_refs_to_remote(refs, remote):
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    +                pushed = True
    +            else:
                     self.context.message(Message(
                         None,
                         MessageType.INFO,
                         "Remote ({}) already has {} cached".format(
                             remote.spec.url, element._get_brief_display_key())
                     ))
    +
             return pushed

         ################################################
    @@ -451,7 +418,7 @@ class CASCache(ArtifactCache):
         def set_ref(self, ref, tree):
             refpath = self._refpath(ref)
             os.makedirs(os.path.dirname(refpath), exist_ok=True)
    -        with utils.save_file_atomic(refpath, 'wb') as f:
    +        with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
                 f.write(tree.SerializeToString())

         # resolve_ref():
    @@ -594,6 +561,7 @@ class CASCache(ArtifactCache):
         ################################################
         #             Local Private Methods            #
         ################################################
    +
         def _checkout(self, dest, tree):
             os.makedirs(dest, exist_ok=True)

    @@ -623,7 +591,21 @@ class CASCache(ArtifactCache):
         def _refpath(self, ref):
             return os.path.join(self.casdir, 'refs', 'heads', ref)

    -    def _create_tree(self, path, *, digest=None):
    +    # _commit_directory():
    +    #
    +    # Adds local directory to content addressable store.
    +    #
    +    # Adds files, symbolic links and recursively other directories in
    +    # a local directory to the content addressable store.
    +    #
    +    # Args:
    +    #     path (str): Path to the directory to add.
    +    #     dir_digest (Digest): An optional Digest object to use.
    +    #
    +    # Returns:
    +    #     (Digest): Digest object for the directory added.
    +    #
    +    def _commit_directory(self, path, *, dir_digest=None):
             directory = remote_execution_pb2.Directory()

             for name in sorted(os.listdir(path)):
    @@ -632,7 +614,7 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                self._commit_directory(full_path, dir_digest=dirnode.digest)
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name
    @@ -645,7 +627,8 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))

    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        return self.add_object(digest=dir_digest,
    +                               buffer=directory.SerializeToString())

         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)
    @@ -756,16 +739,16 @@ class CASCache(ArtifactCache):
                 #
                 q.put(str(e))

    -    def _required_blobs(self, tree):
    +    def _required_blobs(self, directory_digest):
             # parse directory, and recursively add blobs
             d = remote_execution_pb2.Digest()
    -        d.hash = tree.hash
    -        d.size_bytes = tree.size_bytes
    +        d.hash = directory_digest.hash
    +        d.size_bytes = directory_digest.size_bytes
             yield d

             directory = remote_execution_pb2.Directory()

    -        with open(self.objpath(tree), 'rb') as f:
    +        with open(self.objpath(directory_digest), 'rb') as f:
                 directory.ParseFromString(f.read())

             for filenode in directory.files:
    @@ -777,50 +760,203 @@ class CASCache(ArtifactCache):
             for dirnode in directory.directories:
                 yield from self._required_blobs(dirnode.digest)

    -    def _fetch_blob(self, remote, digest, out):
    +    def _fetch_blob(self, remote, digest, stream):
             resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
             request = bytestream_pb2.ReadRequest()
             request.resource_name = resource_name
             request.read_offset = 0
             for response in remote.bytestream.Read(request):
    -            out.write(response.data)
    +            stream.write(response.data)
    +        stream.flush()

    -        out.flush()
    -        assert digest.size_bytes == os.fstat(out.fileno()).st_size
    +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size

    -    def _fetch_directory(self, remote, tree):
    -        objpath = self.objpath(tree)
    +    # _ensure_blob():
    +    #
    +    # Fetch and add blob if it's not already local.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     digest (Digest): Digest object for the blob to fetch.
    +    #
    +    # Returns:
    +    #     (str): The path of the object
    +    #
    +    def _ensure_blob(self, remote, digest):
    +        objpath = self.objpath(digest)
             if os.path.exists(objpath):
    -            # already in local cache
    -            return
    +            # already in local repository
    +            return objpath

    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            self._fetch_blob(remote, tree, out)
    +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +            self._fetch_blob(remote, digest, f)

    -            directory = remote_execution_pb2.Directory()
    +            added_digest = self.add_object(path=f.name)
    +            assert added_digest.hash == digest.hash

    -            with open(out.name, 'rb') as f:
    -                directory.ParseFromString(f.read())
    +        return objpath

    -            for filenode in directory.files:
    -                fileobjpath = self.objpath(tree)
    -                if os.path.exists(fileobjpath):
    -                    # already in local cache
    -                    continue
    +    def _batch_download_complete(self, batch):
    +        for digest, data in batch.send():
    +            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +                f.write(data)
    +                f.flush()
    +
    +                added_digest = self.add_object(path=f.name)
    +                assert added_digest.hash == digest.hash
    +
    +    # Helper function for _fetch_directory().
    +    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
    +        self._batch_download_complete(batch)
    +
    +        # All previously scheduled directories are now locally available,
    +        # move them to the processing queue.
    +        fetch_queue.extend(fetch_next_queue)
    +        fetch_next_queue.clear()
    +        return _CASBatchRead(remote)
    +
    +    # Helper function for _fetch_directory().
    +    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
    +        in_local_cache = os.path.exists(self.objpath(digest))

    -                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -                    self._fetch_blob(remote, filenode.digest, f)
    +        if in_local_cache:
    +            # Skip download, already in local cache.
    +            pass
    +        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                not remote.batch_read_supported):
    +            # Too large for batch request, download in independent request.
    +            self._ensure_blob(remote, digest)
    +            in_local_cache = True
    +        else:
    +            if not batch.add(digest):
    +                # Not enough space left in batch request.
    +                # Complete pending batch first.
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +                batch.add(digest)
    +
    +        if recursive:
    +            if in_local_cache:
    +                # Add directory to processing queue.
    +                fetch_queue.append(digest)
    +            else:
    +                # Directory will be available after completing pending batch.
    +                # Add directory to deferred processing queue.
    +                fetch_next_queue.append(digest)
    +
    +        return batch
    +
    +    # _fetch_directory():
    +    #
    +    # Fetches remote directory and adds it to content addressable store.
    +    #
    +    # Fetches files, symbolic links and recursively other directories in
    +    # the remote directory and adds them to the content addressable
    +    # store.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     dir_digest (Digest): Digest object for the directory to fetch.
    +    #
    +    def _fetch_directory(self, remote, dir_digest):
    +        fetch_queue = [dir_digest]
    +        fetch_next_queue = []
    +        batch = _CASBatchRead(remote)
    +
    +        while len(fetch_queue) + len(fetch_next_queue) > 0:
    +            if len(fetch_queue) == 0:
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +
    +            dir_digest = fetch_queue.pop(0)

    -                    digest = self.add_object(path=f.name)
    -                    assert digest.hash == filenode.digest.hash
    +            objpath = self._ensure_blob(remote, dir_digest)
    +
    +            directory = remote_execution_pb2.Directory()
    +            with open(objpath, 'rb') as f:
    +                directory.ParseFromString(f.read())

                 for dirnode in directory.directories:
    -                self._fetch_directory(remote, dirnode.digest)
    +                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    +                                                   fetch_queue, fetch_next_queue, recursive=True)
    +
    +            for filenode in directory.files:
    +                batch = self._fetch_directory_node(remote, filenode.digest, batch,
    +                                                   fetch_queue, fetch_next_queue)
    +
    +        # Fetch final batch
    +        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +
    +    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    +                                  digest.hash, str(digest.size_bytes)])
    +
    +        def request_stream(resname, instream):
    +            offset = 0
    +            finished = False
    +            remaining = digest.size_bytes
    +            while not finished:
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    +                remaining -= chunk_size
    +
    +                request = bytestream_pb2.WriteRequest()
    +                request.write_offset = offset
    +                # max. _MAX_PAYLOAD_BYTES chunks
    +                request.data = instream.read(chunk_size)
    +                request.resource_name = resname
    +                request.finish_write = remaining <= 0
    +
    +                yield request
    +
    +                offset += chunk_size
    +                finished = request.finish_write
    +
    +        response = remote.bytestream.Write(request_stream(resource_name, stream))
    +
    +        assert response.committed_size == digest.size_bytes
    +
    +    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    +        required_blobs = self._required_blobs(digest)
    +
    +        missing_blobs = dict()
    +        # Limit size of FindMissingBlobs request
    +        for required_blobs_group in _grouper(required_blobs, 512):
    +            request = remote_execution_pb2.FindMissingBlobsRequest()
    +
    +            for required_digest in required_blobs_group:
    +                d = request.blob_digests.add()
    +                d.hash = required_digest.hash
    +                d.size_bytes = required_digest.size_bytes
    +
    +            response = remote.cas.FindMissingBlobs(request)
    +            for missing_digest in response.missing_blob_digests:
    +                d = remote_execution_pb2.Digest()
    +                d.hash = missing_digest.hash
    +                d.size_bytes = missing_digest.size_bytes
    +                missing_blobs[d.hash] = d
    +
    +        # Upload any blobs missing on the server
    +        self._send_blobs(remote, missing_blobs.values(), u_uid)
    +
    +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    +        batch = _CASBatchUpdate(remote)
    +
    +        for digest in digests:
    +            with open(self.objpath(digest), 'rb') as f:
    +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +
    +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                        not remote.batch_update_supported):
    +                    # Too large for batch request, upload in independent request.
    +                    self._send_blob(remote, digest, f, u_uid=u_uid)
    +                else:
    +                    if not batch.add(digest, f):
    +                        # Not enough space left in batch request.
    +                        # Complete pending batch first.
    +                        batch.send()
    +                        batch = _CASBatchUpdate(remote)
    +                        batch.add(digest, f)

    -            # place directory blob only in final location when we've downloaded
    -            # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    -            assert digest.hash == tree.hash
    +        # Send final batch
    +        batch.send()


     # Represents a single remote CAS cache.
    @@ -870,11 +1006,129 @@ class _CASRemote():

                 self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
                 self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
                 self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)

    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    +            try:
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    +                response = self.capabilities.GetCapabilities(request)
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    +            except grpc.RpcError as e:
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchReadBlobs()
    +            self.batch_read_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    +                response = self.cas.BatchReadBlobs(request)
    +                self.batch_read_supported = True
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchUpdateBlobs()
    +            self.batch_update_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +                response = self.cas.BatchUpdateBlobs(request)
    +                self.batch_update_supported = True
    +            except grpc.RpcError as e:
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    +                    raise
    +
                 self._initialized = True


    +# Represents a batch of blobs queued for fetching.
    +#
    +class _CASBatchRead():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        request_digest = self._request.digests.add()
    +        request_digest.hash = digest.hash
    +        request_digest.size_bytes = digest.size_bytes
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.digests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +            if response.digest.size_bytes != len(response.data):
    +                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    +
    +            yield (response.digest, response.data)
    +
    +
    +# Represents a batch of blobs queued for upload.
    +#
    +class _CASBatchUpdate():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest, stream):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        blob_request = self._request.requests.add()
    +        blob_request.digest.hash = digest.hash
    +        blob_request.digest.size_bytes = digest.size_bytes
    +        blob_request.data = stream.read(digest.size_bytes)
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.requests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to upload blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +
    +
     def _grouper(iterable, n):
         while True:
             try:

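    Note: the _CASBatchRead / _CASBatchUpdate helpers introduced above follow an accumulate-and-flush pattern: digests are queued until the next blob would push the batch past the remote's max_batch_total_size_bytes (at most _MAX_PAYLOAD_BYTES, or lower if the server advertises a smaller limit via GetCapabilities), the pending batch is then sent and a new one started, and blobs that are individually too large fall back to the ByteStream API. A minimal sketch of that pattern, with illustrative names (send_in_batches, flush_batch) that are not BuildStream API:

        # Minimal sketch, not BuildStream code: group (size, payload) pairs so
        # that no single request exceeds max_batch_bytes; oversized items are
        # assumed to be handled separately (e.g. via a streaming upload).
        def send_in_batches(blobs, max_batch_bytes, flush_batch):
            batch = []
            batch_size = 0

            for size_bytes, payload in blobs:
                if size_bytes >= max_batch_bytes:
                    # Too large for any batch request; a real implementation
                    # would stream this blob individually instead.
                    continue

                if batch and batch_size + size_bytes > max_batch_bytes:
                    # The next blob would not fit: complete the pending batch first.
                    flush_batch(batch)
                    batch = []
                    batch_size = 0

                batch.append((size_bytes, payload))
                batch_size += size_bytes

            if batch:
                # Send the final, possibly partial batch.
                flush_batch(batch)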
  • buildstream/_artifactcache/casserver.py

    @@ -38,8 +38,9 @@ from .._context import Context
     from .cascache import CASCache


    -# The default limit for gRPC messages is 4 MiB
    -_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
    +# The default limit for gRPC messages is 4 MiB.
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    +_MAX_PAYLOAD_BYTES = 1024 * 1024


     # Trying to push an artifact that is too large
    @@ -69,7 +70,7 @@ def create_server(repo, *, enable_push):
             _ByteStreamServicer(artifactcache, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache), server)
    +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)
    @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):

                     remaining = client_digest.size_bytes - request.read_offset
                     while remaining > 0:
    -                    chunk_size = min(remaining, 64 * 1024)
    +                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                         remaining -= chunk_size

                         response = bytestream_pb2.ReadResponse()
    @@ -223,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas):
    +    def __init__(self, cas, *, enable_push):
             super().__init__()
             self.cas = cas
    +        self.enable_push = enable_push

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()
    @@ -242,7 +244,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres

             for digest in request.digests:
                 batch_size += digest.size_bytes
    -            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
    +            if batch_size > _MAX_PAYLOAD_BYTES:
                     context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                     return response

    @@ -261,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres

             return response

    +    def BatchUpdateBlobs(self, request, context):
    +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
    +
    +        if not self.enable_push:
    +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    +            return response
    +
    +        batch_size = 0
    +
    +        for blob_request in request.requests:
    +            digest = blob_request.digest
    +
    +            batch_size += digest.size_bytes
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +                return response
    +
    +            blob_response = response.responses.add()
    +            blob_response.digest.hash = digest.hash
    +            blob_response.digest.size_bytes = digest.size_bytes
    +
    +            if len(blob_request.data) != digest.size_bytes:
    +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    +                continue
    +
    +            try:
    +                _clean_up_cache(self.cas, digest.size_bytes)
    +
    +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
    +                    out.write(blob_request.data)
    +                    out.flush()
    +                    server_digest = self.cas.add_object(path=out.name)
    +                    if server_digest.hash != digest.hash:
    +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    +
    +            except ArtifactTooLargeException:
    +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
    +
    +        return response
    +

     class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
         def GetCapabilities(self, request, context):
    @@ -269,7 +311,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
             cache_capabilities = response.cache_capabilities
             cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
             cache_capabilities.action_cache_update_capabilities.update_enabled = False
    -        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
    +        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
             cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED

             response.deprecated_api_version.major = 2

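    Note: on both the client and the server side, the chunk size for ByteStream transfers is now derived from _MAX_PAYLOAD_BYTES (1 MiB) rather than 64 kB, keeping each message comfortably below gRPC's default 4 MiB limit while leaving headroom for metadata. A rough sketch of the chunking loop, assuming a plain file object rather than the real servicer plumbing:

        # Illustrative only: split a blob into payloads of at most 1 MiB each.
        _MAX_PAYLOAD_BYTES = 1024 * 1024

        def iter_chunks(fileobj, total_size):
            remaining = total_size
            while remaining > 0:
                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                remaining -= chunk_size
                # Each chunk would become one ReadResponse / WriteRequest message.
                yield fileobj.read(chunk_size)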
  • buildstream/_scheduler/jobs/job.py

    @@ -119,6 +119,8 @@ class Job():
             self._result = None                    # Return value of child action in the parent
             self._tries = 0                        # Try count, for retryable jobs
             self._skipped_flag = False             # Indicate whether the job was skipped.
    +        self._terminated = False               # Whether this job has been explicitly terminated
    +
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
             #
             self._retry_flag = True
    @@ -188,6 +190,8 @@ class Job():
             # Terminate the process using multiprocessing API pathway
             self._process.terminate()

    +        self._terminated = True
    +
         # terminate_wait()
         #
         # Wait for terminated jobs to complete
    @@ -271,18 +275,22 @@ class Job():
         # running the integration commands).
         #
         # Args:
    -    #     (int): The plugin identifier for this task
    +    #     task_id (int): The plugin identifier for this task
         #
         def set_task_id(self, task_id):
             self._task_id = task_id

         # skipped
         #
    +    # This will evaluate to True if the job was skipped
    +    # during processing, or if it was forcefully terminated.
    +    #
         # Returns:
    -    #    bool: True if the job was skipped while processing.
    +    #    (bool): Whether the job should appear as skipped
    +    #
         @property
         def skipped(self):
    -        return self._skipped_flag
    +        return self._skipped_flag or self._terminated

         #######################################################
         #                  Abstract Methods                   #

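    Note: terminate() now records the termination in _terminated, and the skipped property reports True for terminated jobs as well, which is what keeps forcefully terminated elements out of the processed/failed counts in the end-of-session summary (see #479 above). A stripped-down illustration of the flag handling, not the real Job class:

        class _ToyJob:
            def __init__(self):
                self._skipped_flag = False
                self._terminated = False

            def terminate(self):
                # Called when the scheduler forcefully stops the job.
                self._terminated = True

            @property
            def skipped(self):
                # A terminated job is reported as skipped rather than failed.
                return self._skipped_flag or self._terminated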
  • buildstream/_scheduler/queues/buildqueue.py

    @@ -65,7 +65,7 @@ class BuildQueue(Queue):
             # If the estimated size outgrows the quota, ask the scheduler
             # to queue a job to actually check the real cache size.
             #
    -        if artifacts.get_quota_exceeded():
    +        if artifacts.has_quota_exceeded():
                 self._scheduler.check_cache_size()

         def done(self, job, element, result, success):

  • buildstream/_scheduler/queues/queue.py

    @@ -325,15 +325,22 @@ class Queue():
                               detail=traceback.format_exc())
                 self.failed_elements.append(element)
             else:
    -
    -            # No exception occured, handle the success/failure state in the normal way
                 #
    +            # No exception occured in post processing
    +            #
    +
    +            # Only place in the output done queue if the job
    +            # was considered successful
                 if success:
                     self._done_queue.append(job)
    -                if not job.skipped:
    -                    self.processed_elements.append(element)
    -                else:
    -                    self.skipped_elements.append(element)
    +
    +            # A Job can be skipped whether or not it has failed,
    +            # we want to only bookkeep them as processed or failed
    +            # if they are not skipped.
    +            if job.skipped:
    +                self.skipped_elements.append(element)
    +            elif success:
    +                self.processed_elements.append(element)
                 else:
                     self.failed_elements.append(element)


  • buildstream/_scheduler/scheduler.py

    @@ -349,7 +349,7 @@ class Scheduler():
             platform = Platform.get_platform()
             artifacts = platform.artifactcache

    -        if not artifacts.get_quota_exceeded():
    +        if not artifacts.has_quota_exceeded():
                 return

             job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    @@ -387,6 +387,15 @@ class Scheduler():
         # A loop registered event callback for keyboard interrupts
         #
         def _interrupt_event(self):
    +
    +        # FIXME: This should not be needed, but for some reason we receive an
    +        #        additional SIGINT event when the user hits ^C a second time
    +        #        to inform us that they really intend to terminate; even though
    +        #        we have disconnected our handlers at this time.
    +        #
    +        if self.terminated:
    +            return
    +
             # Leave this to the frontend to decide, if no
             # interrrupt callback was specified, then just terminate.
             if self._interrupt_callback:

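    Note: the added guard simply drops interrupt events that arrive after termination has already begun, avoiding a second prompt when a duplicate SIGINT is delivered (see #693 above). A hypothetical sketch of the same guard pattern, with illustrative names rather than the real Scheduler API:

        class _ToyScheduler:
            def __init__(self, interrupt_callback=None):
                self.terminated = False
                self._interrupt_callback = interrupt_callback

            def _interrupt_event(self):
                if self.terminated:
                    # Late or duplicate SIGINT after we already decided to quit.
                    return
                if self._interrupt_callback:
                    self._interrupt_callback()
                else:
                    self.terminate_jobs()

            def terminate_jobs(self):
                self.terminated = True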
  • buildstream/_yaml.py

    @@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *,
                             "{}: Specified path '{}' does not exist"
                             .format(provenance, path_str))

    -    is_inside = project_dir_path in full_resolved_path.parents or (
    +    is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
             full_resolved_path == project_dir_path)

         if path.is_absolute() or not is_inside:

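    Note: the fix resolves the project directory before testing containment, so both sides of the comparison are fully resolved paths; otherwise a project directory reached through a symlink never appears among the resolved target's parents and valid paths get rejected (see #673 above). An illustrative check along the same lines, not the full node_get_project_path() logic:

        from pathlib import Path

        # Illustrative helper: both paths are resolved before comparing, so
        # symlinks in either location cannot defeat the containment check.
        def is_inside_project(project_dir, candidate):
            project = Path(project_dir).resolve()
            target = Path(candidate).resolve()
            return target == project or project in target.parents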
  • buildstream/element.py
    ... ... @@ -434,7 +434,7 @@ class Element(Plugin):
    434 434
                                                     visited=visited, recursed=True)
    
    435 435
     
    
    436 436
             # Yeild self only at the end, after anything needed has been traversed
    
    437
    -        if should_yield and (recurse or recursed) and (scope == Scope.ALL or scope == Scope.RUN):
    
    437
    +        if should_yield and (recurse or recursed) and scope != Scope.BUILD:
    
    438 438
                 yield self
    
    439 439
     
    
    440 440
         def search(self, scope, name):
    
    ... ... @@ -1289,17 +1289,21 @@ class Element(Plugin):
    1289 1289
                     if scope == Scope.BUILD:
    
    1290 1290
                         self.stage(sandbox)
    
    1291 1291
                     elif scope == Scope.RUN:
    
    1292
    -                    # Stage deps in the sandbox root
    
    1293 1292
                         if deps == 'run':
    
    1294
    -                        with self.timed_activity("Staging dependencies", silent_nested=True):
    
    1295
    -                            self.stage_dependency_artifacts(sandbox, scope)
    
    1296
    -
    
    1297
    -                        # Run any integration commands provided by the dependencies
    
    1298
    -                        # once they are all staged and ready
    
    1299
    -                        if integrate:
    
    1300
    -                            with self.timed_activity("Integrating sandbox"):
    
    1301
    -                                for dep in self.dependencies(scope):
    
    1302
    -                                    dep.integrate(sandbox)
    
    1293
    +                        dependency_scope = Scope.RUN
    
    1294
    +                    else:
    
    1295
    +                        dependency_scope = None
    
    1296
    +
    
    1297
    +                    # Stage deps in the sandbox root
    
    1298
    +                    with self.timed_activity("Staging dependencies", silent_nested=True):
    
    1299
    +                        self.stage_dependency_artifacts(sandbox, dependency_scope)
    
    1300
    +
    
    1301
    +                    # Run any integration commands provided by the dependencies
    
    1302
    +                    # once they are all staged and ready
    
    1303
    +                    if integrate:
    
    1304
    +                        with self.timed_activity("Integrating sandbox"):
    
    1305
    +                            for dep in self.dependencies(dependency_scope):
    
    1306
    +                                dep.integrate(sandbox)
    
    1303 1307
     
    
    1304 1308
                 yield sandbox
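
    Previously a 'none' checkout skipped this staging block entirely; now the
    user-facing deps value is mapped to a dependency scope once ('run' stages
    runtime dependencies, anything else maps to None) and the same staging and
    integration path runs for both cases. Together with the dependencies() change
    above, a None scope yields only the element itself, so '--deps none' stages
    the element's own output and nothing else. A compact stand-in for the dispatch:

        from enum import Enum

        Scope = Enum('Scope', 'ALL BUILD RUN')   # stand-in for BuildStream's Scope

        def checkout_dependency_scope(deps):
            # 'run' -> stage runtime dependencies; 'none' (or anything else)
            # -> no dependency scope, i.e. only the element itself is staged.
            return Scope.RUN if deps == 'run' else None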
    
    1305 1309
     
    

  • buildstream/plugins/sources/git.py
    ... ... @@ -164,10 +164,18 @@ class GitMirror(SourceFetcher):
    164 164
                              cwd=self.mirror)
    
    165 165
     
    
    166 166
         def fetch(self, alias_override=None):
    
    167
    -        self.ensure(alias_override)
    
    168
    -        if not self.has_ref():
    
    169
    -            self._fetch(alias_override)
    
    170
    -        self.assert_ref()
    
    167
    +        # Resolve the URL for the message
    
    168
    +        resolved_url = self.source.translate_url(self.url,
    
    169
    +                                                 alias_override=alias_override,
    
    170
    +                                                 primary=self.primary)
    
    171
    +
    
    172
    +        with self.source.timed_activity("Fetching from {}"
    
    173
    +                                        .format(resolved_url),
    
    174
    +                                        silent_nested=True):
    
    175
    +            self.ensure(alias_override)
    
    176
    +            if not self.has_ref():
    
    177
    +                self._fetch(alias_override)
    
    178
    +            self.assert_ref()
    
    171 179
     
    
    172 180
         def has_ref(self):
    
    173 181
             if not self.ref:
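
    Wrapping the whole fetch in one timed_activity() gives a single "Fetching
    from <url>" heading while nested messages stay quiet; translate_url() and
    timed_activity() above are the plugin APIs shown in the diff. A rough
    print-based illustration of the grouping pattern (not the real message API):

        from contextlib import contextmanager

        @contextmanager
        def timed_activity(message):
            # Stand-in for Source.timed_activity(): just bracket the work
            # with start/finish markers instead of real status messages.
            print("START  {}".format(message))
            try:
                yield
            finally:
                print("FINISH {}".format(message))

        with timed_activity("Fetching from https://example.com/repo.git"):
            pass  # ensure(), _fetch() and assert_ref() would run here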
    

  • buildstream/source.py
    ... ... @@ -585,28 +585,48 @@ class Source(Plugin):
    585 585
         #
    
    586 586
         def _fetch(self):
    
    587 587
             project = self._get_project()
    
    588
    -        source_fetchers = self.get_source_fetchers()
    
    588
    +        context = self._get_context()
    
    589
    +
    
    590
    +        # Silence the STATUS messages which might happen as a result
    
    591
    +        # of checking the source fetchers.
    
    592
    +        with context.silence():
    
    593
    +            source_fetchers = self.get_source_fetchers()
    
    589 594
     
    
    590 595
             # Use the source fetchers if they are provided
    
    591 596
             #
    
    592 597
             if source_fetchers:
    
    593
    -            for fetcher in source_fetchers:
    
    594
    -                alias = fetcher._get_alias()
    
    595
    -                for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
    
    596
    -                    try:
    
    597
    -                        fetcher.fetch(uri)
    
    598
    -                    # FIXME: Need to consider temporary vs. permanent failures,
    
    599
    -                    #        and how this works with retries.
    
    600
    -                    except BstError as e:
    
    601
    -                        last_error = e
    
    602
    -                        continue
    
    603
    -
    
    604
    -                    # No error, we're done with this fetcher
    
    605
    -                    break
    
    606 598
     
    
    607
    -                else:
    
    608
    -                    # No break occurred, raise the last detected error
    
    609
    -                    raise last_error
    
    599
    +            # Use a contorted loop here, this is to allow us to
    
    600
    +            # silence the messages which can result from consuming
    
    601
    +            # the items of source_fetchers, if it happens to be a generator.
    
    602
    +            #
    
    603
    +            source_fetchers = iter(source_fetchers)
    
    604
    +            try:
    
    605
    +
    
    606
    +                while True:
    
    607
    +
    
    608
    +                    with context.silence():
    
    609
    +                        fetcher = next(source_fetchers)
    
    610
    +
    
    611
    +                    alias = fetcher._get_alias()
    
    612
    +                    for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
    
    613
    +                        try:
    
    614
    +                            fetcher.fetch(uri)
    
    615
    +                        # FIXME: Need to consider temporary vs. permanent failures,
    
    616
    +                        #        and how this works with retries.
    
    617
    +                        except BstError as e:
    
    618
    +                            last_error = e
    
    619
    +                            continue
    
    620
    +
    
    621
    +                        # No error, we're done with this fetcher
    
    622
    +                        break
    
    623
    +
    
    624
    +                    else:
    
    625
    +                        # No break occurred, raise the last detected error
    
    626
    +                        raise last_error
    
    627
    +
    
    628
    +            except StopIteration:
    
    629
    +                pass
    
    610 630
     
    
    611 631
             # Default codepath is to reinstantiate the Source
    
    612 632
             #
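
    The loop above is written this way so that any status messages emitted while
    the source_fetchers generator produces its next item are silenced, while the
    messages from fetching each item remain visible. A standalone sketch of the
    same pattern, with a print-redirecting stand-in for Context.silence():

        import contextlib
        import io

        @contextlib.contextmanager
        def silence():
            # Stand-in for Context.silence(): swallow anything printed inside.
            with contextlib.redirect_stdout(io.StringIO()):
                yield

        def fetchers():
            print("status noise while creating a fetcher")   # suppressed
            yield "fetcher-1"
            print("more status noise")                       # suppressed
            yield "fetcher-2"

        it = iter(fetchers())
        try:
            while True:
                with silence():
                    fetcher = next(it)           # generator body runs silenced
                print("fetching with", fetcher)  # the fetch itself stays visible
        except StopIteration:
            pass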
    

  • buildstream/utils.py
    ... ... @@ -496,7 +496,7 @@ def get_bst_version():
    496 496
     
    
    497 497
     @contextmanager
    
    498 498
     def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    
    499
    -                     errors=None, newline=None, closefd=True, opener=None):
    
    499
    +                     errors=None, newline=None, closefd=True, opener=None, tempdir=None):
    
    500 500
         """Save a file with a temporary name and rename it into place when ready.
    
    501 501
     
    
    502 502
         This is a context manager which is meant for saving data to files.
    
    ... ... @@ -523,8 +523,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    523 523
         # https://bugs.python.org/issue8604
    
    524 524
     
    
    525 525
         assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
    
    526
    -    dirname = os.path.dirname(filename)
    
    527
    -    fd, tempname = tempfile.mkstemp(dir=dirname)
    
    526
    +    if tempdir is None:
    
    527
    +        tempdir = os.path.dirname(filename)
    
    528
    +    fd, tempname = tempfile.mkstemp(dir=tempdir)
    
    528 529
         os.close(fd)
    
    529 530
     
    
    530 531
         f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
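
    The new tempdir argument lets callers place the temporary file somewhere other
    than the destination directory; it should still be on the same filesystem so
    that the final rename remains atomic. A hypothetical usage sketch (both paths
    are made up and assumed to already exist on one filesystem):

        import os
        from buildstream import utils

        artifactdir = '/var/cache/buildstream/artifacts'
        with utils.save_file_atomic(os.path.join(artifactdir, 'cache_size'), 'w',
                                    tempdir=os.path.join(artifactdir, 'tmp')) as f:
            f.write('12345')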
    
    ... ... @@ -556,6 +557,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
    556 557
     #
    
    557 558
     # Get the disk usage of a given directory in bytes.
    
    558 559
     #
    
    560
     +# This function assumes that files do not inadvertently
    
    561
    +# disappear while this function is running.
    
    562
    +#
    
    559 563
     # Arguments:
    
    560 564
     #     (str) The path whose size to check.
    
    561 565
     #
    
    ... ... @@ -675,7 +679,7 @@ def _force_rmtree(rootpath, **kwargs):
    675 679
     
    
    676 680
         try:
    
    677 681
             shutil.rmtree(rootpath, **kwargs)
    
    678
    -    except shutil.Error as e:
    
    682
    +    except OSError as e:
    
    679 683
             raise UtilError("Failed to remove cache directory '{}': {}"
    
    680 684
                             .format(rootpath, e))
    
    681 685
     
    

  • tests/format/project.py
    ... ... @@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles):
    181 181
     
    
    182 182
         # Assert that the cache keys are different
    
    183 183
         assert result1.output != result2.output
    
    184
    +
    
    185
    +
    
    186
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
    
    187
    +def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
    
    188
    +    real_project = str(datafiles)
    
    189
    +    linked_project = os.path.join(str(tmpdir), 'linked')
    
    190
    +    os.symlink(real_project, linked_project)
    
    191
    +    os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
    
    192
    +    with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
    
    193
    +        f.write("kind: manual\n")
    
    194
    +    result = cli.run(project=linked_project, args=['show', 'element.bst'])
    
    195
    +    result.assert_success()

  • tests/frontend/buildcheckout.py
    ... ... @@ -65,9 +65,10 @@ def test_build_checkout(datafiles, cli, strict, hardlinks):
    65 65
     def test_build_checkout_deps(datafiles, cli, deps):
    
    66 66
         project = os.path.join(datafiles.dirname, datafiles.basename)
    
    67 67
         checkout = os.path.join(cli.directory, 'checkout')
    
    68
    +    element_name = "checkout-deps.bst"
    
    68 69
     
    
    69 70
         # First build it
    
    70
    -    result = cli.run(project=project, args=['build', 'target.bst'])
    
    71
    +    result = cli.run(project=project, args=['build', element_name])
    
    71 72
         result.assert_success()
    
    72 73
     
    
    73 74
         # Assert that after a successful build, the builddir is empty
    
    ... ... @@ -76,20 +77,15 @@ def test_build_checkout_deps(datafiles, cli, deps):
    76 77
         assert not os.listdir(builddir)
    
    77 78
     
    
    78 79
         # Now check it out
    
    79
    -    result = cli.run(project=project, args=['checkout', 'target.bst', '--deps', deps, checkout])
    
    80
    +    result = cli.run(project=project, args=['checkout', element_name, '--deps', deps, checkout])
    
    80 81
         result.assert_success()
    
    81 82
     
    
    82
    -    # Check that the executable hello file is found in the checkout
    
    83
    -    filename = os.path.join(checkout, 'usr', 'bin', 'hello')
    
    84
    -
    
    85
    -    if deps == "run":
    
    86
    -        assert os.path.exists(filename)
    
    87
    -    else:
    
    88
    -        assert not os.path.exists(filename)
    
    89
    -
    
    90
    -    # Check that the executable hello file is found in the checkout
    
    91
    -    filename = os.path.join(checkout, 'usr', 'include', 'pony.h')
    
    83
    +    # Verify output of this element
    
    84
    +    filename = os.path.join(checkout, 'etc', 'buildstream', 'config')
    
    85
    +    assert os.path.exists(filename)
    
    92 86
     
    
    87
    +    # Verify output of this element's runtime dependencies
    
    88
    +    filename = os.path.join(checkout, 'usr', 'bin', 'hello')
    
    93 89
         if deps == "run":
    
    94 90
             assert os.path.exists(filename)
    
    95 91
         else:
    

  • tests/frontend/project/elements/checkout-deps.bst
    1
    +kind: import
    
    2
    +description: It is important for this element to have both build and runtime dependencies
    
    3
    +sources:
    
    4
    +- kind: local
    
    5
    +  path: files/etc-files
    
    6
    +depends:
    
    7
    +- filename: import-dev.bst
    
    8
    +  type: build
    
    9
    +- filename: import-bin.bst
    
    10
    +  type: runtime

  • tests/frontend/project/files/etc-files/etc/buildstream/config
    1
    +config


