[Notes] [Git][BuildStream/buildstream][tpollard/494] 9 commits: ci: upper case variables



Title: GitLab

Tom Pollard pushed to branch tpollard/494 at BuildStream / buildstream

Commits:

10 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -161,14 +161,14 @@ docs:
    161 161
     .overnight-tests: &overnight-tests-template
    
    162 162
       stage: test
    
    163 163
       variables:
    
    164
    -    bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
    
    165
    -    bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
    
    166
    -    fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
    
    164
    +    BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
    
    165
    +    BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
    
    166
    +    FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
    
    167 167
       before_script:
    
    168 168
       - (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
    
    169
    -  - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
    
    169
    +  - pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
    
    170 170
       - git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
    
    171
    -  - git -C freedesktop-sdk checkout ${fd_sdk_ref}
    
    171
    +  - git -C freedesktop-sdk checkout ${FD_SDK_REF}
    
    172 172
       only:
    
    173 173
       - schedules
    
    174 174
     
    

  • buildstream/_artifactcache/artifactcache.py
    ... ... @@ -38,8 +38,9 @@ CACHE_SIZE_FILE = "cache_size"
    38 38
     #     url (str): Location of the remote artifact cache
    
    39 39
     #     push (bool): Whether we should attempt to push artifacts to this cache,
    
    40 40
     #                  in addition to pulling from it.
    
    41
    +#     buildtrees (bool): Whether the default action of pull should include the artifact buildtree
    
    41 42
     #
    
    42
    -class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert client_key client_cert')):
    
    43
    +class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push buildtrees server_cert client_key client_cert')):
    
    43 44
     
    
    44 45
         # _new_from_config_node
    
    45 46
         #
    
    ... ... @@ -47,9 +48,10 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl
    47 48
         #
    
    48 49
         @staticmethod
    
    49 50
         def _new_from_config_node(spec_node, basedir=None):
    
    50
    -        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
    
    51
    +        _yaml.node_validate(spec_node, ['url', 'push', 'pullbuildtrees', 'server-cert', 'client-key', 'client-cert'])
    
    51 52
             url = _yaml.node_get(spec_node, str, 'url')
    
    52 53
             push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    
    54
    +        buildtrees = _yaml.node_get(spec_node, bool, 'pullbuildtrees', default_value=False)
    
    53 55
             if not url:
    
    54 56
                 provenance = _yaml.node_get_provenance(spec_node, 'url')
    
    55 57
                 raise LoadError(LoadErrorReason.INVALID_DATA,
    
    ... ... @@ -77,10 +79,10 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl
    77 79
                 raise LoadError(LoadErrorReason.INVALID_DATA,
    
    78 80
                                 "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    
    79 81
     
    
    80
    -        return ArtifactCacheSpec(url, push, server_cert, client_key, client_cert)
    
    82
    +        return ArtifactCacheSpec(url, push, buildtrees, server_cert, client_key, client_cert)
    
    81 83
     
    
    82 84
     
    
    83
    -ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
    
    85
    +ArtifactCacheSpec.__new__.__defaults__ = (False, None, None, None)
    
    84 86
     
    
    85 87
     
    
    86 88
     # An ArtifactCache manages artifacts.
    
    ... ... @@ -426,6 +428,22 @@ class ArtifactCache():
    426 428
             raise ImplError("Cache '{kind}' does not implement contains()"
    
    427 429
                             .format(kind=type(self).__name__))
    
    428 430
     
    
    431
    +    # contains_subdir_artifact():
    
    432
    +    #
    
    433
    +    # Check whether an artifact element contains a digest for a subdir
    
    434
    +    # which is populated in the cache, i.e non dangling.
    
    435
    +    #
    
    436
    +    # Args:
    
    437
    +    #     element (Element): The Element to check
    
    438
    +    #     key (str): The cache key to use
    
    439
    +    #     subdir (str): The subdir to check
    
    440
    +    #
    
    441
    +    # Returns: True if the subdir exists & is populated in the cache, False otherwise
    
    442
    +    #
    
    443
    +    def contains_subdir_artifact(self, element, key, subdir):
    
    444
    +        raise ImplError("Cache '{kind}' does not implement contains_subdir_artifact()"
    
    445
    +                        .format(kind=type(self).__name__))
    
    446
    +
    
    429 447
         # list_artifacts():
    
    430 448
         #
    
    431 449
         # List artifacts in this cache in LRU order.
    
    ... ... @@ -551,11 +569,12 @@ class ArtifactCache():
    551 569
         #     element (Element): The Element whose artifact is to be fetched
    
    552 570
         #     key (str): The cache key to use
    
    553 571
         #     progress (callable): The progress callback, if any
    
    572
    +    #     subdir (str): The optional specific subdir to pull
    
    554 573
         #
    
    555 574
         # Returns:
    
    556 575
         #   (bool): True if pull was successful, False if artifact was not available
    
    557 576
         #
    
    558
    -    def pull(self, element, key, *, progress=None):
    
    577
    +    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
    
    559 578
             raise ImplError("Cache '{kind}' does not implement pull()"
    
    560 579
                             .format(kind=type(self).__name__))
    
    561 580
     
    

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -92,6 +92,16 @@ class CASCache(ArtifactCache):
    92 92
             # This assumes that the repository doesn't have any dangling pointers
    
    93 93
             return os.path.exists(refpath)
    
    94 94
     
    
    95
    +    def contains_subdir_artifact(self, element, key, subdir):
    
    96
    +        tree = self.resolve_ref(self.get_artifact_fullname(element, key))
    
    97
    +
    
    98
    +        # This assumes that the subdir digest is present in the element tree
    
    99
    +        subdirdigest = self._get_subdir(tree, subdir)
    
    100
    +        objpath = self.objpath(subdirdigest)
    
    101
    +
    
    102
    +        # True if subdir content is cached or if empty as expected
    
    103
    +        return os.path.exists(objpath)
    
    104
    +
    
    95 105
         def extract(self, element, key):
    
    96 106
             ref = self.get_artifact_fullname(element, key)
    
    97 107
     
    
    ... ... @@ -228,7 +238,7 @@ class CASCache(ArtifactCache):
    228 238
                 remotes_for_project = self._remotes[element._get_project()]
    
    229 239
                 return any(remote.spec.push for remote in remotes_for_project)
    
    230 240
     
    
    231
    -    def pull(self, element, key, *, progress=None):
    
    241
    +    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
    
    232 242
             ref = self.get_artifact_fullname(element, key)
    
    233 243
     
    
    234 244
             project = element._get_project()
    
    ... ... @@ -247,8 +257,14 @@ class CASCache(ArtifactCache):
    247 257
                     tree.hash = response.digest.hash
    
    248 258
                     tree.size_bytes = response.digest.size_bytes
    
    249 259
     
    
    250
    -                self._fetch_directory(remote, tree)
    
    260
    +                # Check if the element artifact is present, if so just fetch subdir
    
    261
    +                if subdir and os.path.exists(self.objpath(tree)):
    
    262
    +                    self._fetch_subdir(remote, tree, subdir)
    
    263
    +                else:
    
    264
    +                    # Fetch artifact, excluded_subdirs determined in pullqueue
    
    265
    +                    self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
    
    251 266
     
    
    267
    +                # tree is the remote value, so is the same without or without dangling ref locally
    
    252 268
                     self.set_ref(ref, tree)
    
    253 269
     
    
    254 270
                     element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    
    ... ... @@ -668,8 +684,10 @@ class CASCache(ArtifactCache):
    668 684
                              stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
    
    669 685
     
    
    670 686
             for dirnode in directory.directories:
    
    671
    -            fullpath = os.path.join(dest, dirnode.name)
    
    672
    -            self._checkout(fullpath, dirnode.digest)
    
    687
    +            # Don't try to checkout a dangling ref
    
    688
    +            if os.path.exists(self.objpath(dirnode.digest)):
    
    689
    +                fullpath = os.path.join(dest, dirnode.name)
    
    690
    +                self._checkout(fullpath, dirnode.digest)
    
    673 691
     
    
    674 692
             for symlinknode in directory.symlinks:
    
    675 693
                 # symlink
    
    ... ... @@ -948,10 +966,12 @@ class CASCache(ArtifactCache):
    948 966
         #     remote (Remote): The remote to use.
    
    949 967
         #     dir_digest (Digest): Digest object for the directory to fetch.
    
    950 968
         #
    
    951
    -    def _fetch_directory(self, remote, dir_digest):
    
    969
    +    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
    
    952 970
             fetch_queue = [dir_digest]
    
    953 971
             fetch_next_queue = []
    
    954 972
             batch = _CASBatchRead(remote)
    
    973
    +        if not excluded_subdirs:
    
    974
    +            excluded_subdirs = []
    
    955 975
     
    
    956 976
             while len(fetch_queue) + len(fetch_next_queue) > 0:
    
    957 977
                 if len(fetch_queue) == 0:
    
    ... ... @@ -966,8 +986,9 @@ class CASCache(ArtifactCache):
    966 986
                     directory.ParseFromString(f.read())
    
    967 987
     
    
    968 988
                 for dirnode in directory.directories:
    
    969
    -                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    
    970
    -                                                   fetch_queue, fetch_next_queue, recursive=True)
    
    989
    +                if dirnode.name not in excluded_subdirs:
    
    990
    +                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    
    991
    +                                                       fetch_queue, fetch_next_queue, recursive=True)
    
    971 992
     
    
    972 993
                 for filenode in directory.files:
    
    973 994
                     batch = self._fetch_directory_node(remote, filenode.digest, batch,
    
    ... ... @@ -976,6 +997,10 @@ class CASCache(ArtifactCache):
    976 997
             # Fetch final batch
    
    977 998
             self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    978 999
     
    
    1000
    +    def _fetch_subdir(self, remote, tree, subdir):
    
    1001
    +        subdirdigest = self._get_subdir(tree, subdir)
    
    1002
    +        self._fetch_directory(remote, subdirdigest)
    
    1003
    +
    
    979 1004
         def _fetch_tree(self, remote, digest):
    
    980 1005
             # download but do not store the Tree object
    
    981 1006
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    
    ... ... @@ -1048,10 +1073,29 @@ class CASCache(ArtifactCache):
    1048 1073
                     missing_blobs[d.hash] = d
    
    1049 1074
     
    
    1050 1075
             # Upload any blobs missing on the server
    
    1051
    -        for blob_digest in missing_blobs.values():
    
    1052
    -            with open(self.objpath(blob_digest), 'rb') as f:
    
    1053
    -                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
    
    1054
    -                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
    
    1076
    +        self._send_blobs(remote, missing_blobs.values(), u_uid)
    
    1077
    +
    
    1078
    +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    
    1079
    +        batch = _CASBatchUpdate(remote)
    
    1080
    +
    
    1081
    +        for digest in digests:
    
    1082
    +            with open(self.objpath(digest), 'rb') as f:
    
    1083
    +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    
    1084
    +
    
    1085
    +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    
    1086
    +                        not remote.batch_update_supported):
    
    1087
    +                    # Too large for batch request, upload in independent request.
    
    1088
    +                    self._send_blob(remote, digest, f, u_uid=u_uid)
    
    1089
    +                else:
    
    1090
    +                    if not batch.add(digest, f):
    
    1091
    +                        # Not enough space left in batch request.
    
    1092
    +                        # Complete pending batch first.
    
    1093
    +                        batch.send()
    
    1094
    +                        batch = _CASBatchUpdate(remote)
    
    1095
    +                        batch.add(digest, f)
    
    1096
    +
    
    1097
    +        # Send final batch
    
    1098
    +        batch.send()
    
    1055 1099
     
    
    1056 1100
     
    
    1057 1101
     # Represents a single remote CAS cache.
    
    ... ... @@ -1126,6 +1170,17 @@ class _CASRemote():
    1126 1170
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    1127 1171
                         raise
    
    1128 1172
     
    
    1173
    +            # Check whether the server supports BatchUpdateBlobs()
    
    1174
    +            self.batch_update_supported = False
    
    1175
    +            try:
    
    1176
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    1177
    +                response = self.cas.BatchUpdateBlobs(request)
    
    1178
    +                self.batch_update_supported = True
    
    1179
    +            except grpc.RpcError as e:
    
    1180
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    1181
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    
    1182
    +                    raise
    
    1183
    +
    
    1129 1184
                 self._initialized = True
    
    1130 1185
     
    
    1131 1186
     
    
    ... ... @@ -1173,6 +1228,46 @@ class _CASBatchRead():
    1173 1228
                 yield (response.digest, response.data)
    
    1174 1229
     
    
    1175 1230
     
    
    1231
    +# Represents a batch of blobs queued for upload.
    
    1232
    +#
    
    1233
    +class _CASBatchUpdate():
    
    1234
    +    def __init__(self, remote):
    
    1235
    +        self._remote = remote
    
    1236
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    1237
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    1238
    +        self._size = 0
    
    1239
    +        self._sent = False
    
    1240
    +
    
    1241
    +    def add(self, digest, stream):
    
    1242
    +        assert not self._sent
    
    1243
    +
    
    1244
    +        new_batch_size = self._size + digest.size_bytes
    
    1245
    +        if new_batch_size > self._max_total_size_bytes:
    
    1246
    +            # Not enough space left in current batch
    
    1247
    +            return False
    
    1248
    +
    
    1249
    +        blob_request = self._request.requests.add()
    
    1250
    +        blob_request.digest.hash = digest.hash
    
    1251
    +        blob_request.digest.size_bytes = digest.size_bytes
    
    1252
    +        blob_request.data = stream.read(digest.size_bytes)
    
    1253
    +        self._size = new_batch_size
    
    1254
    +        return True
    
    1255
    +
    
    1256
    +    def send(self):
    
    1257
    +        assert not self._sent
    
    1258
    +        self._sent = True
    
    1259
    +
    
    1260
    +        if len(self._request.requests) == 0:
    
    1261
    +            return
    
    1262
    +
    
    1263
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    
    1264
    +
    
    1265
    +        for response in batch_response.responses:
    
    1266
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    
    1267
    +                raise ArtifactError("Failed to upload blob {}: {}".format(
    
    1268
    +                    response.digest.hash, response.status.code))
    
    1269
    +
    
    1270
    +
    
    1176 1271
     def _grouper(iterable, n):
    
    1177 1272
         while True:
    
    1178 1273
             try:
    

  • buildstream/_artifactcache/casserver.py
    ... ... @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
    68 68
             _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
    
    69 69
     
    
    70 70
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    
    71
    -        _ContentAddressableStorageServicer(artifactcache), server)
    
    71
    +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
    
    72 72
     
    
    73 73
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
    
    74 74
             _CapabilitiesServicer(), server)
    
    ... ... @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    222 222
     
    
    223 223
     
    
    224 224
     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    225
    -    def __init__(self, cas):
    
    225
    +    def __init__(self, cas, *, enable_push):
    
    226 226
             super().__init__()
    
    227 227
             self.cas = cas
    
    228
    +        self.enable_push = enable_push
    
    228 229
     
    
    229 230
         def FindMissingBlobs(self, request, context):
    
    230 231
             response = remote_execution_pb2.FindMissingBlobsResponse()
    
    ... ... @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
    260 261
     
    
    261 262
             return response
    
    262 263
     
    
    264
    +    def BatchUpdateBlobs(self, request, context):
    
    265
    +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
    
    266
    +
    
    267
    +        if not self.enable_push:
    
    268
    +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    
    269
    +            return response
    
    270
    +
    
    271
    +        batch_size = 0
    
    272
    +
    
    273
    +        for blob_request in request.requests:
    
    274
    +            digest = blob_request.digest
    
    275
    +
    
    276
    +            batch_size += digest.size_bytes
    
    277
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    
    278
    +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    279
    +                return response
    
    280
    +
    
    281
    +            blob_response = response.responses.add()
    
    282
    +            blob_response.digest.hash = digest.hash
    
    283
    +            blob_response.digest.size_bytes = digest.size_bytes
    
    284
    +
    
    285
    +            if len(blob_request.data) != digest.size_bytes:
    
    286
    +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    
    287
    +                continue
    
    288
    +
    
    289
    +            try:
    
    290
    +                _clean_up_cache(self.cas, digest.size_bytes)
    
    291
    +
    
    292
    +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
    
    293
    +                    out.write(blob_request.data)
    
    294
    +                    out.flush()
    
    295
    +                    server_digest = self.cas.add_object(path=out.name)
    
    296
    +                    if server_digest.hash != digest.hash:
    
    297
    +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    
    298
    +
    
    299
    +            except ArtifactTooLargeException:
    
    300
    +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
    
    301
    +
    
    302
    +        return response
    
    303
    +
    
    263 304
     
    
    264 305
     class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    265 306
         def GetCapabilities(self, request, context):
    

  • buildstream/_frontend/cli.py
    ... ... @@ -305,10 +305,12 @@ def init(app, project_name, format_version, element_path, force):
    305 305
                   help="Allow tracking to cross junction boundaries")
    
    306 306
     @click.option('--track-save', default=False, is_flag=True,
    
    307 307
                   help="Deprecated: This is ignored")
    
    308
    +@click.option('--pull-buildtrees', default=False, is_flag=True,
    
    309
    +              help="Pull buildtrees from a remote cache server")
    
    308 310
     @click.argument('elements', nargs=-1,
    
    309 311
                     type=click.Path(readable=False))
    
    310 312
     @click.pass_obj
    
    311
    -def build(app, elements, all_, track_, track_save, track_all, track_except, track_cross_junctions):
    
    313
    +def build(app, elements, all_, track_, track_save, track_all, track_except, track_cross_junctions, pull_buildtrees):
    
    312 314
         """Build elements in a pipeline"""
    
    313 315
     
    
    314 316
         if (track_except or track_cross_junctions) and not (track_ or track_all):
    
    ... ... @@ -327,7 +329,8 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
    327 329
                              track_targets=track_,
    
    328 330
                              track_except=track_except,
    
    329 331
                              track_cross_junctions=track_cross_junctions,
    
    330
    -                         build_all=all_)
    
    332
    +                         build_all=all_,
    
    333
    +                         pull_buildtrees=pull_buildtrees)
    
    331 334
     
    
    332 335
     
    
    333 336
     ##################################################################
    
    ... ... @@ -429,10 +432,12 @@ def track(app, elements, deps, except_, cross_junctions):
    429 432
                   help='The dependency artifacts to pull (default: none)')
    
    430 433
     @click.option('--remote', '-r',
    
    431 434
                   help="The URL of the remote cache (defaults to the first configured cache)")
    
    435
    +@click.option('--pull-buildtrees', default=False, is_flag=True,
    
    436
    +              help="Pull buildtrees from a remote cache server")
    
    432 437
     @click.argument('elements', nargs=-1,
    
    433 438
                     type=click.Path(readable=False))
    
    434 439
     @click.pass_obj
    
    435
    -def pull(app, elements, deps, remote):
    
    440
    +def pull(app, elements, deps, remote, pull_buildtrees):
    
    436 441
         """Pull a built artifact from the configured remote artifact cache.
    
    437 442
     
    
    438 443
         By default the artifact will be pulled one of the configured caches
    
    ... ... @@ -446,7 +451,7 @@ def pull(app, elements, deps, remote):
    446 451
             all:   All dependencies
    
    447 452
         """
    
    448 453
         with app.initialized(session_name="Pull"):
    
    449
    -        app.stream.pull(elements, selection=deps, remote=remote)
    
    454
    +        app.stream.pull(elements, selection=deps, remote=remote, pull_buildtrees=pull_buildtrees)
    
    450 455
     
    
    451 456
     
    
    452 457
     ##################################################################
    

  • buildstream/_scheduler/queues/pullqueue.py
    ... ... @@ -32,9 +32,20 @@ class PullQueue(Queue):
    32 32
         complete_name = "Pulled"
    
    33 33
         resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
    
    34 34
     
    
    35
    +    def __init__(self, scheduler, buildtrees=False):
    
    36
    +        super().__init__(scheduler)
    
    37
    +
    
    38
    +        # Current default exclusions on pull
    
    39
    +        self._excluded_subdirs = ["buildtree"]
    
    40
    +        self._subdir = None
    
    41
    +        # If buildtrees are to be pulled, remove the value from exclusion list
    
    42
    +        if buildtrees:
    
    43
    +            self._subdir = "buildtree"
    
    44
    +            self._excluded_subdirs.remove(self._subdir)
    
    45
    +
    
    35 46
         def process(self, element):
    
    36 47
             # returns whether an artifact was downloaded or not
    
    37
    -        if not element._pull():
    
    48
    +        if not element._pull(subdir=self._subdir, excluded_subdirs=self._excluded_subdirs):
    
    38 49
                 raise SkipJob(self.action_name)
    
    39 50
     
    
    40 51
         def status(self, element):
    
    ... ... @@ -49,7 +60,7 @@ class PullQueue(Queue):
    49 60
             if not element._can_query_cache():
    
    50 61
                 return QueueStatus.WAIT
    
    51 62
     
    
    52
    -        if element._pull_pending():
    
    63
    +        if element._pull_pending(subdir=self._subdir):
    
    53 64
                 return QueueStatus.READY
    
    54 65
             else:
    
    55 66
                 return QueueStatus.SKIP
    

  • buildstream/_stream.py
    ... ... @@ -160,12 +160,14 @@ class Stream():
    160 160
         #    track_cross_junctions (bool): Whether tracking should cross junction boundaries
    
    161 161
         #    build_all (bool): Whether to build all elements, or only those
    
    162 162
         #                      which are required to build the target.
    
    163
    +    #    pull_buildtrees (bool): Whether to pull buildtrees from a remote cache server
    
    163 164
         #
    
    164 165
         def build(self, targets, *,
    
    165 166
                   track_targets=None,
    
    166 167
                   track_except=None,
    
    167 168
                   track_cross_junctions=False,
    
    168
    -              build_all=False):
    
    169
    +              build_all=False,
    
    170
    +              pull_buildtrees=False):
    
    169 171
     
    
    170 172
             if build_all:
    
    171 173
                 selection = PipelineSelection.ALL
    
    ... ... @@ -195,7 +197,11 @@ class Stream():
    195 197
                 self._add_queue(track_queue, track=True)
    
    196 198
     
    
    197 199
             if self._artifacts.has_fetch_remotes():
    
    198
    -            self._add_queue(PullQueue(self._scheduler))
    
    200
    +            # Query if any of the user defined artifact servers have buildtrees set
    
    201
    +            for cache in self._context.artifact_cache_specs:
    
    202
    +                if cache.buildtrees:
    
    203
    +                    pull_buildtrees = True
    
    204
    +            self._add_queue(PullQueue(self._scheduler, buildtrees=pull_buildtrees))
    
    199 205
     
    
    200 206
             self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
    
    201 207
             self._add_queue(BuildQueue(self._scheduler))
    
    ... ... @@ -295,7 +301,8 @@ class Stream():
    295 301
         #
    
    296 302
         def pull(self, targets, *,
    
    297 303
                  selection=PipelineSelection.NONE,
    
    298
    -             remote=None):
    
    304
    +             remote=None,
    
    305
    +             pull_buildtrees=False):
    
    299 306
     
    
    300 307
             use_config = True
    
    301 308
             if remote:
    
    ... ... @@ -310,8 +317,13 @@ class Stream():
    310 317
             if not self._artifacts.has_fetch_remotes():
    
    311 318
                 raise StreamError("No artifact caches available for pulling artifacts")
    
    312 319
     
    
    320
    +        # Query if any of the user defined artifact servers have buildtrees set
    
    321
    +        for cache in self._context.artifact_cache_specs:
    
    322
    +            if cache.buildtrees:
    
    323
    +                pull_buildtrees = True
    
    324
    +
    
    313 325
             self._pipeline.assert_consistent(elements)
    
    314
    -        self._add_queue(PullQueue(self._scheduler))
    
    326
    +        self._add_queue(PullQueue(self._scheduler, buildtrees=pull_buildtrees))
    
    315 327
             self._enqueue_plan(elements)
    
    316 328
             self._run()
    
    317 329
     
    

  • buildstream/element.py
    ... ... @@ -1689,18 +1689,26 @@ class Element(Plugin):
    1689 1689
     
    
    1690 1690
         # _pull_pending()
    
    1691 1691
         #
    
    1692
    -    # Check whether the artifact will be pulled.
    
    1692
    +    # Check whether the artifact will be pulled. If the pull operation is to
    
    1693
    +    # include a specific subdir of the element artifact (from cli or user conf)
    
    1694
    +    # then the local cache is queried for the subdirs existence.
    
    1695
    +    #
    
    1696
    +    # Args:
    
    1697
    +    #    subdir (str): Whether the pull has been invoked with a specific subdir set
    
    1693 1698
         #
    
    1694 1699
         # Returns:
    
    1695 1700
         #   (bool): Whether a pull operation is pending
    
    1696 1701
         #
    
    1697
    -    def _pull_pending(self):
    
    1702
    +    def _pull_pending(self, subdir=None):
    
    1698 1703
             if self._get_workspace():
    
    1699 1704
                 # Workspace builds are never pushed to artifact servers
    
    1700 1705
                 return False
    
    1701 1706
     
    
    1702
    -        if self.__strong_cached:
    
    1703
    -            # Artifact already in local cache
    
    1707
    +        if self.__strong_cached and subdir:
    
    1708
    +            # If we've specified a subdir, check if the subdir is cached locally
    
    1709
    +            if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
    
    1710
    +                return False
    
    1711
    +        elif self.__strong_cached:
    
    1704 1712
                 return False
    
    1705 1713
     
    
    1706 1714
             # Pull is pending if artifact remote server available
    
    ... ... @@ -1722,11 +1730,10 @@ class Element(Plugin):
    1722 1730
     
    
    1723 1731
             self._update_state()
    
    1724 1732
     
    
    1725
    -    def _pull_strong(self, *, progress=None):
    
    1733
    +    def _pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
    
    1726 1734
             weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    
    1727
    -
    
    1728 1735
             key = self.__strict_cache_key
    
    1729
    -        if not self.__artifacts.pull(self, key, progress=progress):
    
    1736
    +        if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
    
    1730 1737
                 return False
    
    1731 1738
     
    
    1732 1739
             # update weak ref by pointing it to this newly fetched artifact
    
    ... ... @@ -1734,10 +1741,10 @@ class Element(Plugin):
    1734 1741
     
    
    1735 1742
             return True
    
    1736 1743
     
    
    1737
    -    def _pull_weak(self, *, progress=None):
    
    1744
    +    def _pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
    
    1738 1745
             weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
    
    1739
    -
    
    1740
    -        if not self.__artifacts.pull(self, weak_key, progress=progress):
    
    1746
    +        if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
    
    1747
    +                                     excluded_subdirs=excluded_subdirs):
    
    1741 1748
                 return False
    
    1742 1749
     
    
    1743 1750
             # extract strong cache key from this newly fetched artifact
    
    ... ... @@ -1755,17 +1762,17 @@ class Element(Plugin):
    1755 1762
         #
    
    1756 1763
         # Returns: True if the artifact has been downloaded, False otherwise
    
    1757 1764
         #
    
    1758
    -    def _pull(self):
    
    1765
    +    def _pull(self, subdir=None, excluded_subdirs=None):
    
    1759 1766
             context = self._get_context()
    
    1760 1767
     
    
    1761 1768
             def progress(percent, message):
    
    1762 1769
                 self.status(message)
    
    1763 1770
     
    
    1764 1771
             # Attempt to pull artifact without knowing whether it's available
    
    1765
    -        pulled = self._pull_strong(progress=progress)
    
    1772
    +        pulled = self._pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
    
    1766 1773
     
    
    1767 1774
             if not pulled and not self._cached() and not context.get_strict():
    
    1768
    -            pulled = self._pull_weak(progress=progress)
    
    1775
    +            pulled = self._pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
    
    1769 1776
     
    
    1770 1777
             if not pulled:
    
    1771 1778
                 return False
    
    ... ... @@ -1788,10 +1795,14 @@ class Element(Plugin):
    1788 1795
             if not self._cached():
    
    1789 1796
                 return True
    
    1790 1797
     
    
    1791
    -        # Do not push tained artifact
    
    1798
    +        # Do not push tainted artifact
    
    1792 1799
             if self.__get_tainted():
    
    1793 1800
                 return True
    
    1794 1801
     
    
    1802
    +        # Do not push elements that have a dangling buildtree artifact unless element type is
    
    1803
    +        # expected to have an empty buildtree directory
    
    1804
    +        if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
    
    1805
    +            return True
    
    1795 1806
             return False
    
    1796 1807
     
    
    1797 1808
         # _push():
    

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -177,15 +177,11 @@ class SandboxRemote(Sandbox):
    177 177
             if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
    
    178 178
                 raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    179 179
     
    
    180
    -        # Set up environment and working directory
    
    181
    -        if cwd is None:
    
    182
    -            cwd = self._get_work_directory()
    
    183
    -
    
    184
    -        if cwd is None:
    
    185
    -            cwd = '/'
    
    186
    -
    
    187
    -        if env is None:
    
    188
    -            env = self._get_environment()
    
    180
    +        # Fallback to the sandbox default settings for
    
    181
    +        # the cwd and env.
    
    182
    +        #
    
    183
    +        cwd = self._get_work_directory(cwd=cwd)
    
    184
    +        env = self._get_environment(cwd=cwd, env=env)
    
    189 185
     
    
    190 186
             # We want command args as a list of strings
    
    191 187
             if isinstance(command, str):
    

  • tests/completions/completions.py
    ... ... @@ -103,7 +103,7 @@ def test_commands(cli, cmd, word_idx, expected):
    103 103
         ('bst --no-colors build -', 3, ['--all ', '--track ', '--track-all ',
    
    104 104
                                         '--track-except ',
    
    105 105
                                         '--track-cross-junctions ', '-J ',
    
    106
    -                                    '--track-save ']),
    
    106
    +                                    '--track-save ', '--pull-buildtrees ']),
    
    107 107
     
    
    108 108
         # Test the behavior of completing after an option that has a
    
    109 109
         # parameter that cannot be completed, vs an option that has
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]