[Notes] [Git][BuildStream/buildstream][jmac/remote_execution_client] 23 commits: Improve documentation for artifact cache installation

Martin Blanchard pushed to branch jmac/remote_execution_client at BuildStream / buildstream

Commits:

27 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -19,6 +19,7 @@

     import hashlib
     import itertools
    +import io
     import multiprocessing
     import os
     import signal
    @@ -76,6 +77,7 @@ class CASCache(ArtifactCache):
         ################################################
         #     Implementation of abstract methods       #
         ################################################
    +
         def contains(self, element, key):
             refpath = self._refpath(self.get_artifact_fullname(element, key))

    @@ -259,6 +261,25 @@ class CASCache(ArtifactCache):

             return False

    +    def pull_tree(self, project, digest):
    +        """ Pull a single Tree rather than an artifact.
    +        Does not update local refs. """
    +
    +        for remote in self._remotes[project]:
    +            try:
    +                remote.init()
    +
    +                digest = self._fetch_tree(remote, digest)
    +
    +                # no need to pull from additional remotes
    +                return digest
    +
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.NOT_FOUND:
    +                    raise
    +
    +        return None
    +
         def link_key(self, element, oldkey, newkey):
             oldref = self.get_artifact_fullname(element, oldkey)
             newref = self.get_artifact_fullname(element, newkey)
    @@ -267,8 +288,46 @@ class CASCache(ArtifactCache):

             self.set_ref(newref, tree)

    +    def _push_refs_to_remote(self, refs, remote):
    +        skipped_remote = True
    +        try:
    +            for ref in refs:
    +                tree = self.resolve_ref(ref)
    +
    +                # Check whether ref is already on the server in which case
    +                # there is no need to push the artifact
    +                try:
    +                    request = buildstream_pb2.GetReferenceRequest()
    +                    request.key = ref
    +                    response = remote.ref_storage.GetReference(request)
    +
    +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    +                        # ref is already on the server with the same tree
    +                        continue
    +
    +                except grpc.RpcError as e:
    +                    if e.code() != grpc.StatusCode.NOT_FOUND:
    +                        # Intentionally re-raise RpcError for outer except block.
    +                        raise
    +
    +                self._send_directory(remote, tree)
    +
    +                request = buildstream_pb2.UpdateReferenceRequest()
    +                request.keys.append(ref)
    +                request.digest.hash = tree.hash
    +                request.digest.size_bytes = tree.size_bytes
    +                remote.ref_storage.UpdateReference(request)
    +
    +                skipped_remote = False
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +
    +        return not skipped_remote
    +
         def push(self, element, keys):
    -        refs = [self.get_artifact_fullname(element, key) for key in keys]
    +
    +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]

             project = element._get_project()

    @@ -278,97 +337,80 @@ class CASCache(ArtifactCache):

             for remote in push_remotes:
                 remote.init()
    -            skipped_remote = True
    -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))

    -            try:
    -                for ref in refs:
    -                    tree = self.resolve_ref(ref)
    -
    -                    # Check whether ref is already on the server in which case
    -                    # there is no need to push the artifact
    -                    try:
    -                        request = buildstream_pb2.GetReferenceRequest()
    -                        request.key = ref
    -                        response = remote.ref_storage.GetReference(request)
    -
    -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                            # ref is already on the server with the same tree
    -                            continue
    -
    -                    except grpc.RpcError as e:
    -                        if e.code() != grpc.StatusCode.NOT_FOUND:
    -                            # Intentionally re-raise RpcError for outer except block.
    -                            raise
    -
    -                    missing_blobs = {}
    -                    required_blobs = self._required_blobs(tree)
    -
    -                    # Limit size of FindMissingBlobs request
    -                    for required_blobs_group in _grouper(required_blobs, 512):
    -                        request = remote_execution_pb2.FindMissingBlobsRequest()
    -
    -                        for required_digest in required_blobs_group:
    -                            d = request.blob_digests.add()
    -                            d.hash = required_digest.hash
    -                            d.size_bytes = required_digest.size_bytes
    -
    -                        response = remote.cas.FindMissingBlobs(request)
    -                        for digest in response.missing_blob_digests:
    -                            d = remote_execution_pb2.Digest()
    -                            d.hash = digest.hash
    -                            d.size_bytes = digest.size_bytes
    -                            missing_blobs[d.hash] = d
    -
    -                    # Upload any blobs missing on the server
    -                    skipped_remote = False
    -                    for digest in missing_blobs.values():
    -                        uuid_ = uuid.uuid4()
    -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
    -                                                  digest.hash, str(digest.size_bytes)])
    -
    -                        def request_stream(resname):
    -                            with open(self.objpath(digest), 'rb') as f:
    -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    -                                offset = 0
    -                                finished = False
    -                                remaining = digest.size_bytes
    -                                while not finished:
    -                                    chunk_size = min(remaining, 64 * 1024)
    -                                    remaining -= chunk_size
    -
    -                                    request = bytestream_pb2.WriteRequest()
    -                                    request.write_offset = offset
    -                                    # max. 64 kB chunks
    -                                    request.data = f.read(chunk_size)
    -                                    request.resource_name = resname
    -                                    request.finish_write = remaining <= 0
    -                                    yield request
    -                                    offset += chunk_size
    -                                    finished = request.finish_write
    -                        response = remote.bytestream.Write(request_stream(resource_name))
    -
    -                    request = buildstream_pb2.UpdateReferenceRequest()
    -                    request.keys.append(ref)
    -                    request.digest.hash = tree.hash
    -                    request.digest.size_bytes = tree.size_bytes
    -                    remote.ref_storage.UpdateReference(request)
    -
    -                    pushed = True
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))

    -            if skipped_remote:
    +            if self._push_refs_to_remote(refs, remote):
    +                pushed = True
    +            else:
                     self.context.message(Message(
                         None,
                         MessageType.SKIPPED,
                         "Remote ({}) already has {} cached".format(
                             remote.spec.url, element._get_brief_display_key())
                     ))
    +
    +        return pushed
    +
    +    def push_directory(self, project, directory):
    +
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        if directory.ref is None:
    +            return None
    +
    +        for remote in push_remotes:
    +            remote.init()
    +
    +            self._send_directory(remote, directory.ref)
    +
    +        return directory.ref
    +
    +    def push_message(self, project, message):
    +
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        message_buffer = message.SerializeToString()
    +        message_sha = hashlib.sha256(message_buffer)
    +        message_digest = remote_execution_pb2.Digest()
    +        message_digest.hash = message_sha.hexdigest()
    +        message_digest.size_bytes = len(message_buffer)
    +
    +        for remote in push_remotes:
    +            remote.init()
    +
    +            with io.BytesIO(message_buffer) as b:
    +                self._send_blob(remote, message_digest, b)
    +
    +        return message_digest
    +
    +    def _verify_digest_on_remote(self, remote, digest):
    +        # Check whether ref is already on the server in which case
    +        # there is no need to push the artifact
    +        request = remote_execution_pb2.FindMissingBlobsRequest()
    +        request.blob_digests.extend([digest])
    +
    +        response = remote.cas.FindMissingBlobs(request)
    +        if digest in response.missing_blob_digests:
    +            return False
    +
    +        return True
    +
    +    def verify_digest_pushed(self, project, digest):
    +
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            remote.init()
    +
    +            if self._verify_digest_on_remote(remote, digest):
    +                pushed = True
    +
             return pushed

    +
         ################################################
         #                API Private Methods           #
         ################################################
    @@ -599,6 +641,7 @@ class CASCache(ArtifactCache):
         ################################################
         #             Local Private Methods            #
         ################################################
    +
         def _checkout(self, dest, tree):
             os.makedirs(dest, exist_ok=True)

    @@ -761,16 +804,16 @@ class CASCache(ArtifactCache):
                 #
                 q.put(str(e))

    -    def _required_blobs(self, tree):
    +    def _required_blobs(self, directory_digest):
             # parse directory, and recursively add blobs
             d = remote_execution_pb2.Digest()
    -        d.hash = tree.hash
    -        d.size_bytes = tree.size_bytes
    +        d.hash = directory_digest.hash
    +        d.size_bytes = directory_digest.size_bytes
             yield d

             directory = remote_execution_pb2.Directory()

    -        with open(self.objpath(tree), 'rb') as f:
    +        with open(self.objpath(directory_digest), 'rb') as f:
                 directory.ParseFromString(f.read())

             for filenode in directory.files:
    @@ -782,16 +825,16 @@ class CASCache(ArtifactCache):
             for dirnode in directory.directories:
                 yield from self._required_blobs(dirnode.digest)

    -    def _fetch_blob(self, remote, digest, out):
    +    def _fetch_blob(self, remote, digest, stream):
             resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
             request = bytestream_pb2.ReadRequest()
             request.resource_name = resource_name
             request.read_offset = 0
             for response in remote.bytestream.Read(request):
    -            out.write(response.data)
    +            stream.write(response.data)
    +        stream.flush()

    -        out.flush()
    -        assert digest.size_bytes == os.fstat(out.fileno()).st_size
    +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size

         def _fetch_directory(self, remote, tree):
             objpath = self.objpath(tree)
    @@ -827,6 +870,92 @@ class CASCache(ArtifactCache):
                 digest = self.add_object(path=out.name)
                 assert digest.hash == tree.hash

    +    def _fetch_tree(self, remote, digest):
    +        # download but do not store the Tree object
    +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    +            self._fetch_blob(remote, digest, out)
    +
    +            tree = remote_execution_pb2.Tree()
    +
    +            with open(out.name, 'rb') as f:
    +                tree.ParseFromString(f.read())
    +
    +            tree.children.extend([tree.root])
    +            for directory in tree.children:
    +                for filenode in directory.files:
    +                    fileobjpath = self.objpath(filenode.digest)
    +                    if os.path.exists(fileobjpath):
    +                        # already in local cache
    +                        continue
    +
    +                    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +                        self._fetch_blob(remote, filenode.digest, f)
    +
    +                        added_digest = self.add_object(path=f.name)
    +                        assert added_digest.hash == filenode.digest.hash
    +
    +                # place directory blob only in final location when we've downloaded
    +                # all referenced blobs to avoid dangling references in the repository
    +                dirbuffer = directory.SerializeToString()
    +                dirdigest = self.add_object(buffer=dirbuffer)
    +                assert dirdigest.size_bytes == len(dirbuffer)
    +
    +        return dirdigest
    +
    +    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    +                                  digest.hash, str(digest.size_bytes)])
    +
    +        def request_stream(resname, instream):
    +            offset = 0
    +            finished = False
    +            remaining = digest.size_bytes
    +            while not finished:
    +                chunk_size = min(remaining, 64 * 1024)
    +                remaining -= chunk_size
    +
    +                request = bytestream_pb2.WriteRequest()
    +                request.write_offset = offset
    +                # max. 64 kB chunks
    +                request.data = instream.read(chunk_size)
    +                request.resource_name = resname
    +                request.finish_write = remaining <= 0
    +
    +                yield request
    +
    +                offset += chunk_size
    +                finished = request.finish_write
    +
    +        response = remote.bytestream.Write(request_stream(resource_name, stream))
    +
    +        assert response.committed_size == digest.size_bytes
    +
    +    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    +        required_blobs = self._required_blobs(digest)
    +
    +        missing_blobs = dict()
    +        # Limit size of FindMissingBlobs request
    +        for required_blobs_group in _grouper(required_blobs, 512):
    +            request = remote_execution_pb2.FindMissingBlobsRequest()
    +
    +            for required_digest in required_blobs_group:
    +                d = request.blob_digests.add()
    +                d.hash = required_digest.hash
    +                d.size_bytes = required_digest.size_bytes
    +
    +            response = remote.cas.FindMissingBlobs(request)
    +            for missing_digest in response.missing_blob_digests:
    +                d = remote_execution_pb2.Digest()
    +                d.hash = missing_digest.hash
    +                d.size_bytes = missing_digest.size_bytes
    +                missing_blobs[d.hash] = d
    +
    +        # Upload any blobs missing on the server
    +        for blob_digest in missing_blobs.values():
    +            with open(self.objpath(blob_digest), 'rb') as f:
    +                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
    +                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
    +

     # Represents a single remote CAS cache.
     #

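Taken together, these CASCache changes factor the old inline push loop into reusable helpers (_push_refs_to_remote, _send_directory, _send_blob) and add digest-level entry points for remote execution. A minimal sketch of how the new public methods are meant to compose, mirroring what SandboxRemote does later in this push; `context`, `project`, `upload_vdir` and `tree_digest` are assumed, already-constructed objects:

    # Sketch only, not part of the diff.
    from buildstream._artifactcache.cascache import CASCache

    cascache = CASCache(context)
    cascache.setup_remotes(use_config=True)

    # Push a CAS-backed directory (no ref needed), then verify it landed
    # on at least one push remote:
    digest = cascache.push_directory(project, upload_vdir)
    if not digest or not cascache.verify_digest_pushed(project, digest):
        raise RuntimeError("Directory push could not be verified")

    # Pull a Tree returned by the execution service into the local CAS:
    dir_digest = cascache.pull_tree(project, tree_digest)
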
  • buildstream/_project.py
    @@ -129,6 +129,7 @@ class Project():

             self.artifact_cache_specs = None
             self._sandbox = None
    +        self._remote_execution = None
             self._splits = None

             self._context.add_project(self)
    @@ -471,7 +472,7 @@
                 'aliases', 'name',
                 'artifacts', 'options',
                 'fail-on-overlap', 'shell', 'fatal-warnings',
    -            'ref-storage', 'sandbox', 'mirrors'
    +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
             ])

             #
    @@ -489,6 +490,9 @@
             # Load sandbox configuration
             self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')

    +        # Load remote execution configuration
    +        self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
    +
             # Load project split rules
             self._splits = _yaml.node_get(config, Mapping, 'split-rules')

    

  • buildstream/_scheduler/jobs/job.py
    @@ -109,7 +109,7 @@ class Job():
             # Private members
             #
             self._scheduler = scheduler            # The scheduler
    -        self._queue = multiprocessing.Queue()  # A message passing queue
    +        self._queue = None                     # A message passing queue
             self._process = None                   # The Process object
             self._watcher = None                   # Child process watcher
             self._listening = False                # Whether the parent is currently listening
    @@ -130,6 +130,8 @@
         #
         def spawn(self):

    +        self._queue = multiprocessing.Queue()
    +
             self._tries += 1
             self._parent_start_listening()

    @@ -552,6 +554,9 @@
             self.parent_complete(returncode == RC_OK, self._result)
             self._scheduler.job_completed(self, returncode == RC_OK)

    +        # Force the deletion of the queue and process objects to try and clean up FDs
    +        self._queue = self._process = None
    +
         # _parent_process_envelope()
         #
         # Processes a message Envelope deserialized from the message queue.
    

  • buildstream/buildelement.py
    @@ -155,6 +155,9 @@ class BuildElement(Element):
                 command_dir = build_root
             sandbox.set_work_directory(command_dir)

    +        # Tell sandbox which directory is preserved in the finished artifact
    +        sandbox.set_output_directory(install_root)
    +
             # Setup environment
             sandbox.set_environment(self.get_environment())

    

  • buildstream/data/projectconfig.yaml
    @@ -204,3 +204,6 @@ shell:
       # Command to run when `bst shell` does not provide a command
       #
       command: [ 'sh', '-i' ]
    +
    +remote-execution:
    +  url: ""
    \ No newline at end of file

  • buildstream/element.py
    @@ -95,6 +95,7 @@ from . import _site
     from ._platform import Platform
     from .plugin import CoreWarnings
     from .sandbox._config import SandboxConfig
    +from .sandbox._sandboxremote import SandboxRemote

     from .storage.directory import Directory
     from .storage._filebaseddirectory import FileBasedDirectory
    @@ -250,6 +251,9 @@ class Element(Plugin):
             # Extract Sandbox config
             self.__sandbox_config = self.__extract_sandbox_config(meta)

    +        # Extract remote execution URL
    +        self.__remote_execution_url = self.__extract_remote_execution_config(meta)
    +
         def __lt__(self, other):
             return self.name < other.name

    @@ -1570,6 +1574,8 @@
                 finally:
                     if collect is not None:
                         try:
    +                        # Sandbox will probably have replaced its virtual directory, so get it again
    +                        sandbox_vroot = sandbox.get_virtual_directory()
                             collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
                         except VirtualDirectoryError:
                             # No collect directory existed
    @@ -2146,7 +2152,32 @@
             project = self._get_project()
             platform = Platform.get_platform()

    -        if directory is not None and os.path.exists(directory):
    +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
    +            if not self.__artifacts.has_push_remotes(element=self):
    +                # Give an early warning if remote execution will not work
    +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
    +                                   .format(self.name) +
    +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
    +
    +            self.info("Using a remote sandbox for artifact {}".format(self.name))
    +
    +            sandbox = SandboxRemote(context, project,
    +                                    directory,
    +                                    stdout=stdout,
    +                                    stderr=stderr,
    +                                    config=config,
    +                                    server_url=self.__remote_execution_url,
    +                                    allow_real_directory=False)
    +            yield sandbox
    +
    +        elif directory is not None and os.path.exists(directory):
    +            if self.__remote_execution_url:
    +                self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
    +                          .format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
    +                          .format(kind=self.get_kind()), warning_token="remote-failure")
    +
    +                self.info("Falling back to local sandbox for artifact {}".format(self.name))
    +
                 sandbox = platform.create_sandbox(context, project,
                                                   directory,
                                                   stdout=stdout,
    @@ -2318,6 +2349,20 @@
             return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
                                  self.node_get_member(sandbox_config, int, 'build-gid'))

    +    # Remote execution configuration data (server URL), to be used by the remote sandbox.
    +    #
    +    def __extract_remote_execution_config(self, meta):
    +        if self.__is_junction:
    +            return None
    +        else:
    +            project = self._get_project()
    +            project.ensure_fully_loaded()
    +            if project._remote_execution:
    +                rexec_config = _yaml.node_chain_copy(project._remote_execution)
    +                return self.node_get_member(rexec_config, str, 'url')
    +            else:
    +                return None
    +
         # This makes a special exception for the split rules, which
         # elements may extend but whose defaults are defined in the project.
         #
    

  • buildstream/plugins/elements/autotools.py
    @@ -57,7 +57,8 @@ from buildstream import BuildElement

     # Element implementation for the 'autotools' kind.
     class AutotoolsElement(BuildElement):
    -    pass
    +    # Supports virtual directories (required for remote execution)
    +    BST_VIRTUAL_DIRECTORY = True


     # Plugin entry point
    

  • buildstream/plugins/elements/cmake.py
    @@ -56,7 +56,8 @@ from buildstream import BuildElement

     # Element implementation for the 'cmake' kind.
     class CMakeElement(BuildElement):
    -    pass
    +    # Supports virtual directories (required for remote execution)
    +    BST_VIRTUAL_DIRECTORY = True


     # Plugin entry point
    

  • buildstream/plugins/elements/make.py
    @@ -38,7 +38,8 @@ from buildstream import BuildElement

     # Element implementation for the 'make' kind.
     class MakeElement(BuildElement):
    -    pass
    +    # Supports virtual directories (required for remote execution)
    +    BST_VIRTUAL_DIRECTORY = True


     # Plugin entry point
    

  • buildstream/plugins/elements/meson.py
    @@ -53,7 +53,8 @@ from buildstream import BuildElement

     # Element implementation for the 'meson' kind.
     class MesonElement(BuildElement):
    -    pass
    +    # Supports virtual directories (required for remote execution)
    +    BST_VIRTUAL_DIRECTORY = True


     # Plugin entry point
    

  • buildstream/plugins/elements/qmake.py
    @@ -33,7 +33,8 @@ from buildstream import BuildElement

     # Element implementation for the 'qmake' kind.
     class QMakeElement(BuildElement):
    -    pass
    +    # Supports virtual directories (required for remote execution)
    +    BST_VIRTUAL_DIRECTORY = True


     # Plugin entry point
    

  • buildstream/sandbox/__init__.py
    @@ -20,3 +20,4 @@
     from .sandbox import Sandbox, SandboxFlags
     from ._sandboxchroot import SandboxChroot
     from ._sandboxbwrap import SandboxBwrap
    +from ._sandboxremote import SandboxRemote

  • buildstream/sandbox/_sandboxremote.py
    +#!/usr/bin/env python3
    +#
    +#  Copyright (C) 2018 Bloomberg LP
    +#
    +#  This program is free software; you can redistribute it and/or
    +#  modify it under the terms of the GNU Lesser General Public
    +#  License as published by the Free Software Foundation; either
    +#  version 2 of the License, or (at your option) any later version.
    +#
    +#  This library is distributed in the hope that it will be useful,
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    +#  Lesser General Public License for more details.
    +#
    +#  You should have received a copy of the GNU Lesser General Public
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    +#
    +#  Authors:
    +#        Jim MacArthur <jim macarthur codethink co uk>
    +
    +import os
    +import re
    +from urllib.parse import urlparse
    +
    +import grpc
    +
    +from . import Sandbox
    +from ..storage._filebaseddirectory import FileBasedDirectory
    +from ..storage._casbaseddirectory import CasBasedDirectory
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from .._artifactcache.cascache import CASCache
    +
    +
    +class SandboxError(Exception):
    +    pass
    +
    +
    +# SandboxRemote()
    +#
    +# This isn't really a sandbox, it's a stub which sends all the sources and build
    +# commands to a remote server and retrieves the results from it.
    +#
    +class SandboxRemote(Sandbox):
    +
    +    def __init__(self, *args, **kwargs):
    +        super().__init__(*args, **kwargs)
    +        self.cascache = None
    +
    +        url = urlparse(kwargs['server_url'])
    +        if not url.scheme or not url.hostname or not url.port:
    +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
    +                               .format(kwargs['server_url']) +
    +                               "It should be of the form <protocol>://<domain name>:<port>.")
    +        elif url.scheme != 'http':
    +            raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
    +                               "Only plain HTTP is currently supported (no HTTPS)."
    +                               .format(kwargs['server_url']))
    +
    +        self.server_url = '{}:{}'.format(url.hostname, url.port)
    +
    +    def _get_cascache(self):
    +        if self.cascache is None:
    +            self.cascache = CASCache(self._get_context())
    +            self.cascache.setup_remotes(use_config=True)
    +        return self.cascache
    +
    +    def run_remote_command(self, command, input_root_digest, working_directory, environment):
    +        # Sends an execution request to the remote execution server.
    +        #
    +        # This function blocks until it gets a response from the server.
    +        #
    +        environment_variables = [remote_execution_pb2.Command.
    +                                 EnvironmentVariable(name=k, value=v)
    +                                 for (k, v) in environment.items()]
    +
    +        # Create and send the Command object.
    +        remote_command = remote_execution_pb2.Command(arguments=command,
    +                                                      working_directory=working_directory,
    +                                                      environment_variables=environment_variables,
    +                                                      output_files=[],
    +                                                      output_directories=[self._output_directory],
    +                                                      platform=None)
    +
    +        cascache = self._get_cascache()
    +        # Upload the Command message to the remote CAS server
    +        command_digest = cascache.push_message(self._get_project(), remote_command)
    +        if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
    +            # Command push failed
    +            return None
    +
    +        # Create and send the action.
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    +                                             input_root_digest=input_root_digest,
    +                                             timeout=None,
    +                                             do_not_cache=False)
    +
    +        # Upload the Action message to the remote CAS server
    +        action_digest = cascache.push_message(self._get_project(), action)
    +        if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
    +            # Action push failed
    +            return None
    +
    +        # Next, try to create a communication channel to the BuildGrid server.
    +        channel = grpc.insecure_channel(self.server_url)
    +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    +        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    +                                                      skip_cache_lookup=False)
    +        try:
    +            operation_iterator = stub.Execute(request)
    +        except grpc.RpcError:
    +            return None
    +
    +        operation = None
    +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
    +            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
    +            # which will check the server is actually contactable. However, calling it when the
    +            # server is available seems to cause .code() to hang forever.
    +            for operation in operation_iterator:
    +                if operation.done:
    +                    break
    +
    +        return operation
    +
    +    def process_job_output(self, output_directories, output_files):
    +        # Reads the remote execution server response to an execution request.
    +        #
    +        # output_directories is an array of OutputDirectory objects.
    +        # output_files is an array of OutputFile objects.
    +        #
    +        # We only specify one output_directory, so it's an error
    +        # for there to be any output files or more than one directory at the moment.
    +        #
    +        if output_files:
    +            raise SandboxError("Output files were returned when we didn't request any.")
    +        elif not output_directories:
    +            error_text = "No output directory was returned from the build server."
    +            raise SandboxError(error_text)
    +        elif len(output_directories) > 1:
    +            error_text = "More than one output directory was returned from the build server: {}."
    +            raise SandboxError(error_text.format(output_directories))
    +
    +        tree_digest = output_directories[0].tree_digest
    +        if tree_digest is None or not tree_digest.hash:
    +            raise SandboxError("Output directory structure had no digest attached.")
    +
    +        cascache = self._get_cascache()
    +        # Now do a pull to ensure we have the necessary parts.
    +        dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
    +        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
    +            raise SandboxError("Output directory structure pulling from remote failed.")
    +
    +        path_components = os.path.split(self._output_directory)
    +
    +        # Now what we have is a digest for the output. Once we return, the calling process will
    +        # attempt to descend into our directory and find that directory, so we need to overwrite
    +        # that.
    +
    +        if not path_components:
    +            # The artifact wants the whole directory; we could just return the returned hash in its
    +            # place, but we don't have a means to do that yet.
    +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
    +
    +        # At the moment, we will get the whole directory back in the first directory argument and we need
    +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    +        # from another hash will be interesting, though...
    +
    +        new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
    +        self._set_virtual_directory(new_dir)
    +
    +    def run(self, command, flags, *, cwd=None, env=None):
    +        # Upload sources
    +        upload_vdir = self.get_virtual_directory()
    +
    +        if isinstance(upload_vdir, FileBasedDirectory):
    +            # Make a new temporary directory to put source in
    +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
    +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
    +
    +        upload_vdir.recalculate_hash()
    +
    +        cascache = self._get_cascache()
    +        # Now, push that key (without necessarily needing a ref) to the remote.
    +        vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
    +        if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
    +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    +
    +        # Set up environment and working directory
    +        if cwd is None:
    +            cwd = self._get_work_directory()
    +
    +        if cwd is None:
    +            cwd = '/'
    +
    +        if env is None:
    +            env = self._get_environment()
    +
    +        # We want command args as a list of strings
    +        if isinstance(command, str):
    +            command = [command]
    +
    +        # Now transmit the command to execute
    +        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
    +
    +        if operation is None:
    +            # Failure of remote execution, usually due to an error in BuildStream
    +            # NB This error could be raised in __run_remote_command
    +            raise SandboxError("No response returned from server")
    +
    +        assert not operation.HasField('error') and operation.HasField('response')
    +
    +        execution_response = remote_execution_pb2.ExecuteResponse()
    +        # The response is expected to be an ExecutionResponse message
    +        assert operation.response.Is(execution_response.DESCRIPTOR)
    +
    +        operation.response.Unpack(execution_response)
    +
    +        if execution_response.status.code != 0:
    +            # A normal error during the build: the remote execution system
    +            # has worked correctly but the command failed.
    +            # execution_response.error also contains 'message' (str) and
    +            # 'details' (iterator of Any) which we ignore at the moment.
    +            return execution_response.status.code
    +
    +        action_result = execution_response.result
    +
    +        self.process_job_output(action_result.output_directories, action_result.output_files)
    +
    +        return 0

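As a usage sketch, this is roughly how the new Element code (above) drives SandboxRemote; all names below are assumed placeholders:

    # Sketch only: construction mirrors buildstream/element.py in this push.
    sandbox = SandboxRemote(context, project, directory,
                            stdout=stdout,
                            stderr=stderr,
                            config=config,
                            server_url="http://buildserver.example.com:50051",
                            allow_real_directory=False)

    # run() uploads the sources to the remote CAS, executes the command
    # remotely, pulls the result Tree back, and returns the exit status.
    exit_code = sandbox.run(['sh', '-c', 'make install'], flags=0)
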
  • buildstream/sandbox/sandbox.py
    @@ -99,9 +99,11 @@ class Sandbox():
             self.__stdout = kwargs['stdout']
             self.__stderr = kwargs['stderr']

    -        # Setup the directories. Root should be available to subclasses, hence
    -        # being single-underscore. The others are private to this class.
    +        # Setup the directories. Root and output_directory should be
    +        # available to subclasses, hence being single-underscore. The
    +        # others are private to this class.
             self._root = os.path.join(directory, 'root')
    +        self._output_directory = None
             self.__directory = directory
             self.__scratch = os.path.join(self.__directory, 'scratch')
             for directory_ in [self._root, self.__scratch]:
    @@ -144,11 +146,17 @@
                 self._vdir = FileBasedDirectory(self._root)
             return self._vdir

    +    def _set_virtual_directory(self, virtual_directory):
    +        """ Sets virtual directory. Useful after remote execution
    +        has rewritten the working directory.
    +        """
    +        self._vdir = virtual_directory
    +
         def set_environment(self, environment):
             """Sets the environment variables for the sandbox

             Args:
    -           directory (dict): The environment variables to use in the sandbox
    +           environment (dict): The environment variables to use in the sandbox
             """
             self.__env = environment

    @@ -160,6 +168,15 @@
             """
             self.__cwd = directory

    +    def set_output_directory(self, directory):
    +        """Sets the output directory - the directory which is preserved
    +        as an artifact after assembly.
    +
    +        Args:
    +           directory (str): An absolute path within the sandbox
    +        """
    +        self._output_directory = directory
    +
         def mark_directory(self, directory, *, artifact=False):
             """Marks a sandbox directory and ensures it will exist

    

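A condensed sketch of the intended call pattern for element plugins, following the BuildElement change earlier in this push (`command_dir` and `install_root` are assumed variables):

    sandbox.set_work_directory(command_dir)
    # The output directory becomes the Command's output_directories entry
    # when a remote sandbox sends the job to an REAPI server:
    sandbox.set_output_directory(install_root)
    sandbox.set_environment(self.get_environment())
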
  • buildstream/storage/_casbaseddirectory.py
    @@ -543,6 +543,15 @@ class CasBasedDirectory(Directory):
                     filelist.append(k)
             return filelist

    +    def recalculate_hash(self):
    +        """ Recalculates the hash for this directory and stores the results in
    +        the cache. If this directory has a parent, tell it to
    +        recalculate (since changing this directory changes an entry in
    +        the parent). Hashes for subdirectories also get recalculated.
    +        """
    +        self._recalculate_recursing_up()
    +        self._recalculate_recursing_down()
    +
         def _get_identifier(self):
             path = ""
             if self.parent:
    

  • doc/source/format_project.rst
    @@ -204,6 +204,24 @@ with an artifact share.
     You can also specify a list of caches here; earlier entries in the list
     will have higher priority than later ones.

    +Remote execution
    +~~~~~~~~~~~~~~~~
    +BuildStream supports remote execution using the Google Remote Execution API
    +(REAPI). A description of how remote execution works is beyond the scope
    +of this document, but you can specify a remote server complying with the REAPI
    +using the `remote-execution` option:
    +
    +.. code:: yaml
    +
    +  remote-execution:
    +
    +    # A url defining a remote execution server
    +    url: http://buildserver.example.com:50051
    +
    +The url should contain a hostname and port separated by ':'. Only plain HTTP is
    +currently supported (no HTTPS).
    +
    +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.

     .. _project_essentials_mirrors:


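For reference, this URL format matches what SandboxRemote validates at construction time; a small illustration with a hypothetical server address:

    from urllib.parse import urlparse

    url = urlparse("http://buildserver.example.com:50051")  # hypothetical
    assert url.scheme == 'http' and url.hostname and url.port
    server = '{}:{}'.format(url.hostname, url.port)  # 'buildserver.example.com:50051'
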
  • doc/source/install_artifacts.rst
    @@ -161,13 +161,13 @@ Below are two examples of how to run the cache server as a systemd service, one

        [Service]
        Environment="LC_ALL=C.UTF-8"
    -   ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/privkey.pem --
    -   server-cert {{certs_path}}/fullchain.pem {{artifacts_path}}
    +   ExecStart=/usr/local/bin/bst-artifact-server --port 11001 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt {{artifacts_path}}
        User=artifacts

        [Install]
        WantedBy=multi-user.target

    +.. code:: ini

        #
        # Pull/Push
    @@ -178,9 +178,7 @@ Below are two examples of how to run the cache server as a systemd service, one

        [Service]
        Environment="LC_ALL=C.UTF-8"
    -   ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/privkey.pem --
    -   server-cert {{certs_path}}/fullchain.pem --client-certs /home/artifacts/authorized.crt --enable-push /
    -   {{artifacts_path}}
    +   ExecStart=/usr/local/bin/bst-artifact-server --port 11002 --server-key {{certs_path}}/server.key --server-cert {{certs_path}}/server.crt --client-certs {{certs_path}}/authorized.crt --enable-push {{artifacts_path}}
        User=artifacts

        [Install]
    @@ -188,11 +186,16 @@ Below are two examples of how to run the cache server as a systemd service, one

     Here we define when systemd should start the service, which is after the networking stack has been started; we then define how to run the cache with the desired configuration, under the artifacts user. The {{ }} are there to denote where you should change these files to point to your desired locations.

    +For more information on systemd services see:
    +`Creating Systemd Service Files <https://www.devdungeon.com/content/creating-systemd-service-files>`_.
    +
     User configuration
     ~~~~~~~~~~~~~~~~~~
     The user configuration for artifacts is documented with the rest
     of the :ref:`user configuration documentation <user_config>`.

    +Note that for self-signed certificates, the public key fields are mandatory.
    +
     Assuming you have the same setup used in this document, and that your
     host is reachable on the internet as ``artifacts.com`` (for example),
     then a user can use the following user configuration:
    

  • tests/artifactcache/project/elements/compose-all.bst
    +kind: compose
    +
    +depends:
    +- filename: import-bin.bst
    +  type: build
    +- filename: import-dev.bst
    +  type: build
    +
    +config:
    +  # Don't try running the sandbox, we don't have a
    +  # runtime to run anything in this context.
    +  integrate: False

  • tests/artifactcache/project/elements/import-bin.bst
    +kind: import
    +sources:
    +- kind: local
    +  path: files/bin-files

  • tests/artifactcache/project/elements/import-dev.bst
    +kind: import
    +sources:
    +- kind: local
    +  path: files/dev-files

  • tests/artifactcache/project/elements/target.bst
    +kind: stack
    +description: |
    +
    +  Main stack target for the bst build test
    +
    +depends:
    +- import-bin.bst
    +- import-dev.bst
    +- compose-all.bst

  • tests/artifactcache/project/files/bin-files/usr/bin/hello
    +#!/bin/bash
    +
    +echo "Hello !"

  • tests/artifactcache/project/files/dev-files/usr/include/pony.h
    +#ifndef __PONY_H__
    +#define __PONY_H__
    +
    +#define PONY_BEGIN "Once upon a time, there was a pony."
    +#define PONY_END "And they lived happily ever after, the end."
    +
    +#define MAKE_PONY(story)  \
    +  PONY_BEGIN \
    +  story \
    +  PONY_END
    +
    +#endif /* __PONY_H__ */

  • tests/artifactcache/project/project.conf
    +# Project config for frontend build test
    +name: test
    +
    +element-path: elements

  • tests/artifactcache/pull.py
    +import hashlib
    +import os
    +import pytest
    +
    +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
    +from buildstream._artifactcache.cascache import CASCache
    +from buildstream._context import Context
    +from buildstream._project import Project
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    +
    +from tests.testutils import cli, create_artifact_share
    +
    +
    +# Project directory
    +DATA_DIR = os.path.join(
    +    os.path.dirname(os.path.realpath(__file__)),
    +    "project",
    +)
    +
    +
    +# Handle messages from the pipeline
    +def message_handler(message, context):
    +    pass
    +
    +
    +# Recursively build a Tree message from a root Directory: the first
    +# call records the root, then every child Directory object is loaded
    +# from the local CAS and appended to the Tree's children.
    +def tree_maker(cas, tree, directory):
    +    if tree.root.ByteSize() == 0:
    +        tree.root.CopyFrom(directory)
    +
    +    for directory_node in directory.directories:
    +        child_directory = tree.children.add()
    +
    +        with open(cas.objpath(directory_node.digest), 'rb') as f:
    +            child_directory.ParseFromString(f.read())
    +
    +        tree_maker(cas, tree, child_directory)
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_pull(cli, tmpdir, datafiles):
    +    project_dir = str(datafiles)
    +
    +    # Set up an artifact cache.
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    +        # Configure artifact share
    +        cli.configure({
    +            'scheduler': {
    +                'pushers': 1
    +            },
    +            'artifacts': {
    +                'url': share.repo,
    +                'push': True,
    +            }
    +        })
    +
    +        # First build the project with the artifact cache configured
    +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
    +        result.assert_success()
    +
    +        # Assert that we are now cached locally
    +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    +        # Assert that we shared/pushed the cached artifact
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    +        assert share.has_artifact('test', 'target.bst', element_key)
    +
    +        # Delete the artifact locally
    +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
    +
    +        # Assert that we are not cached locally anymore
    +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
    +
    +        # Fake minimal context
    +        context = Context()
    +        context.set_message_handler(message_handler)
    +        context.sched_pushers = 1
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    +                                                          push=True)]
    +
    +        # Load the project and CAS cache
    +        project = Project(project_dir, context)
    +        project.ensure_fully_loaded()
    +        cas = CASCache(context)
    +
    +        # Assert that the element's artifact is **not** cached
    +        element = project.load_elements(['target.bst'], cas)[0]
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    +        assert not cas.contains(element, element_key)
    +
    +        # Manually setup the CAS remote
    +        cas.setup_remotes(use_config=True)
    +        cas.initialize_remotes()
    +        assert cas.has_push_remotes()
    +
    +        # Pull the artifact
    +        pulled = cas.pull(element, element_key)
    +        assert pulled is True
    +        assert cas.contains(element, element_key)
    +
    +        # Finally, close the opened gRPC channels properly!
    +        for remote in cas._remotes[project]:
    +            if remote.channel:
    +                remote.channel.close()
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_pull_tree(cli, tmpdir, datafiles):
    +    project_dir = str(datafiles)
    +
    +    # Set up an artifact cache.
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    +        # Configure artifact share
    +        cli.configure({
    +            'scheduler': {
    +                'pushers': 1
    +            },
    +            'artifacts': {
    +                'url': share.repo,
    +                'push': True,
    +            }
    +        })
    +
    +        # First build the project with the artifact cache configured
    +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
    +        result.assert_success()
    +
    +        # Assert that we are now cached locally
    +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    +        # Assert that we shared/pushed the cached artifact
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    +        assert share.has_artifact('test', 'target.bst', element_key)
    +
    +        # Fake minimal context
    +        context = Context()
    +        context.set_message_handler(message_handler)
    +        context.sched_pushers = 1
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    +                                                          push=True)]
    +
    +        # Load the project and CAS cache
    +        project = Project(project_dir, context)
    +        project.ensure_fully_loaded()
    +        cas = CASCache(context)
    +
    +        # Assert that the element's artifact is cached
    +        element = project.load_elements(['target.bst'], cas)[0]
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    +        assert cas.contains(element, element_key)
    +
    +        # Manually setup the CAS remote
    +        cas.setup_remotes(use_config=True)
    +        cas.initialize_remotes()
    +        assert cas.has_push_remotes(element=element)
    +
    +        # Retrieve the Directory object from the cached artifact
    +        artifact_ref = cas.get_artifact_fullname(element, element_key)
    +        artifact_digest = cas.resolve_ref(artifact_ref)
    +
    +        directory = remote_execution_pb2.Directory()
    +
    +        with open(cas.objpath(artifact_digest), 'rb') as f:
    +            directory.ParseFromString(f.read())
    +
    +        # Build the Tree object while we are still cached
    +        tree = remote_execution_pb2.Tree()
    +        tree_maker(cas, tree, directory)
    +
    +        # Push the Tree as a regular message
    +        tree_digest = cas.push_message(project, tree)
    +
    +        # Now delete the artifact locally
    +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
    +
    +        # Assert that we are not cached locally anymore
    +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
    +
    +        # Pull the artifact using the Tree object
    +        directory_digest = cas.pull_tree(project, tree_digest)
    +        assert directory_digest == artifact_digest
    +
    +        # Ensure the entire Tree structure has been pulled
    +        assert os.path.exists(cas.objpath(directory_digest))
    +        for child_directory in tree.children:
    +            child_blob = child_directory.SerializeToString()
    +
    +            child_digest = remote_execution_pb2.Digest()
    +            child_digest.hash = hashlib.sha256(child_blob).hexdigest()
    +            child_digest.size_bytes = len(child_blob)
    +
    +            assert os.path.exists(cas.objpath(child_digest))
    +
    +        # Finally, close the opened gRPC channels properly!
    +        for remote in cas._remotes[project]:
    +            if remote.channel:
    +                remote.channel.close()

  • tests/artifactcache/push.py
    +import os
    +import pytest
    +
    +from pluginbase import PluginBase
    +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
    +from buildstream._artifactcache.cascache import CASCache
    +from buildstream._context import Context
    +from buildstream._project import Project
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    +from buildstream.storage._casbaseddirectory import CasBasedDirectory
    +
    +from tests.testutils import cli, create_artifact_share
    +
    +
    +# Project directory
    +DATA_DIR = os.path.join(
    +    os.path.dirname(os.path.realpath(__file__)),
    +    "project",
    +)
    +
    +
    +# Handle messages from the pipeline
    +def message_handler(message, context):
    +    pass
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_push(cli, tmpdir, datafiles):
    +    project_dir = str(datafiles)
    +
    +    # First build the project without the artifact cache configured
    +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
    +    result.assert_success()
    +
    +    # Assert that we are now cached locally
    +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    +
    +    # Set up an artifact cache.
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    +        # Fake minimal context
    +        context = Context()
    +        context.set_message_handler(message_handler)
    +        context.sched_pushers = 1
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    +                                                          push=True)]
    +
    +        # Load the project and CAS cache
    +        project = Project(project_dir, context)
    +        project.ensure_fully_loaded()
    +        cas = CASCache(context)
    +
    +        # Assert that the element's artifact is cached
    +        element = project.load_elements(['target.bst'], cas)[0]
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    +        assert cas.contains(element, element_key)
    +
    +        # Manually setup the CAS remote
    +        cas.setup_remotes(use_config=True)
    +        cas.initialize_remotes()
    +        assert cas.has_push_remotes(element=element)
    +
    +        # Push the element's artifact
    +        pushed = cas.push(element, [element_key])
    +        assert pushed is True
    +        assert share.has_artifact('test', 'target.bst', element_key)
    +
    +        # Finally, close the opened gRPC channels properly!
    +        for remote in cas._remotes[project]:
    +            if remote.channel:
    +                remote.channel.close()
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_push_directory(cli, tmpdir, datafiles):
    +    project_dir = str(datafiles)
    +
    +    # First build the project without the artifact cache configured
    +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
    +    result.assert_success()
    +
    +    # Assert that we are now cached locally
    +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
    +
    +    # Set up an artifact cache.
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    +        # Fake minimal context
    +        context = Context()
    +        context.set_message_handler(message_handler)
    +        context.sched_pushers = 1
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    +                                                          push=True)]
    +
    +        # Load the project and CAS cache
    +        project = Project(project_dir, context)
    +        project.ensure_fully_loaded()
    +        cas = CASCache(context)
    +
    +        # Assert that the element's artifact is cached
    +        element = project.load_elements(['target.bst'], cas)[0]
    +        element_key = cli.get_element_key(project_dir, 'target.bst')
    +        assert cas.contains(element, element_key)
    +
    +        # Manually setup the CAS remote
    +        cas.setup_remotes(use_config=True)
    +        cas.initialize_remotes()
    +        assert cas.has_push_remotes(element=element)
    +
    +        # Recreate the CasBasedDirectory object from the cached artifact
    +        artifact_ref = cas.get_artifact_fullname(element, element_key)
    +        artifact_digest = cas.resolve_ref(artifact_ref)
    +
    +        directory = CasBasedDirectory(context, ref=artifact_digest)
    +
    +        # Push the CasBasedDirectory object
    +        directory_digest = cas.push_directory(project, directory)
    +        assert directory_digest == artifact_digest
    +        assert share.has_object(directory_digest)
    +
    +        # Finally, close the opened gRPC channels properly!
    +        for remote in cas._remotes[project]:
    +            if remote.channel:
    +                remote.channel.close()
    +
    +
    +@pytest.mark.datafiles(DATA_DIR)
    +def test_push_message(cli, tmpdir, datafiles):
    +    project_dir = str(datafiles)
    +
    +    # Set up an artifact cache.
    +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
    +        # Fake minimal context
    +        context = Context()
    +        context.set_message_handler(message_handler)
    +        context.sched_pushers = 1
    +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
    +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
    +                                                          push=True)]
    +
    +        # Load the project and CAS cache
    +        project = Project(project_dir, context)
    +        project.ensure_fully_loaded()
    +        cas = CASCache(context)
    +
    +        # Manually setup the CAS remote
    +        cas.setup_remotes(use_config=True)
    +        cas.initialize_remotes()
    +        assert cas.has_push_remotes()
    +
    +        # Create an example message object
    +        command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
    +                                               working_directory='/buildstream-build',
    +                                               output_directories=['/buildstream-install'])
    +
    +        # Push the message object
    +        digest = cas.push_message(project, command)
    +        assert digest
    +        assert share.has_object(digest)
    +
    +        # Finally, close the opened gRPC channels properly!
    +        for remote in cas._remotes[project]:
    +            if remote.channel:
    +                remote.channel.close()
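    Because the CAS is content-addressed, the digest returned by push_message() can be recomputed locally and compared, the same way test_pull_tree above verifies the children of a pulled Tree. A small sketch, assuming REAPI's default SHA-256 digest function:

        import hashlib

        from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        def message_digest(message):
            # A CAS digest is the SHA-256 of the serialized message,
            # plus the serialized size in bytes.
            blob = message.SerializeToString()
            digest = remote_execution_pb2.Digest()
            digest.hash = hashlib.sha256(blob).hexdigest()
            digest.size_bytes = len(blob)
            return digest

        # e.g. test_push_message could additionally assert:
        #     assert cas.push_message(project, command) == message_digest(command)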

  • tests/testutils/artifactshare.py
    ... ... @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache
     from buildstream._artifactcache.casserver import create_server
     from buildstream._context import Context
     from buildstream._exceptions import ArtifactError
    +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     
     
     # ArtifactShare()
    ... ... @@ -87,6 +88,23 @@ class ArtifactShare():
             # Sleep until termination by signal
             signal.pause()
     
    +    # has_object():
    +    #
    +    # Checks whether the object is present in the share
    +    #
    +    # Args:
    +    #    digest (Digest): The object's digest
    +    #
    +    # Returns:
    +    #    (bool): True if the object exists in the share, otherwise false.
    +    def has_object(self, digest):
    +
    +        assert isinstance(digest, remote_execution_pb2.Digest)
    +
    +        object_path = self.cas.objpath(digest)
    +
    +        return os.path.exists(object_path)
    +
         # has_artifact():
         #
         # Checks whether the artifact is present in the share
    


