[Notes] [Git][BuildStream/buildstream][jmac/remote_execution_client] 26 commits: Use ArtifactCache's get_cache_size when calculating the quota



Jim MacArthur pushed to branch jmac/remote_execution_client at BuildStream / buildstream

Commits:

27 changed files:

Changes:

  • .gitlab-ci.yml
    @@ -1,4 +1,4 @@
    -image: buildstream/testsuite-debian:9-master-112-a9f63c5e
    +image: buildstream/testsuite-debian:9-master-114-4cab18e3
     
     cache:
       key: "$CI_JOB_NAME-"
    @@ -79,25 +79,25 @@ source_dist:
         - coverage-linux/
     
     tests-debian-9:
    -  image: buildstream/testsuite-debian:9-master-112-a9f63c5e
    +  image: buildstream/testsuite-debian:9-master-114-4cab18e3
       <<: *linux-tests
     
     tests-fedora-27:
    -  image: buildstream/testsuite-fedora:27-master-112-a9f63c5e
    +  image: buildstream/testsuite-fedora:27-master-114-4cab18e3
       <<: *linux-tests
     
     tests-fedora-28:
    -  image: buildstream/testsuite-fedora:28-master-112-a9f63c5e
    +  image: buildstream/testsuite-fedora:28-master-114-4cab18e3
       <<: *linux-tests
     
     tests-ubuntu-18.04:
    -  image: buildstream/testsuite-ubuntu:18.04-master-112-a9f63c5e
    +  image: buildstream/testsuite-ubuntu:18.04-master-114-4cab18e3
       <<: *linux-tests
     
     tests-unix:
       # Use fedora here, to a) run a test on fedora and b) ensure that we
       # can get rid of ostree - this is not possible with debian-8
    -  image: buildstream/testsuite-fedora:27-master-112-a9f63c5e
    +  image: buildstream/testsuite-fedora:27-master-114-4cab18e3
       stage: test
       variables:
         BST_FORCE_BACKEND: "unix"

  • buildstream/_artifactcache/__init__.py
    @@ -17,4 +17,4 @@
     #  Authors:
     #        Tristan Van Berkom <tristan vanberkom codethink co uk>
     
    -from .artifactcache import ArtifactCache, ArtifactCacheSpec
    +from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE

  • buildstream/_artifactcache/artifactcache.py
    @@ -28,6 +28,9 @@ from .. import utils
     from .. import _yaml
     
     
    +CACHE_SIZE_FILE = "cache_size"
    +
    +
     # An ArtifactCacheSpec holds the user configuration for a single remote
     # artifact cache.
     #
    @@ -82,7 +85,6 @@ class ArtifactCache():
             self.extractdir = os.path.join(context.artifactdir, 'extract')
             self.tmpdir = os.path.join(context.artifactdir, 'tmp')
     
    -        self.max_size = context.cache_quota
             self.estimated_size = None
     
             self.global_remote_specs = []
    @@ -90,6 +92,8 @@ class ArtifactCache():
     
             self._local = False
             self.cache_size = None
    +        self.cache_quota = None
    +        self.cache_lower_threshold = None
     
             os.makedirs(self.extractdir, exist_ok=True)
             os.makedirs(self.tmpdir, exist_ok=True)
    @@ -227,7 +231,7 @@ class ArtifactCache():
         def clean(self):
             artifacts = self.list_artifacts()
     
    -        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
    +        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
                 try:
                     to_remove = artifacts.pop(0)
                 except IndexError:
    @@ -241,7 +245,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.calculate_cache_size() > self.context.cache_quota:
    +                if self.calculate_cache_size() > self.cache_quota:
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")
    @@ -282,7 +286,11 @@ class ArtifactCache():
             # If we don't currently have an estimate, figure out the real
             # cache size.
             if self.estimated_size is None:
    -            self.estimated_size = self.calculate_cache_size()
    +            stored_size = self._read_cache_size()
    +            if stored_size is not None:
    +                self.estimated_size = stored_size
    +            else:
    +                self.estimated_size = self.calculate_cache_size()
     
             return self.estimated_size
     
    @@ -541,6 +549,7 @@ class ArtifactCache():
                 self.estimated_size = self.calculate_cache_size()
     
             self.estimated_size += artifact_size
    +        self._write_cache_size(self.estimated_size)
     
         # _set_cache_size()
         #
    @@ -551,6 +560,109 @@ class ArtifactCache():
         def _set_cache_size(self, cache_size):
             self.estimated_size = cache_size
     
    +        # set_cache_size is called in cleanup, where it may set the cache to None
    +        if self.estimated_size is not None:
    +            self._write_cache_size(self.estimated_size)
    +
    +    # _write_cache_size()
    +    #
    +    # Writes the given size of the artifact to the cache's size file
    +    #
    +    def _write_cache_size(self, size):
    +        assert isinstance(size, int)
    +        size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
    +        with open(size_file_path, "w") as f:
    +            f.write(str(size))
    +
    +    # _read_cache_size()
    +    #
    +    # Reads and returns the size of the artifact cache that's stored in the
    +    # cache's size file
    +    #
    +    def _read_cache_size(self):
    +        size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
    +
    +        if not os.path.exists(size_file_path):
    +            return None
    +
    +        with open(size_file_path, "r") as f:
    +            size = f.read()
    +
    +        try:
    +            num_size = int(size)
    +        except ValueError as e:
    +            raise ArtifactError("Size '{}' parsed from '{}' was not an integer".format(
    +                size, size_file_path)) from e
    +
    +        return num_size
    +
    +    # _calculate_cache_quota()
    +    #
    +    # Calculates and sets the cache quota and lower threshold based on the
    +    # quota set in Context.
    +    # It checks that the quota is both a valid expression, and that there is
    +    # enough disk space to satisfy that quota
    +    #
    +    def _calculate_cache_quota(self):
    +        # Headroom intended to give BuildStream a bit of leeway.
    +        # This acts as the minimum size of cache_quota and also
    +        # is taken from the user requested cache_quota.
    +        #
    +        if 'BST_TEST_SUITE' in os.environ:
    +            headroom = 0
    +        else:
    +            headroom = 2e9
    +
    +        artifactdir_volume = self.context.artifactdir
    +        while not os.path.exists(artifactdir_volume):
    +            artifactdir_volume = os.path.dirname(artifactdir_volume)
    +
    +        try:
    +            cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
    +        except utils.UtilError as e:
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
    +                            "\nValid values are, for example: 800M 10G 1T 50%\n"
    +                            .format(str(e))) from e
    +
    +        stat = os.statvfs(artifactdir_volume)
    +        available_space = (stat.f_bsize * stat.f_bavail)
    +
    +        cache_size = self.get_approximate_cache_size()
    +
    +        # Ensure system has enough storage for the cache_quota
    +        #
    +        # If cache_quota is none, set it to the maximum it could possibly be.
    +        #
    +        # Also check that cache_quota is atleast as large as our headroom.
    +        #
    +        if cache_quota is None:  # Infinity, set to max system storage
    +            cache_quota = cache_size + available_space
    +        if cache_quota < headroom:  # Check minimum
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
    +                            "BuildStream requires a minimum cache quota of 2G.")
    +        elif cache_quota > cache_size + available_space:  # Check maximum
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            ("Your system does not have enough available " +
    +                             "space to support the cache quota specified.\n" +
    +                             "You currently have:\n" +
    +                             "- {used} of cache in use at {local_cache_path}\n" +
    +                             "- {available} of available system storage").format(
    +                                 used=utils._pretty_size(cache_size),
    +                                 local_cache_path=self.context.artifactdir,
    +                                 available=utils._pretty_size(available_space)))
    +
    +        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
    +        # cache_quota to try and avoid exceptions.
    +        #
    +        # Of course, we might still end up running out during a build
    +        # if we end up writing more than 2G, but hey, this stuff is
    +        # already really fuzzy.
    +        #
    +        self.cache_quota = cache_quota - headroom
    +        self.cache_lower_threshold = self.cache_quota / 2
    +
     
     # _configured_remote_artifact_cache_specs():
     #

  • buildstream/_artifactcache/cascache.py
    @@ -61,6 +61,8 @@ class CASCache(ArtifactCache):
             os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
             os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
     
    +        self._calculate_cache_quota()
    +
             self._enable_push = enable_push
     
             # Per-project list of _CASRemote instances.
    @@ -215,6 +217,29 @@ class CASCache(ArtifactCache):
                 remotes_for_project = self._remotes[element._get_project()]
                 return any(remote.spec.push for remote in remotes_for_project)
     
    +    def pull_key(self, key, size_bytes, project):
    +        """ Pull a single key rather than an artifact.
    +        Does not update local refs. """
    +
    +        for remote in self._remotes[project]:
    +            try:
    +                remote.init()
    +
    +                tree = remote_execution_pb2.Digest()
    +                tree.hash = key
    +                tree.size_bytes = size_bytes
    +
    +                self._fetch_directory(remote, tree)
    +
    +                # no need to pull from additional remotes
    +                return True
    +
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.NOT_FOUND:
    +                    raise
    +
    +        return False
    +
         def pull(self, element, key, *, progress=None):
             ref = self.get_artifact_fullname(element, key)
     
    @@ -256,10 +281,96 @@ class CASCache(ArtifactCache):
     
             self.set_ref(newref, tree)
     
    +    def _push_refs_to_remote(self, refs, remote, may_have_dependencies):
    +        skipped_remote = True
    +        element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    +        try:
    +            for ref in refs:
    +                tree = self.resolve_ref(ref)
    +
    +                # Check whether ref is already on the server in which case
    +                # there is no need to push the artifact
    +                try:
    +                    request = buildstream_pb2.GetReferenceRequest()
    +                    request.key = ref
    +                    response = remote.ref_storage.GetReference(request)
    +
    +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    +                        # ref is already on the server with the same tree
    +                        continue
    +
    +                except grpc.RpcError as e:
    +                    if e.code() != grpc.StatusCode.NOT_FOUND:
    +                        # Intentionally re-raise RpcError for outer except block.
    +                        raise
    +
    +                missing_blobs = {}
    +                required_blobs = self._required_blobs(tree, may_have_dependencies)
    +
    +                # Limit size of FindMissingBlobs request
    +                for required_blobs_group in _grouper(required_blobs, 512):
    +                    request = remote_execution_pb2.FindMissingBlobsRequest()
    +
    +                    for required_digest in required_blobs_group:
    +                        d = request.blob_digests.add()
    +                        d.hash = required_digest.hash
    +                        d.size_bytes = required_digest.size_bytes
    +
    +                    response = remote.cas.FindMissingBlobs(request)
    +                    for digest in response.missing_blob_digests:
    +                        d = remote_execution_pb2.Digest()
    +                        d.hash = digest.hash
    +                        d.size_bytes = digest.size_bytes
    +                        missing_blobs[d.hash] = d
    +
    +                # Upload any blobs missing on the server
    +                skipped_remote = False
    +                for digest in missing_blobs.values():
    +                    uuid_ = uuid.uuid4()
    +                    resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    +                                              digest.hash, str(digest.size_bytes)])
    +
    +                    def request_stream(resname):
    +                        with open(self.objpath(digest), 'rb') as f:
    +                            assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +                            offset = 0
    +                            finished = False
    +                            remaining = digest.size_bytes
    +                            while not finished:
    +                                chunk_size = min(remaining, 64 * 1024)
    +                                remaining -= chunk_size
    +
    +                                request = bytestream_pb2.WriteRequest()
    +                                request.write_offset = offset
    +                                # max. 64 kB chunks
    +                                request.data = f.read(chunk_size)
    +                                request.resource_name = resname
    +                                request.finish_write = remaining <= 0
    +                                yield request
    +                                offset += chunk_size
    +                                finished = request.finish_write
    +                    response = remote.bytestream.Write(request_stream(resource_name))
    +
    +                request = buildstream_pb2.UpdateReferenceRequest()
    +                request.keys.append(ref)
    +                request.digest.hash = tree.hash
    +                request.digest.size_bytes = tree.size_bytes
    +                remote.ref_storage.UpdateReference(request)
    +
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +
    +        return not skipped_remote
    +
         def push(self, element, keys):
    +        keys = list(keys)
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
             project = element._get_project()
    +        return self.push_refs(refs, project, element=element)
    +
    +    def push_refs(self, refs, project, may_have_dependencies=True, element=None):
     
             push_remotes = [r for r in self._remotes[project] if r.spec.push]
     
    @@ -267,97 +378,52 @@ class CASCache(ArtifactCache):
     
             for remote in push_remotes:
                 remote.init()
    -            skipped_remote = True
    -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    -
    -            try:
    -                for ref in refs:
    -                    tree = self.resolve_ref(ref)
    -
    -                    # Check whether ref is already on the server in which case
    -                    # there is no need to push the artifact
    -                    try:
    -                        request = buildstream_pb2.GetReferenceRequest()
    -                        request.key = ref
    -                        response = remote.ref_storage.GetReference(request)
    -
    -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                            # ref is already on the server with the same tree
    -                            continue
    -
    -                    except grpc.RpcError as e:
    -                        if e.code() != grpc.StatusCode.NOT_FOUND:
    -                            # Intentionally re-raise RpcError for outer except block.
    -                            raise
    -
    -                    missing_blobs = {}
    -                    required_blobs = self._required_blobs(tree)
    -
    -                    # Limit size of FindMissingBlobs request
    -                    for required_blobs_group in _grouper(required_blobs, 512):
    -                        request = remote_execution_pb2.FindMissingBlobsRequest()
    -
    -                        for required_digest in required_blobs_group:
    -                            d = request.blob_digests.add()
    -                            d.hash = required_digest.hash
    -                            d.size_bytes = required_digest.size_bytes
    -
    -                        response = remote.cas.FindMissingBlobs(request)
    -                        for digest in response.missing_blob_digests:
    -                            d = remote_execution_pb2.Digest()
    -                            d.hash = digest.hash
    -                            d.size_bytes = digest.size_bytes
    -                            missing_blobs[d.hash] = d
    -
    -                    # Upload any blobs missing on the server
    -                    skipped_remote = False
    -                    for digest in missing_blobs.values():
    -                        uuid_ = uuid.uuid4()
    -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    -                                                  digest.hash, str(digest.size_bytes)])
    -
    -                        def request_stream():
    -                            with open(self.objpath(digest), 'rb') as f:
    -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    -                                offset = 0
    -                                finished = False
    -                                remaining = digest.size_bytes
    -                                while not finished:
    -                                    chunk_size = min(remaining, 64 * 1024)
    -                                    remaining -= chunk_size
    -
    -                                    request = bytestream_pb2.WriteRequest()
    -                                    request.write_offset = offset
    -                                    # max. 64 kB chunks
    -                                    request.data = f.read(chunk_size)
    -                                    request.resource_name = resource_name
    -                                    request.finish_write = remaining <= 0
    -                                    yield request
    -                                    offset += chunk_size
    -                                    finished = request.finish_write
    -                        response = remote.bytestream.Write(request_stream())
    -
    -                    request = buildstream_pb2.UpdateReferenceRequest()
    -                    request.keys.append(ref)
    -                    request.digest.hash = tree.hash
    -                    request.digest.size_bytes = tree.size_bytes
    -                    remote.ref_storage.UpdateReference(request)
    -
    -                    pushed = True
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    -
    -            if skipped_remote:
    +            if self._push_refs_to_remote(refs, remote, may_have_dependencies):
    +                pushed = True
    +            elif element:
                     self.context.message(Message(
                         None,
                         MessageType.SKIPPED,
                         "Remote ({}) already has {} cached".format(
                             remote.spec.url, element._get_brief_display_key())
                     ))
    +
             return pushed
     
    +    def verify_key_pushed(self, key, project):
    +        ref = key
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            remote.init()
    +
    +            if self._verify_ref_on_remote(ref, remote):
    +                pushed = True
    +
    +        return pushed
    +
    +    def _verify_ref_on_remote(self, ref, remote):
    +        tree = self.resolve_ref(ref)
    +
    +        # Check whether ref is already on the server in which case
    +        # there is no need to push the artifact
    +        try:
    +            request = buildstream_pb2.GetReferenceRequest()
    +            request.key = ref
    +            response = remote.ref_storage.GetReference(request)
    +
    +            if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    +                # ref is already on the server with the same tree
    +                return True
    +
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise
    +
    +        return False
    +
         ################################################
         #                API Private Methods           #
         ################################################
    @@ -731,26 +797,27 @@ class CASCache(ArtifactCache):
                 #
                 q.put(str(e))
     
    -    def _required_blobs(self, tree):
    +    def _required_blobs(self, tree, may_have_dependencies=True):
             # parse directory, and recursively add blobs
             d = remote_execution_pb2.Digest()
             d.hash = tree.hash
             d.size_bytes = tree.size_bytes
             yield d
     
    -        directory = remote_execution_pb2.Directory()
    +        if may_have_dependencies:
    +            directory = remote_execution_pb2.Directory()
     
    -        with open(self.objpath(tree), 'rb') as f:
    -            directory.ParseFromString(f.read())
    +            with open(self.objpath(tree), 'rb') as f:
    +                directory.ParseFromString(f.read())
     
    -        for filenode in directory.files:
    -            d = remote_execution_pb2.Digest()
    -            d.hash = filenode.digest.hash
    -            d.size_bytes = filenode.digest.size_bytes
    -            yield d
    +            for filenode in directory.files:
    +                d = remote_execution_pb2.Digest()
    +                d.hash = filenode.digest.hash
    +                d.size_bytes = filenode.digest.size_bytes
    +                yield d
     
    -        for dirnode in directory.directories:
    -            yield from self._required_blobs(dirnode.digest)
    +            for dirnode in directory.directories:
    +                yield from self._required_blobs(dirnode.digest)
     
         def _fetch_blob(self, remote, digest, out):
             resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
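    The request_stream() generator above streams each missing blob to the remote in
    64 kB chunks, tracking the write offset and setting finish_write on the final
    chunk. The same chunking pattern, reduced to plain Python with a dict standing in
    for bytestream_pb2.WriteRequest (the resource name below is made up for the
    example):

        CHUNK = 64 * 1024

        def chunked_writes(payload, resource_name):
            # Yield one write per 64 kB slice, marking the last one with finish_write
            offset = 0
            remaining = len(payload)
            finished = False
            while not finished:
                chunk_size = min(remaining, CHUNK)
                remaining -= chunk_size
                request = {
                    'resource_name': resource_name,
                    'write_offset': offset,
                    'data': payload[offset:offset + chunk_size],
                    'finish_write': remaining <= 0,
                }
                yield request
                offset += chunk_size
                finished = request['finish_write']

        # A 150 KiB payload becomes three writes at offsets 0, 65536 and 131072,
        # with finish_write set only on the last one.
        writes = list(chunked_writes(b'\x00' * (150 * 1024), 'uploads/example/blobs/abc/153600'))
        assert [w['write_offset'] for w in writes] == [0, 65536, 131072]
        assert [w['finish_write'] for w in writes] == [False, False, True]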

  • buildstream/_context.py
    @@ -64,12 +64,6 @@ class Context():
             # The locations from which to push and pull prebuilt artifacts
             self.artifact_cache_specs = []
     
    -        # The artifact cache quota
    -        self.cache_quota = None
    -
    -        # The lower threshold to which we aim to reduce the cache size
    -        self.cache_lower_threshold = None
    -
             # The directory to store build logs
             self.logdir = None
     
    @@ -124,6 +118,8 @@ class Context():
             self._workspaces = None
             self._log_handle = None
             self._log_filename = None
    +        self.config_cache_quota = 'infinity'
    +        self.artifactdir_volume = None
     
         # load()
         #
    @@ -183,71 +179,7 @@ class Context():
             cache = _yaml.node_get(defaults, Mapping, 'cache')
             _yaml.node_validate(cache, ['quota'])
     
    -        artifactdir_volume = self.artifactdir
    -        while not os.path.exists(artifactdir_volume):
    -            artifactdir_volume = os.path.dirname(artifactdir_volume)
    -
    -        # We read and parse the cache quota as specified by the user
    -        cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
    -        try:
    -            cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
    -        except utils.UtilError as e:
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
    -                            "\nValid values are, for example: 800M 10G 1T 50%\n"
    -                            .format(str(e))) from e
    -
    -        # Headroom intended to give BuildStream a bit of leeway.
    -        # This acts as the minimum size of cache_quota and also
    -        # is taken from the user requested cache_quota.
    -        #
    -        if 'BST_TEST_SUITE' in os.environ:
    -            headroom = 0
    -        else:
    -            headroom = 2e9
    -
    -        stat = os.statvfs(artifactdir_volume)
    -        available_space = (stat.f_bsize * stat.f_bavail)
    -
    -        # Again, the artifact directory may not yet have been created yet
    -        #
    -        if not os.path.exists(self.artifactdir):
    -            cache_size = 0
    -        else:
    -            cache_size = utils._get_dir_size(self.artifactdir)
    -
    -        # Ensure system has enough storage for the cache_quota
    -        #
    -        # If cache_quota is none, set it to the maximum it could possibly be.
    -        #
    -        # Also check that cache_quota is atleast as large as our headroom.
    -        #
    -        if cache_quota is None:  # Infinity, set to max system storage
    -            cache_quota = cache_size + available_space
    -        if cache_quota < headroom:  # Check minimum
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
    -                            "BuildStream requires a minimum cache quota of 2G.")
    -        elif cache_quota > cache_size + available_space:  # Check maximum
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            ("Your system does not have enough available " +
    -                             "space to support the cache quota specified.\n" +
    -                             "You currently have:\n" +
    -                             "- {used} of cache in use at {local_cache_path}\n" +
    -                             "- {available} of available system storage").format(
    -                                 used=utils._pretty_size(cache_size),
    -                                 local_cache_path=self.artifactdir,
    -                                 available=utils._pretty_size(available_space)))
    -
    -        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
    -        # cache_quota to try and avoid exceptions.
    -        #
    -        # Of course, we might still end up running out during a build
    -        # if we end up writing more than 2G, but hey, this stuff is
    -        # already really fuzzy.
    -        #
    -        self.cache_quota = cache_quota - headroom
    -        self.cache_lower_threshold = self.cache_quota / 2
    +        self.config_cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
     
             # Load artifact share configuration
             self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)

  • buildstream/_frontend/app.py
    @@ -198,8 +198,10 @@ class App():
                 option_value = self._main_options.get(cli_option)
                 if option_value is not None:
                     setattr(self.context, context_attr, option_value)
    -
    -        Platform.create_instance(self.context)
    +        try:
    +            Platform.create_instance(self.context)
    +        except BstError as e:
    +            self._error_exit(e, "Error instantiating platform")
     
             # Create the logger right before setting the message handler
             self.logger = LogLine(self.context,

  • buildstream/_includes.py
    @@ -10,11 +10,15 @@ from ._exceptions import LoadError, LoadErrorReason
     #
     # Args:
     #    loader (Loader): The Loader object
    +#    copy_tree (bool): Whether to make a copy, of tree in
    +#                      provenance. Should be true if intended to be
    +#                      serialized.
     class Includes:
     
    -    def __init__(self, loader):
    +    def __init__(self, loader, *, copy_tree=False):
             self._loader = loader
             self._loaded = {}
    +        self._copy_tree = copy_tree
     
         # process()
         #
    @@ -96,10 +100,11 @@ class Includes:
             directory = project.directory
             file_path = os.path.join(directory, include)
             key = (current_loader, file_path)
    -        if file_path not in self._loaded:
    +        if key not in self._loaded:
                 self._loaded[key] = _yaml.load(os.path.join(directory, include),
                                                shortname=shortname,
    -                                           project=project)
    +                                           project=project,
    +                                           copy_tree=self._copy_tree)
             return self._loaded[key], file_path, current_loader
     
         # _process_value()

  • buildstream/_loader/loadelement.py
    @@ -71,7 +71,7 @@ class LoadElement():
                 'kind', 'depends', 'sources', 'sandbox',
                 'variables', 'environment', 'environment-nocache',
                 'config', 'public', 'description',
    -            'build-depends', 'runtime-depends',
    +            'build-depends', 'runtime-depends'
             ])
     
             # Extract the Dependencies

  • buildstream/_loader/loader.py
    @@ -78,7 +78,7 @@ class Loader():
             self._elements = {}       # Dict of elements
             self._loaders = {}        # Dict of junction loaders
     
    -        self._includes = Includes(self)
    +        self._includes = Includes(self, copy_tree=True)
     
         # load():
         #

  • buildstream/_loader/types.py
    @@ -41,6 +41,7 @@ class Symbol():
         DIRECTORY = "directory"
         JUNCTION = "junction"
         SANDBOX = "sandbox"
    +    REMOTE_EXECUTION = "remote-execution"
     
     
     # Dependency()

  • buildstream/_pipeline.py
    @@ -235,6 +235,9 @@ class Pipeline():
         #                       exceptions removed
         #
         def except_elements(self, targets, elements, except_targets):
    +        if not except_targets:
    +            return elements
    +
             targeted = list(self.dependencies(targets, Scope.ALL))
             visited = []
     

  • buildstream/_project.py
    @@ -129,6 +129,7 @@ class Project():
     
             self.artifact_cache_specs = None
             self._sandbox = None
    +        self._remote_execution = None
             self._splits = None
     
             self._context.add_project(self)
    @@ -419,7 +420,7 @@ class Project():
                                  parent=parent_loader,
                                  tempdir=tempdir)
     
    -        self._project_includes = Includes(self.loader)
    +        self._project_includes = Includes(self.loader, copy_tree=False)
     
             project_conf_first_pass = _yaml.node_copy(self._project_conf)
             self._project_includes.process(project_conf_first_pass, only_local=True)
    @@ -460,7 +461,7 @@ class Project():
                 'aliases', 'name',
                 'artifacts', 'options',
                 'fail-on-overlap', 'shell', 'fatal-warnings',
    -            'ref-storage', 'sandbox', 'mirrors'
    +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
             ])
     
             #
    @@ -478,6 +479,9 @@ class Project():
             # Load sandbox configuration
             self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')
     
    +        # Load remote execution configuration
    +        self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
    +
             # Load project split rules
             self._splits = _yaml.node_get(config, Mapping, 'split-rules')
     

  • buildstream/_scheduler/queues/buildqueue.py
    @@ -97,7 +97,7 @@ class BuildQueue(Queue):
                 cache = element._get_artifact_cache()
                 cache._add_artifact_size(artifact_size)
     
    -            if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
    +            if cache.get_approximate_cache_size() > cache.cache_quota:
                     self._scheduler._check_cache_size_real()
     
         def done(self, job, element, result, success):

  • buildstream/_scheduler/scheduler.py
    @@ -29,6 +29,7 @@ from contextlib import contextmanager
     # Local imports
     from .resources import Resources, ResourceType
     from .jobs import CacheSizeJob, CleanupJob
    +from .._platform import Platform
     
     
     # A decent return code for Scheduler.run()
    @@ -316,7 +317,8 @@ class Scheduler():
             self._sched()
     
         def _run_cleanup(self, cache_size):
    -        if cache_size and cache_size < self.context.cache_quota:
    +        platform = Platform.get_platform()
    +        if cache_size and cache_size < platform.artifactcache.cache_quota:
                 return
     
             job = CleanupJob(self, 'cleanup', 'cleanup',

  • buildstream/buildelement.py
    @@ -155,6 +155,9 @@ class BuildElement(Element):
                 command_dir = build_root
             sandbox.set_work_directory(command_dir)
     
    +        # Tell sandbox which directory is preserved in the finished artifact
    +        sandbox.set_output_directory(install_root)
    +
             # Setup environment
             sandbox.set_environment(self.get_environment())
     

  • buildstream/data/projectconfig.yaml
    @@ -204,3 +204,6 @@ shell:
       # Command to run when `bst shell` does not provide a command
       #
       command: [ 'sh', '-i' ]
    +
    +remote-execution:
    +  url: ""
    \ No newline at end of file

  • buildstream/element.py
    @@ -95,6 +95,7 @@ from . import _site
     from ._platform import Platform
     from .plugin import CoreWarnings
     from .sandbox._config import SandboxConfig
    +from .sandbox._sandboxremote import SandboxRemote
     
     from .storage.directory import Directory
     from .storage._filebaseddirectory import FileBasedDirectory
    @@ -250,6 +251,9 @@ class Element(Plugin):
             # Extract Sandbox config
             self.__sandbox_config = self.__extract_sandbox_config(meta)
     
    +        # Extract remote execution URL
    +        self.__remote_execution_url = self.__extract_remote_execution_config(meta)
    +
         def __lt__(self, other):
             return self.name < other.name
     
    @@ -1545,6 +1549,8 @@ class Element(Plugin):
                     finally:
                         if collect is not None:
                             try:
    +                            # Sandbox will probably have replaced its virtual directory, so get it again
    +                            sandbox_vroot = sandbox.get_virtual_directory()
                                 collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
                             except VirtualDirectoryError:
                                 # No collect directory existed
    @@ -2117,7 +2123,24 @@ class Element(Plugin):
             project = self._get_project()
             platform = Platform.get_platform()
     
    -        if directory is not None and os.path.exists(directory):
    +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
    +            if not self.__artifacts.has_push_remotes(element=self):
    +                # Give an early warning if remote execution will not work
    +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
    +                                   .format(self.name) +
    +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
    +
    +            self.info("Using a remote 'sandbox' for artifact {}".format(self.name))
    +            sandbox = SandboxRemote(context, project,
    +                                    directory,
    +                                    stdout=stdout,
    +                                    stderr=stderr,
    +                                    config=config,
    +                                    server_url=self.__remote_execution_url,
    +                                    allow_real_directory=False)
    +            yield sandbox
    +        elif directory is not None and os.path.exists(directory):
    +            self.info("Using a local sandbox for artifact {}".format(self.name))
                 sandbox = platform.create_sandbox(context, project,
                                                   directory,
                                                   stdout=stdout,
    @@ -2289,6 +2312,18 @@ class Element(Plugin):
             return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
                                  self.node_get_member(sandbox_config, int, 'build-gid'))
     
    +    def __extract_remote_execution_config(self, meta):
    +        if self.__is_junction:
    +            return ''
    +        else:
    +            project = self._get_project()
    +            project.ensure_fully_loaded()
    +            if project._remote_execution:
    +                rexec_config = _yaml.node_chain_copy(project._remote_execution)
    +                return self.node_get_member(rexec_config, str, 'url')
    +            else:
    +                return ''
    +
         # This makes a special exception for the split rules, which
         # elements may extend but whos defaults are defined in the project.
         #

  • buildstream/plugins/elements/autotools.py
    @@ -57,7 +57,7 @@ from buildstream import BuildElement
     
     # Element implementation for the 'autotools' kind.
     class AutotoolsElement(BuildElement):
    -    pass
    +    BST_VIRTUAL_DIRECTORY = True
     
     
     # Plugin entry point

  • buildstream/sandbox/__init__.py
    @@ -20,3 +20,4 @@
     from .sandbox import Sandbox, SandboxFlags
     from ._sandboxchroot import SandboxChroot
     from ._sandboxbwrap import SandboxBwrap
    +from ._sandboxremote import SandboxRemote

  • buildstream/sandbox/_sandboxremote.py
    1
    +#!/usr/bin/env python3
    
    2
    +#
    
    3
    +#  Copyright (C) 2018 Codethink Limited
    
    4
    +#
    
    5
    +#  This program is free software; you can redistribute it and/or
    
    6
    +#  modify it under the terms of the GNU Lesser General Public
    
    7
    +#  License as published by the Free Software Foundation; either
    
    8
    +#  version 2 of the License, or (at your option) any later version.
    
    9
    +#
    
    10
    +#  This library is distributed in the hope that it will be useful,
    
    11
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    12
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
    
    13
    +#  Lesser General Public License for more details.
    
    14
    +#
    
    15
    +#  You should have received a copy of the GNU Lesser General Public
    
    16
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    17
    +#
    
    18
    +#  Authors:
    
    19
    +#        Jim MacArthur <jim macarthur codethink co uk>
    
    20
    +
    
    21
    +import os
    
    22
    +import re
    
    23
    +
    
    24
    +import grpc
    
    25
    +
    
    26
    +from . import Sandbox
    
    27
    +from ..storage._filebaseddirectory import FileBasedDirectory
    
    28
    +from ..storage._casbaseddirectory import CasBasedDirectory
    
    29
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30
    +
    
    31
    +from .._artifactcache.cascache import CASCache
    
    32
    +
    
    33
    +
    
    34
    +class SandboxError(Exception):
    
    35
    +    pass
    
    36
    +
    
    37
    +
    
    38
    +# SandboxRemote()
    
    39
    +#
    
    40
    +# This isn't really a sandbox, it's a stub which sends all the source
    
    41
    +# to a remote server and retrieves the results from it.
    
    42
    +#
    
    43
    +class SandboxRemote(Sandbox):
    
    44
    +
    
    45
    +    def __init__(self, *args, **kwargs):
    
    46
    +        super().__init__(*args, **kwargs)
    
    47
    +        self.cascache = None
    
    48
    +        self.server_url = kwargs['server_url']
    
    49
    +        # Check the format of the url ourselves to save the user from
    
    50
    +        # whatever error messages grpc will produce
    
    51
    +        m = re.match(r'^(.+):(\d+)$', self.server_url)
    
    52
    +        if m is None:
    
    53
    +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
    
    54
    +                               .format(self.server_url) +
    
    55
    +                               "It should be of the form <protocol>://<domain name>:<port>.")
    
    56
    +
    
    57
    +    def _get_cascache(self):
    
    58
    +        if self.cascache is None:
    
    59
    +            self.cascache = CASCache(self._get_context())
    
    60
    +            self.cascache.setup_remotes(use_config=True)
    
    61
    +        return self.cascache
    
    62
    +
    
    63
    +    def __run_remote_command(self, cascache, command, input_root_digest, environment):
    
    64
    +
    
    65
    +        environment_variables = [remote_execution_pb2.Command.
    
    66
    +                                 EnvironmentVariable(name=k, value=v)
    
    67
    +                                 for (k, v) in environment.items()]
    
    68
    +
    
    69
    +        # Create and send the Command object.
    
    70
    +        remote_command = remote_execution_pb2.Command(arguments=command, environment_variables=environment_variables,
    
    71
    +                                                      output_files=[],
    
    72
    +                                                      output_directories=[self._output_directory],
    
    73
    +                                                      platform=None)
    
    74
    +        command_digest = cascache.add_object(buffer=remote_command.SerializeToString())
    
    75
    +        command_ref = 'worker-command/{}'.format(command_digest.hash)
    
    76
    +        cascache.set_ref(command_ref, command_digest)
    
    77
    +
    
    78
    +        command_push_successful = cascache.push_refs([command_ref], self._get_project(), may_have_dependencies=False)
    
    79
    +        if not command_push_successful and not cascache.verify_key_pushed(command_ref, self._get_project()):
    
    80
    +            # Command push failed
    
    81
    +            return None
    
    82
    +
    
    83
    +        # Create and send the action.
    
    84
    +
    
    85
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    86
    +                                             input_root_digest=input_root_digest,
    
    87
    +                                             timeout=None,
    
    88
    +                                             do_not_cache=True)
    
    89
    +
    
    90
    +        action_digest = cascache.add_object(buffer=action.SerializeToString())
    
    91
    +        action_ref = 'worker-action/{}'.format(action_digest.hash)
    
    92
    +        cascache.set_ref(action_ref, action_digest)
    
    93
    +        action_push_successful = cascache.push_refs([action_ref], self._get_project(), may_have_dependencies=False)
    
    94
    +
    
    95
    +        if not action_push_successful and not cascache.verify_key_pushed(action_ref, self._get_project()):
    
    96
    +            # Action push failed
    
    97
    +            return None
    
    98
    +
    
    99
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    100
    +
    
    101
    +        channel = grpc.insecure_channel(self.server_url)
    
    102
    +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    103
    +        request = remote_execution_pb2.ExecuteRequest(instance_name='default',
    
    104
    +                                                      action_digest=action_digest,
    
    105
    +                                                      skip_cache_lookup=True)
    
    106
    +
    
    107
    +        operation_iterator = stub.Execute(request)
    
    108
    +        operation = None
    
    109
    +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
    
    110
    +            # It would be useful to check that operation_iterator.code() is grpc.StatusCode.OK here,
    
    111
    +            # which would confirm that the server is actually contactable. However, calling it when the
    
    112
    +            # server is available seems to cause .code() to hang forever.
    
    113
    +            for operation in operation_iterator:
    
    114
    +                if operation.done:
    
    115
    +                    break
    
    116
    +        return operation
    
    117
    +
    
    118
    +    def process_job_output(self, output_directories, output_files):
    
    119
    +        # output_directories is an array of OutputDirectory objects.
    
    120
    +        # output_files is an array of OutputFile objects.
    
    121
    +        #
    
    122
    +        # We only specify one output_directory, so it's an error
    
    123
    +        # for there to be any output files or more than one directory at the moment.
    
    124
    +
    
    125
    +        if output_files:
    
    126
    +            raise SandboxError("Output files were returned when we didn't request any.")
    
    127
    +        elif len(output_directories) > 1:
    
    128
    +            error_text = "More than one output directory was returned from the build server: {}"
    
    129
    +            raise SandboxError(error_text.format(output_directories))
    
    130
    +        elif len(output_directories) < 1:  # pylint: disable=len-as-condition
    
    131
    +            error_text = "No output directory was returned from the build server."
    
    132
    +            raise SandboxError(error_text)
    
    133
    +
    
    134
    +        digest = output_directories[0].tree_digest
    
    135
    +        if digest is None or digest.hash is None or digest.hash == "":
    
    136
    +            raise SandboxError("Output directory structure had no digest attached.")
    
    137
    +
    
    138
    +        # Now do a pull to ensure we have the necessary parts.
    
    139
    +        cascache = self._get_cascache()
    
    140
    +        cascache.pull_key(digest.hash, digest.size_bytes, self._get_project())
    
    141
    +        path_components = os.path.split(self._output_directory)
    
    142
    +
    
    143
    +        # Now what we have is a digest for the output. Once we return, the calling process will
    
    144
    +        # attempt to descend into our virtual directory looking for the output directory, so we need
    
    145
    +        # to replace the virtual directory with one built from the returned tree digest.
    
    146
    +
    
    147
    +        if not path_components[1]:
    
    148
    +            # The artifact wants the whole directory; we could just return the returned hash in its
    
    149
    +            # place, but we don't have a means to do that yet.
    
    150
    +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
    
    151
    +
    
    152
    +        # At the moment, we will get the whole directory back in the first directory argument and we need
    
    153
    +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    154
    +        # from another hash will be interesting, though...
    
    155
    +
    
    156
    +        new_dir = CasBasedDirectory(self._get_context(), ref=digest)
    
    157
    +        self._set_virtual_directory(new_dir)
    
    158
    +
    
    159
    +    def run(self, command, flags, *, cwd=None, env=None):
    
    160
    +        # Upload sources
    
    161
    +        upload_vdir = self.get_virtual_directory()
    
    162
    +
    
    163
    +        if isinstance(upload_vdir, FileBasedDirectory):
    
    164
    +            # Make a new temporary directory to put source in
    
    165
    +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
    
    166
    +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
    
    167
    +
    
    168
    +        # Now push the source directory to the remote, recording a 'worker-source/...' ref for it.
    
    169
    +        cascache = self._get_cascache()
    
    170
    +
    
    171
    +        ref = 'worker-source/{}'.format(upload_vdir.ref.hash)
    
    172
    +        upload_vdir._save(ref)
    
    173
    +        source_push_successful = cascache.push_refs([ref], self._get_project())
    
    174
    +
    
    175
    +        # Set up environment and PWD
    
    176
    +        if env is None:
    
    177
    +            env = self._get_environment()
    
    178
    +        if 'PWD' not in env:
    
    179
    +            env['PWD'] = self._get_work_directory()
    
    180
    +
    
    181
    +        # We want command args as a list of strings
    
    182
    +        if isinstance(command, str):
    
    183
    +            command = [command]
    
    184
    +
    
    185
    +        # Now transmit the command to execute
    
    186
    +        if source_push_successful or cascache.verify_key_pushed(ref, self._get_project()):
    
    187
    +            response = self.__run_remote_command(cascache, command, upload_vdir.ref, env)
    
    188
    +
    
    189
    +            if response is None:
    
    190
    +                # Failure of remote execution, usually due to an error in BuildStream
    
    191
    +                # NB This error could be raised in __run_remote_command
    
    192
    +                raise SandboxError("No response returned from server")
    
    193
    +
    
    194
    +            assert(response.HasField("error") or response.HasField("response"))
    
    195
    +
    
    196
    +            if response.HasField("error"):
    
    197
    +                # A normal error during the build: the remote execution system
    
    198
    +                # has worked correctly but the command failed.
    
    199
    +                # response.error also contains 'message' (str) and 'details'
    
    200
    +                # (iterator of Any) which we ignore at the moment.
    
    201
    +                return response.error.code
    
    202
    +            else:
    
    203
    +
    
    204
    +                # At the moment, response can either be an
    
    205
    +                # ExecuteResponse containing an ActionResult, or an
    
    206
    +                # ActionResult directly.
    
    207
    +                executeResponse = remote_execution_pb2.ExecuteResponse()
    
    208
    +                if response.response.Is(executeResponse.DESCRIPTOR):
    
    209
    +                    # Unpack ExecuteResponse and set response to its response
    
    210
    +                    response.response.Unpack(executeResponse)
    
    211
    +                    response = executeResponse
    
    212
    +
    
    213
    +                actionResult = remote_execution_pb2.ActionResult()
    
    214
    +                if response.response.Is(actionResult.DESCRIPTOR):
    
    215
    +                    response.response.Unpack(actionResult)
    
    216
    +                    self.process_job_output(actionResult.output_directories, actionResult.output_files)
    
    217
    +                else:
    
    218
    +                    raise SandboxError("Received unknown message from server (expected ExecutionResponse).")
    
    219
    +        else:
    
    220
    +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    221
    +        return 0
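
For reference, the Command and Action messages above are stored in CAS and addressed by REAPI digests. The internals of cascache.add_object() are not shown in this diff, so the following is only a minimal sketch of the usual REAPI convention (SHA-256 hex of the serialized bytes plus their length); make_digest is an illustrative helper, not BuildStream API:

    import hashlib

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def make_digest(data):
        # A REAPI Digest pairs the SHA-256 hex of a blob with its size in bytes.
        return remote_execution_pb2.Digest(hash=hashlib.sha256(data).hexdigest(),
                                           size_bytes=len(data))

    # e.g. a digest for a Command like the one built in __run_remote_command()
    command = remote_execution_pb2.Command(arguments=['sh', '-c', 'make'])
    command_digest = make_digest(command.SerializeToString())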

  • buildstream/sandbox/sandbox.py
    ... ... @@ -99,9 +99,11 @@ class Sandbox():
    99 99
             self.__stdout = kwargs['stdout']
    
    100 100
             self.__stderr = kwargs['stderr']
    
    101 101
     
    
    102
    -        # Setup the directories. Root should be available to subclasses, hence
    
    103
    -        # being single-underscore. The others are private to this class.
    
    102
    +        # Setup the directories. Root and output_directory should be
    
    103
    +        # available to subclasses, hence being single-underscore. The
    
    104
    +        # others are private to this class.
    
    104 105
             self._root = os.path.join(directory, 'root')
    
    106
    +        self._output_directory = None
    
    105 107
             self.__directory = directory
    
    106 108
             self.__scratch = os.path.join(self.__directory, 'scratch')
    
    107 109
             for directory_ in [self._root, self.__scratch]:
    
    ... ... @@ -144,11 +146,30 @@ class Sandbox():
    144 146
                     self._vdir = FileBasedDirectory(self._root)
    
    145 147
             return self._vdir
    
    146 148
     
    
    149
    +    def _set_virtual_directory(self, vdir):
    
    150
    +        """ Sets virtual directory. Useful after remote execution
    
    151
    +        has rewritten the working directory.
    
    152
    +        """
    
    153
    +        self._vdir = vdir
    
    154
    +
    
    155
    +    def get_virtual_toplevel_directory(self):
    
    156
    +        """Fetches the sandbox's toplevel directory
    
    157
    +
    
    158
    +        The toplevel directory contains 'root', 'scratch' and later
    
    159
    +        'artifact' where output is copied to.
    
    160
    +
    
    161
    +        Returns:
    
    162
    +           (str): The sandbox toplevel directory
    
    163
    +
    
    164
    +        """
    
    165
    +        # For now, just create a new Directory every time we're asked
    
    166
    +        return FileBasedDirectory(self.__directory)
    
    167
    +
    
    147 168
         def set_environment(self, environment):
    
    148 169
             """Sets the environment variables for the sandbox
    
    149 170
     
    
    150 171
             Args:
    
    151
    -           directory (dict): The environment variables to use in the sandbox
    
    172
    +           environment (dict): The environment variables to use in the sandbox
    
    152 173
             """
    
    153 174
             self.__env = environment
    
    154 175
     
    
    ... ... @@ -160,6 +181,15 @@ class Sandbox():
    160 181
             """
    
    161 182
             self.__cwd = directory
    
    162 183
     
    
    184
    +    def set_output_directory(self, directory):
    
    185
    +        """Sets the output directory - the directory which is preserved
    
    186
    +        as an artifact after assembly.
    
    187
    +
    
    188
    +        Args:
    
    189
    +           directory (str): An absolute path within the sandbox
    
    190
    +        """
    
    191
    +        self._output_directory = directory
    
    192
    +
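
The set_output_directory() method added above is how SandboxRemote learns which directory to request back from the server (it becomes the single entry in the remote Command's output_directories). A hypothetical caller might wire it up as follows; the paths are illustrative and not taken from this commit:

    # Hypothetical element-side usage of the extended Sandbox API.
    sandbox.set_environment({'PATH': '/usr/bin:/bin'})
    sandbox.set_work_directory('/buildstream/build')       # existing API
    sandbox.set_output_directory('/buildstream/install')   # new in this commit
    exit_code = sandbox.run(['sh', '-c', 'make install'], 0)
    assert exit_code == 0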
    
    163 193
         def mark_directory(self, directory, *, artifact=False):
    
    164 194
             """Marks a sandbox directory and ensures it will exist
    
    165 195
     
    

  • buildstream/source.py
    ... ... @@ -794,7 +794,7 @@ class Source(Plugin):
    794 794
                     # Save the ref in the originating file
    
    795 795
                     #
    
    796 796
                     try:
    
    797
    -                    _yaml.dump(_yaml.node_sanitize(provenance.toplevel), provenance.filename.name)
    
    797
    +                    _yaml.dump(provenance.toplevel, provenance.filename.name)
    
    798 798
                     except OSError as e:
    
    799 799
                         raise SourceError("{}: Error saving source reference to '{}': {}"
    
    800 800
                                           .format(self, provenance.filename.name, e),
    

  • buildstream/storage/_casbaseddirectory.py
    ... ... @@ -358,6 +358,20 @@ class CasBasedDirectory(Directory):
    358 358
                         result.files_written.append(relative_pathname)
    
    359 359
             return result
    
    360 360
     
    
    361
    +    def _save(self, name):
    
    362
    +        """ Saves this directory into the content cache as a named ref. This function is not
    
    363
    +        currently in use, but may be useful later. """
    
    364
    +        self._recalculate_recursing_up()
    
    365
    +        self._recalculate_recursing_down()
    
    366
    +        (rel_refpath, refname) = os.path.split(name)
    
    367
    +        refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath)
    
    368
    +        refname = os.path.join(refdir, refname)
    
    369
    +
    
    370
    +        if not os.path.exists(refdir):
    
    371
    +            os.makedirs(refdir)
    
    372
    +        with open(refname, "wb") as f:
    
    373
    +            f.write(self.ref.SerializeToString())
    
    374
    +
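
_save() stores self.ref.SerializeToString() under refs/heads/<name>, and the ref of a CasBasedDirectory created from remote output is a Digest, so the saved ref can presumably be parsed back into a Digest message. A minimal sketch under that assumption (load_ref is an illustrative helper, not BuildStream API):

    import os

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def load_ref(cas_directory, name):
        # Mirror of _save(): the ref file holds a serialized Digest message.
        ref_path = os.path.join(cas_directory, 'refs', 'heads', name)
        digest = remote_execution_pb2.Digest()
        with open(ref_path, 'rb') as f:
            digest.ParseFromString(f.read())
        return digest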
    
    361 375
         def import_files(self, external_pathspec, *, files=None,
    
    362 376
                          report_written=True, update_utimes=False,
    
    363 377
                          can_link=False):
    

  • dev-requirements.txt
    1 1
     coverage == 4.4.0
    
    2 2
     pep8
    
    3
    -pytest >= 3.1.0
    
    3
    +pylint == 2.1.1
    
    4
    +pytest >= 3.7
    
    4 5
     pytest-cov >= 2.5.0
    
    5 6
     pytest-datafiles
    
    6 7
     pytest-env
    

  • doc/source/format_project.rst
    ... ... @@ -204,6 +204,23 @@ with an artifact share.
    204 204
     You can also specify a list of caches here; earlier entries in the list
    
    205 205
     will have higher priority than later ones.
    
    206 206
     
    
    207
    +Remote execution
    
    208
    +~~~~~~~~~~~~~~~~
    
    209
    +BuildStream supports remote execution using the Remote Execution API
    
    210
    +(REAPI). A description of how remote execution works is beyond the scope
    
    211
    +of this document, but you can specify a remote server complying with the REAPI
    
    212
    +using the ``remote-execution`` option:
    
    213
    +
    
    214
    +.. code:: yaml
    
    215
    +
    
    216
    +  remote-execution:
    
    217
    +
    
    218
    +    # A url defining a remote execution server
    
    219
    +    url: buildserver.example.com:50051
    
    220
    +
    
    221
    +The URL should be a hostname and port separated by ':'; do not include a protocol.
    
    222
    +
    
    223
    +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
    
    207 224
     
    
    208 225
     .. _project_essentials_mirrors:
    
    209 226
     
    

  • tests/artifactcache/cache_size.py
    1
    +import os
    
    2
    +import pytest
    
    3
    +
    
    4
    +from buildstream import _yaml
    
    5
    +from buildstream._artifactcache import CACHE_SIZE_FILE
    
    6
    +
    
    7
    +from tests.testutils import cli, create_element_size
    
    8
    +
    
    9
    +# XXX: Currently lacking:
    
    10
    +#      * A way to check whether it's faster to read cache size on
    
    11
    +#        successive invocations.
    
    12
    +#      * A way to check whether the cache size file has been read.
    
    13
    +
    
    14
    +
    
    15
    +def create_project(project_dir):
    
    16
    +    project_file = os.path.join(project_dir, "project.conf")
    
    17
    +    project_conf = {
    
    18
    +        "name": "test"
    
    19
    +    }
    
    20
    +    _yaml.dump(project_conf, project_file)
    
    21
    +    element_name = "test.bst"
    
    22
    +    create_element_size(element_name, project_dir, ".", [], 1024)
    
    23
    +
    
    24
    +
    
    25
    +def test_cache_size_roundtrip(cli, tmpdir):
    
    26
    +    # Builds (to put files in the cache), then invokes buildstream again
    
    27
    +    # to check nothing breaks
    
    28
    +
    
    29
    +    # Create project
    
    30
    +    project_dir = str(tmpdir)
    
    31
    +    create_project(project_dir)
    
    32
    +
    
    33
    +    # Build, to populate the cache
    
    34
    +    res = cli.run(project=project_dir, args=["build", "test.bst"])
    
    35
    +    res.assert_success()
    
    36
    +
    
    37
    +    # Show, to check that nothing breaks while reading cache size
    
    38
    +    res = cli.run(project=project_dir, args=["show", "test.bst"])
    
    39
    +    res.assert_success()
    
    40
    +
    
    41
    +
    
    42
    +def test_cache_size_write(cli, tmpdir):
    
    43
    +    # Builds (to put files in the cache), then checks a number is
    
    44
    +    # written to the cache size file.
    
    45
    +
    
    46
    +    project_dir = str(tmpdir)
    
    47
    +    create_project(project_dir)
    
    48
    +
    
    49
    +    # Artifact cache must be in a known place
    
    50
    +    artifactdir = os.path.join(project_dir, "artifacts")
    
    51
    +    cli.configure({"artifactdir": artifactdir})
    
    52
    +
    
    53
    +    # Build, to populate the cache
    
    54
    +    res = cli.run(project=project_dir, args=["build", "test.bst"])
    
    55
    +    res.assert_success()
    
    56
    +
    
    57
    +    # Inspect the artifact cache
    
    58
    +    sizefile = os.path.join(artifactdir, CACHE_SIZE_FILE)
    
    59
    +    assert os.path.isfile(sizefile)
    
    60
    +    with open(sizefile, "r") as f:
    
    61
    +        size_data = f.read()
    
    62
    +    size = int(size_data)

  • tests/testutils/artifactshare.py
    ... ... @@ -140,6 +140,7 @@ class ArtifactShare():
    140 140
     
    
    141 141
             return statvfs_result(f_blocks=self.total_space,
    
    142 142
                                   f_bfree=self.free_space - repo_size,
    
    143
    +                              f_bavail=self.free_space - repo_size,
    
    143 144
                                   f_bsize=1)
    
    144 145
     
    
    145 146
     
    
    ... ... @@ -156,4 +157,4 @@ def create_artifact_share(directory, *, total_space=None, free_space=None):
    156 157
             share.close()
    
    157 158
     
    
    158 159
     
    
    159
    -statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize')
    160
    +statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
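
The new f_bavail field only matters if the code consuming this fake statvfs result computes available space in the usual way, i.e. f_bavail * f_bsize; that calculation is not shown in this diff, so the sketch below merely illustrates the convention:

    import os

    def available_bytes(path):
        # Space available to unprivileged processes: blocks available to
        # non-root users multiplied by the reported block size.
        st = os.statvfs(path)
        return st.f_bavail * st.f_bsize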


