Martin Blanchard pushed to branch jmac/remote_execution_client at BuildStream / buildstream
Commits:
- 
92d2b873
by Jim MacArthur at 2018-09-06T08:09:27Z
- 
86266601
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
513ee6c0
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
0734ca9f
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
225ae3b5
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
84cd6a20
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
fc7c0068
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
2deb8c13
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
d9148938
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
b49cca26
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
dd663a68
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
42650e98
by Jim MacArthur at 2018-09-06T08:09:29Z
- 
a973325b
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
60cb4448
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
eb206fdf
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
5b435fae
by Martin Blanchard at 2018-09-06T08:09:29Z
- 
359a8fd5
by Martin Blanchard at 2018-09-06T08:13:25Z
- 
77eb7c9d
by Martin Blanchard at 2018-09-06T08:16:36Z
- 
3fb9ce56
by Martin Blanchard at 2018-09-06T08:16:37Z
25 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_project.py
- buildstream/buildelement.py
- buildstream/data/projectconfig.yaml
- buildstream/element.py
- buildstream/plugins/elements/autotools.py
- buildstream/plugins/elements/cmake.py
- buildstream/plugins/elements/make.py
- buildstream/plugins/elements/meson.py
- buildstream/plugins/elements/qmake.py
- buildstream/sandbox/__init__.py
- + buildstream/sandbox/_sandboxremote.py
- buildstream/sandbox/sandbox.py
- buildstream/storage/_casbaseddirectory.py
- doc/source/format_project.rst
- + tests/artifactcache/project/elements/compose-all.bst
- + tests/artifactcache/project/elements/import-bin.bst
- + tests/artifactcache/project/elements/import-dev.bst
- + tests/artifactcache/project/elements/target.bst
- + tests/artifactcache/project/files/bin-files/usr/bin/hello
- + tests/artifactcache/project/files/dev-files/usr/include/pony.h
- + tests/artifactcache/project/project.conf
- + tests/artifactcache/pull.py
- + tests/artifactcache/push.py
- tests/testutils/artifactshare.py
Changes:
| ... | ... | @@ -19,6 +19,7 @@ | 
| 19 | 19 |  | 
| 20 | 20 |  import hashlib
 | 
| 21 | 21 |  import itertools
 | 
| 22 | +import io
 | |
| 22 | 23 |  import multiprocessing
 | 
| 23 | 24 |  import os
 | 
| 24 | 25 |  import signal
 | 
| ... | ... | @@ -76,6 +77,7 @@ class CASCache(ArtifactCache): | 
| 76 | 77 |      ################################################
 | 
| 77 | 78 |      #     Implementation of abstract methods       #
 | 
| 78 | 79 |      ################################################
 | 
| 80 | + | |
| 79 | 81 |      def contains(self, element, key):
 | 
| 80 | 82 |          refpath = self._refpath(self.get_artifact_fullname(element, key))
 | 
| 81 | 83 |  | 
| ... | ... | @@ -259,6 +261,25 @@ class CASCache(ArtifactCache): | 
| 259 | 261 |  | 
| 260 | 262 |          return False
 | 
| 261 | 263 |  | 
| 264 | +    def pull_tree(self, project, digest):
 | |
| 265 | +        """ Pull a single Tree rather than an artifact.
 | |
| 266 | +        Does not update local refs. """
 | |
| 267 | + | |
| 268 | +        for remote in self._remotes[project]:
 | |
| 269 | +            try:
 | |
| 270 | +                remote.init()
 | |
| 271 | + | |
| 272 | +                digest = self._fetch_tree(remote, digest)
 | |
| 273 | + | |
| 274 | +                # no need to pull from additional remotes
 | |
| 275 | +                return digest
 | |
| 276 | + | |
| 277 | +            except grpc.RpcError as e:
 | |
| 278 | +                if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 279 | +                    raise
 | |
| 280 | + | |
| 281 | +        return None
 | |
| 282 | + | |
| 262 | 283 |      def link_key(self, element, oldkey, newkey):
 | 
| 263 | 284 |          oldref = self.get_artifact_fullname(element, oldkey)
 | 
| 264 | 285 |          newref = self.get_artifact_fullname(element, newkey)
 | 
| ... | ... | @@ -267,8 +288,46 @@ class CASCache(ArtifactCache): | 
| 267 | 288 |  | 
| 268 | 289 |          self.set_ref(newref, tree)
 | 
| 269 | 290 |  | 
| 291 | +    def _push_refs_to_remote(self, refs, remote):
 | |
| 292 | +        skipped_remote = True
 | |
| 293 | +        try:
 | |
| 294 | +            for ref in refs:
 | |
| 295 | +                tree = self.resolve_ref(ref)
 | |
| 296 | + | |
| 297 | +                # Check whether ref is already on the server in which case
 | |
| 298 | +                # there is no need to push the artifact
 | |
| 299 | +                try:
 | |
| 300 | +                    request = buildstream_pb2.GetReferenceRequest()
 | |
| 301 | +                    request.key = ref
 | |
| 302 | +                    response = remote.ref_storage.GetReference(request)
 | |
| 303 | + | |
| 304 | +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 305 | +                        # ref is already on the server with the same tree
 | |
| 306 | +                        continue
 | |
| 307 | + | |
| 308 | +                except grpc.RpcError as e:
 | |
| 309 | +                    if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 310 | +                        # Intentionally re-raise RpcError for outer except block.
 | |
| 311 | +                        raise
 | |
| 312 | + | |
| 313 | +                self._send_directory(remote, tree)
 | |
| 314 | + | |
| 315 | +                request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 316 | +                request.keys.append(ref)
 | |
| 317 | +                request.digest.hash = tree.hash
 | |
| 318 | +                request.digest.size_bytes = tree.size_bytes
 | |
| 319 | +                remote.ref_storage.UpdateReference(request)
 | |
| 320 | + | |
| 321 | +                skipped_remote = False
 | |
| 322 | +        except grpc.RpcError as e:
 | |
| 323 | +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 324 | +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 325 | + | |
| 326 | +        return not skipped_remote
 | |
| 327 | + | |
| 270 | 328 |      def push(self, element, keys):
 | 
| 271 | -        refs = [self.get_artifact_fullname(element, key) for key in keys]
 | |
| 329 | + | |
| 330 | +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 | |
| 272 | 331 |  | 
| 273 | 332 |          project = element._get_project()
 | 
| 274 | 333 |  | 
| ... | ... | @@ -278,101 +337,83 @@ class CASCache(ArtifactCache): | 
| 278 | 337 |  | 
| 279 | 338 |          for remote in push_remotes:
 | 
| 280 | 339 |              remote.init()
 | 
| 281 | -            skipped_remote = True
 | |
| 282 | -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
 | |
| 283 | 340 |  | 
| 284 | -            try:
 | |
| 285 | -                for ref in refs:
 | |
| 286 | -                    tree = self.resolve_ref(ref)
 | |
| 287 | - | |
| 288 | -                    # Check whether ref is already on the server in which case
 | |
| 289 | -                    # there is no need to push the artifact
 | |
| 290 | -                    try:
 | |
| 291 | -                        request = buildstream_pb2.GetReferenceRequest()
 | |
| 292 | -                        request.key = ref
 | |
| 293 | -                        response = remote.ref_storage.GetReference(request)
 | |
| 294 | - | |
| 295 | -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 296 | -                            # ref is already on the server with the same tree
 | |
| 297 | -                            continue
 | |
| 298 | - | |
| 299 | -                    except grpc.RpcError as e:
 | |
| 300 | -                        if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 301 | -                            # Intentionally re-raise RpcError for outer except block.
 | |
| 302 | -                            raise
 | |
| 303 | - | |
| 304 | -                    missing_blobs = {}
 | |
| 305 | -                    required_blobs = self._required_blobs(tree)
 | |
| 306 | - | |
| 307 | -                    # Limit size of FindMissingBlobs request
 | |
| 308 | -                    for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 309 | -                        request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 310 | - | |
| 311 | -                        for required_digest in required_blobs_group:
 | |
| 312 | -                            d = request.blob_digests.add()
 | |
| 313 | -                            d.hash = required_digest.hash
 | |
| 314 | -                            d.size_bytes = required_digest.size_bytes
 | |
| 315 | - | |
| 316 | -                        response = remote.cas.FindMissingBlobs(request)
 | |
| 317 | -                        for digest in response.missing_blob_digests:
 | |
| 318 | -                            d = remote_execution_pb2.Digest()
 | |
| 319 | -                            d.hash = digest.hash
 | |
| 320 | -                            d.size_bytes = digest.size_bytes
 | |
| 321 | -                            missing_blobs[d.hash] = d
 | |
| 322 | - | |
| 323 | -                    # Upload any blobs missing on the server
 | |
| 324 | -                    skipped_remote = False
 | |
| 325 | -                    for digest in missing_blobs.values():
 | |
| 326 | -                        uuid_ = uuid.uuid4()
 | |
| 327 | -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
 | |
| 328 | -                                                  digest.hash, str(digest.size_bytes)])
 | |
| 329 | - | |
| 330 | -                        def request_stream(resname):
 | |
| 331 | -                            with open(self.objpath(digest), 'rb') as f:
 | |
| 332 | -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
 | |
| 333 | -                                offset = 0
 | |
| 334 | -                                finished = False
 | |
| 335 | -                                remaining = digest.size_bytes
 | |
| 336 | -                                while not finished:
 | |
| 337 | -                                    chunk_size = min(remaining, 64 * 1024)
 | |
| 338 | -                                    remaining -= chunk_size
 | |
| 339 | - | |
| 340 | -                                    request = bytestream_pb2.WriteRequest()
 | |
| 341 | -                                    request.write_offset = offset
 | |
| 342 | -                                    # max. 64 kB chunks
 | |
| 343 | -                                    request.data = f.read(chunk_size)
 | |
| 344 | -                                    request.resource_name = resname
 | |
| 345 | -                                    request.finish_write = remaining <= 0
 | |
| 346 | -                                    yield request
 | |
| 347 | -                                    offset += chunk_size
 | |
| 348 | -                                    finished = request.finish_write
 | |
| 349 | -                        response = remote.bytestream.Write(request_stream(resource_name))
 | |
| 350 | - | |
| 351 | -                    request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 352 | -                    request.keys.append(ref)
 | |
| 353 | -                    request.digest.hash = tree.hash
 | |
| 354 | -                    request.digest.size_bytes = tree.size_bytes
 | |
| 355 | -                    remote.ref_storage.UpdateReference(request)
 | |
| 356 | - | |
| 357 | -                    pushed = True
 | |
| 358 | - | |
| 359 | -            except grpc.RpcError as e:
 | |
| 360 | -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 361 | -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 341 | +            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
 | |
| 362 | 342 |  | 
| 363 | -            if skipped_remote:
 | |
| 343 | +            if self._push_refs_to_remote(refs, remote):
 | |
| 344 | +                pushed = True
 | |
| 345 | +            else:
 | |
| 364 | 346 |                  self.context.message(Message(
 | 
| 365 | 347 |                      None,
 | 
| 366 | 348 |                      MessageType.SKIPPED,
 | 
| 367 | 349 |                      "Remote ({}) already has {} cached".format(
 | 
| 368 | 350 |                          remote.spec.url, element._get_brief_display_key())
 | 
| 369 | 351 |                  ))
 | 
| 352 | + | |
| 370 | 353 |          return pushed
 | 
| 371 | 354 |  | 
| 355 | +    def push_directory(self, project, directory):
 | |
| 356 | + | |
| 357 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 358 | + | |
| 359 | +        if directory.ref is None:
 | |
| 360 | +            return None
 | |
| 361 | + | |
| 362 | +        for remote in push_remotes:
 | |
| 363 | +            remote.init()
 | |
| 364 | + | |
| 365 | +            self._send_directory(remote, directory.ref)
 | |
| 366 | + | |
| 367 | +        return directory.ref
 | |
| 368 | + | |
| 369 | +    def push_message(self, project, message):
 | |
| 370 | + | |
| 371 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 372 | + | |
| 373 | +        message_buffer = message.SerializeToString()
 | |
| 374 | +        message_sha = hashlib.sha256(message_buffer)
 | |
| 375 | +        message_digest = remote_execution_pb2.Digest()
 | |
| 376 | +        message_digest.hash = message_sha.hexdigest()
 | |
| 377 | +        message_digest.size_bytes = len(message_buffer)
 | |
| 378 | + | |
| 379 | +        for remote in push_remotes:
 | |
| 380 | +            remote.init()
 | |
| 381 | + | |
| 382 | +            with io.BytesIO(message_buffer) as b:
 | |
| 383 | +                self._send_blob(remote, message_digest, b)
 | |
| 384 | + | |
| 385 | +        return message_digest
 | |
| 386 | + | |
| 387 | +    def _verify_digest_on_remote(self, remote, digest):
 | |
| 388 | +        # Check whether ref is already on the server in which case
 | |
| 389 | +        # there is no need to push the artifact
 | |
| 390 | +        request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 391 | +        request.blob_digests.extend([digest])
 | |
| 392 | + | |
| 393 | +        response = remote.cas.FindMissingBlobs(request)
 | |
| 394 | +        if digest in response.missing_blob_digests:
 | |
| 395 | +            return False
 | |
| 396 | + | |
| 397 | +        return True
 | |
| 398 | + | |
| 399 | +    def verify_digest_pushed(self, project, digest):
 | |
| 400 | + | |
| 401 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 402 | + | |
| 403 | +        pushed = False
 | |
| 404 | + | |
| 405 | +        for remote in push_remotes:
 | |
| 406 | +            remote.init()
 | |
| 407 | + | |
| 408 | +            if self._verify_digest_on_remote(remote, digest):
 | |
| 409 | +                pushed = True
 | |
| 410 | + | |
| 411 | +        return pushed
 | |
| 412 | + | |
| 413 | + | |
| 372 | 414 |      ################################################
 | 
| 373 | 415 |      #                API Private Methods           #
 | 
| 374 | 416 |      ################################################
 | 
| 375 | - | |
| 376 | 417 |      # objpath():
 | 
| 377 | 418 |      #
 | 
| 378 | 419 |      # Return the path of an object based on its digest.
 | 
| ... | ... | @@ -599,6 +640,7 @@ class CASCache(ArtifactCache): | 
| 599 | 640 |      ################################################
 | 
| 600 | 641 |      #             Local Private Methods            #
 | 
| 601 | 642 |      ################################################
 | 
| 643 | + | |
| 602 | 644 |      def _checkout(self, dest, tree):
 | 
| 603 | 645 |          os.makedirs(dest, exist_ok=True)
 | 
| 604 | 646 |  | 
| ... | ... | @@ -761,16 +803,16 @@ class CASCache(ArtifactCache): | 
| 761 | 803 |              #
 | 
| 762 | 804 |              q.put(str(e))
 | 
| 763 | 805 |  | 
| 764 | -    def _required_blobs(self, tree):
 | |
| 806 | +    def _required_blobs(self, directory_digest):
 | |
| 765 | 807 |          # parse directory, and recursively add blobs
 | 
| 766 | 808 |          d = remote_execution_pb2.Digest()
 | 
| 767 | -        d.hash = tree.hash
 | |
| 768 | -        d.size_bytes = tree.size_bytes
 | |
| 809 | +        d.hash = directory_digest.hash
 | |
| 810 | +        d.size_bytes = directory_digest.size_bytes
 | |
| 769 | 811 |          yield d
 | 
| 770 | 812 |  | 
| 771 | 813 |          directory = remote_execution_pb2.Directory()
 | 
| 772 | 814 |  | 
| 773 | -        with open(self.objpath(tree), 'rb') as f:
 | |
| 815 | +        with open(self.objpath(directory_digest), 'rb') as f:
 | |
| 774 | 816 |              directory.ParseFromString(f.read())
 | 
| 775 | 817 |  | 
| 776 | 818 |          for filenode in directory.files:
 | 
| ... | ... | @@ -782,16 +824,16 @@ class CASCache(ArtifactCache): | 
| 782 | 824 |          for dirnode in directory.directories:
 | 
| 783 | 825 |              yield from self._required_blobs(dirnode.digest)
 | 
| 784 | 826 |  | 
| 785 | -    def _fetch_blob(self, remote, digest, out):
 | |
| 827 | +    def _fetch_blob(self, remote, digest, stream):
 | |
| 786 | 828 |          resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
 | 
| 787 | 829 |          request = bytestream_pb2.ReadRequest()
 | 
| 788 | 830 |          request.resource_name = resource_name
 | 
| 789 | 831 |          request.read_offset = 0
 | 
| 790 | 832 |          for response in remote.bytestream.Read(request):
 | 
| 791 | -            out.write(response.data)
 | |
| 833 | +            stream.write(response.data)
 | |
| 834 | +        stream.flush()
 | |
| 792 | 835 |  | 
| 793 | -        out.flush()
 | |
| 794 | -        assert digest.size_bytes == os.fstat(out.fileno()).st_size
 | |
| 836 | +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 | |
| 795 | 837 |  | 
| 796 | 838 |      def _fetch_directory(self, remote, tree):
 | 
| 797 | 839 |          objpath = self.objpath(tree)
 | 
| ... | ... | @@ -827,6 +869,92 @@ class CASCache(ArtifactCache): | 
| 827 | 869 |              digest = self.add_object(path=out.name)
 | 
| 828 | 870 |              assert digest.hash == tree.hash
 | 
| 829 | 871 |  | 
| 872 | +    def _fetch_tree(self, remote, digest):
 | |
| 873 | +        # download but do not store the Tree object
 | |
| 874 | +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
 | |
| 875 | +            self._fetch_blob(remote, digest, out)
 | |
| 876 | + | |
| 877 | +            tree = remote_execution_pb2.Tree()
 | |
| 878 | + | |
| 879 | +            with open(out.name, 'rb') as f:
 | |
| 880 | +                tree.ParseFromString(f.read())
 | |
| 881 | + | |
| 882 | +            tree.children.extend([tree.root])
 | |
| 883 | +            for directory in tree.children:
 | |
| 884 | +                for filenode in directory.files:
 | |
| 885 | +                    fileobjpath = self.objpath(filenode.digest)
 | |
| 886 | +                    if os.path.exists(fileobjpath):
 | |
| 887 | +                        # already in local cache
 | |
| 888 | +                        continue
 | |
| 889 | + | |
| 890 | +                    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | |
| 891 | +                        self._fetch_blob(remote, filenode.digest, f)
 | |
| 892 | + | |
| 893 | +                        added_digest = self.add_object(path=f.name)
 | |
| 894 | +                        assert added_digest.hash == filenode.digest.hash
 | |
| 895 | + | |
| 896 | +                # place directory blob only in final location when we've downloaded
 | |
| 897 | +                # all referenced blobs to avoid dangling references in the repository
 | |
| 898 | +                dirbuffer = directory.SerializeToString()
 | |
| 899 | +                dirdigest = self.add_object(buffer=dirbuffer)
 | |
| 900 | +                assert dirdigest.size_bytes == len(dirbuffer)
 | |
| 901 | + | |
| 902 | +        return dirdigest
 | |
| 903 | + | |
| 904 | +    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
 | |
| 905 | +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
 | |
| 906 | +                                  digest.hash, str(digest.size_bytes)])
 | |
| 907 | + | |
| 908 | +        def request_stream(resname, instream):
 | |
| 909 | +            offset = 0
 | |
| 910 | +            finished = False
 | |
| 911 | +            remaining = digest.size_bytes
 | |
| 912 | +            while not finished:
 | |
| 913 | +                chunk_size = min(remaining, 64 * 1024)
 | |
| 914 | +                remaining -= chunk_size
 | |
| 915 | + | |
| 916 | +                request = bytestream_pb2.WriteRequest()
 | |
| 917 | +                request.write_offset = offset
 | |
| 918 | +                # max. 64 kB chunks
 | |
| 919 | +                request.data = instream.read(chunk_size)
 | |
| 920 | +                request.resource_name = resname
 | |
| 921 | +                request.finish_write = remaining <= 0
 | |
| 922 | + | |
| 923 | +                yield request
 | |
| 924 | + | |
| 925 | +                offset += chunk_size
 | |
| 926 | +                finished = request.finish_write
 | |
| 927 | + | |
| 928 | +        response = remote.bytestream.Write(request_stream(resource_name, stream))
 | |
| 929 | + | |
| 930 | +        assert response.committed_size == digest.size_bytes
 | |
| 931 | + | |
| 932 | +    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
 | |
| 933 | +        required_blobs = self._required_blobs(digest)
 | |
| 934 | + | |
| 935 | +        missing_blobs = dict()
 | |
| 936 | +        # Limit size of FindMissingBlobs request
 | |
| 937 | +        for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 938 | +            request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 939 | + | |
| 940 | +            for required_digest in required_blobs_group:
 | |
| 941 | +                d = request.blob_digests.add()
 | |
| 942 | +                d.hash = required_digest.hash
 | |
| 943 | +                d.size_bytes = required_digest.size_bytes
 | |
| 944 | + | |
| 945 | +            response = remote.cas.FindMissingBlobs(request)
 | |
| 946 | +            for missing_digest in response.missing_blob_digests:
 | |
| 947 | +                d = remote_execution_pb2.Digest()
 | |
| 948 | +                d.hash = missing_digest.hash
 | |
| 949 | +                d.size_bytes = missing_digest.size_bytes
 | |
| 950 | +                missing_blobs[d.hash] = d
 | |
| 951 | + | |
| 952 | +        # Upload any blobs missing on the server
 | |
| 953 | +        for blob_digest in missing_blobs.values():
 | |
| 954 | +            with open(self.objpath(blob_digest), 'rb') as f:
 | |
| 955 | +                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
 | |
| 956 | +                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
 | |
| 957 | + | |
| 830 | 958 |  | 
| 831 | 959 |  # Represents a single remote CAS cache.
 | 
| 832 | 960 |  #
 | 
| ... | ... | @@ -129,6 +129,7 @@ class Project(): | 
| 129 | 129 |  | 
| 130 | 130 |          self.artifact_cache_specs = None
 | 
| 131 | 131 |          self._sandbox = None
 | 
| 132 | +        self._remote_execution = None
 | |
| 132 | 133 |          self._splits = None
 | 
| 133 | 134 |  | 
| 134 | 135 |          self._context.add_project(self)
 | 
| ... | ... | @@ -471,7 +472,7 @@ class Project(): | 
| 471 | 472 |              'aliases', 'name',
 | 
| 472 | 473 |              'artifacts', 'options',
 | 
| 473 | 474 |              'fail-on-overlap', 'shell', 'fatal-warnings',
 | 
| 474 | -            'ref-storage', 'sandbox', 'mirrors'
 | |
| 475 | +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
 | |
| 475 | 476 |          ])
 | 
| 476 | 477 |  | 
| 477 | 478 |          #
 | 
| ... | ... | @@ -489,6 +490,9 @@ class Project(): | 
| 489 | 490 |          # Load sandbox configuration
 | 
| 490 | 491 |          self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')
 | 
| 491 | 492 |  | 
| 493 | +        # Load remote execution configuration
 | |
| 494 | +        self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
 | |
| 495 | + | |
| 492 | 496 |          # Load project split rules
 | 
| 493 | 497 |          self._splits = _yaml.node_get(config, Mapping, 'split-rules')
 | 
| 494 | 498 |  | 
| ... | ... | @@ -155,6 +155,9 @@ class BuildElement(Element): | 
| 155 | 155 |              command_dir = build_root
 | 
| 156 | 156 |          sandbox.set_work_directory(command_dir)
 | 
| 157 | 157 |  | 
| 158 | +        # Tell sandbox which directory is preserved in the finished artifact
 | |
| 159 | +        sandbox.set_output_directory(install_root)
 | |
| 160 | + | |
| 158 | 161 |          # Setup environment
 | 
| 159 | 162 |          sandbox.set_environment(self.get_environment())
 | 
| 160 | 163 |  | 
| ... | ... | @@ -204,3 +204,6 @@ shell: | 
| 204 | 204 |    # Command to run when `bst shell` does not provide a command
 | 
| 205 | 205 |    #
 | 
| 206 | 206 |    command: [ 'sh', '-i' ]
 | 
| 207 | + | |
| 208 | +remote-execution:
 | |
| 209 | +  url: "" | |
| \ No newline at end of file | 
| ... | ... | @@ -95,6 +95,7 @@ from . import _site | 
| 95 | 95 |  from ._platform import Platform
 | 
| 96 | 96 |  from .plugin import CoreWarnings
 | 
| 97 | 97 |  from .sandbox._config import SandboxConfig
 | 
| 98 | +from .sandbox._sandboxremote import SandboxRemote
 | |
| 98 | 99 |  | 
| 99 | 100 |  from .storage.directory import Directory
 | 
| 100 | 101 |  from .storage._filebaseddirectory import FileBasedDirectory
 | 
| ... | ... | @@ -250,6 +251,9 @@ class Element(Plugin): | 
| 250 | 251 |          # Extract Sandbox config
 | 
| 251 | 252 |          self.__sandbox_config = self.__extract_sandbox_config(meta)
 | 
| 252 | 253 |  | 
| 254 | +        # Extract remote execution URL
 | |
| 255 | +        self.__remote_execution_url = self.__extract_remote_execution_config(meta)
 | |
| 256 | + | |
| 253 | 257 |      def __lt__(self, other):
 | 
| 254 | 258 |          return self.name < other.name
 | 
| 255 | 259 |  | 
| ... | ... | @@ -1570,6 +1574,8 @@ class Element(Plugin): | 
| 1570 | 1574 |                  finally:
 | 
| 1571 | 1575 |                      if collect is not None:
 | 
| 1572 | 1576 |                          try:
 | 
| 1577 | +                            # Sandbox will probably have replaced its virtual directory, so get it again
 | |
| 1578 | +                            sandbox_vroot = sandbox.get_virtual_directory()
 | |
| 1573 | 1579 |                              collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
 | 
| 1574 | 1580 |                          except VirtualDirectoryError:
 | 
| 1575 | 1581 |                              # No collect directory existed
 | 
| ... | ... | @@ -2146,7 +2152,32 @@ class Element(Plugin): | 
| 2146 | 2152 |          project = self._get_project()
 | 
| 2147 | 2153 |          platform = Platform.get_platform()
 | 
| 2148 | 2154 |  | 
| 2149 | -        if directory is not None and os.path.exists(directory):
 | |
| 2155 | +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
 | |
| 2156 | +            if not self.__artifacts.has_push_remotes(element=self):
 | |
| 2157 | +                # Give an early warning if remote execution will not work
 | |
| 2158 | +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
 | |
| 2159 | +                                   .format(self.name) +
 | |
| 2160 | +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
 | |
| 2161 | + | |
| 2162 | +            self.info("Using a remote sandbox for artifact {}".format(self.name))
 | |
| 2163 | + | |
| 2164 | +            sandbox = SandboxRemote(context, project,
 | |
| 2165 | +                                    directory,
 | |
| 2166 | +                                    stdout=stdout,
 | |
| 2167 | +                                    stderr=stderr,
 | |
| 2168 | +                                    config=config,
 | |
| 2169 | +                                    server_url=self.__remote_execution_url,
 | |
| 2170 | +                                    allow_real_directory=False)
 | |
| 2171 | +            yield sandbox
 | |
| 2172 | + | |
| 2173 | +        elif directory is not None and os.path.exists(directory):
 | |
| 2174 | +            if self.__remote_execution_url:
 | |
| 2175 | +                self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
 | |
| 2176 | +                          .format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
 | |
| 2177 | +                          .format(kind=self.get_kind()), warning_token="remote-failure")
 | |
| 2178 | + | |
| 2179 | +                self.info("Falling back to local sandbox for artifact {}".format(self.name))
 | |
| 2180 | + | |
| 2150 | 2181 |              sandbox = platform.create_sandbox(context, project,
 | 
| 2151 | 2182 |                                                directory,
 | 
| 2152 | 2183 |                                                stdout=stdout,
 | 
| ... | ... | @@ -2318,6 +2349,20 @@ class Element(Plugin): | 
| 2318 | 2349 |          return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
 | 
| 2319 | 2350 |                               self.node_get_member(sandbox_config, int, 'build-gid'))
 | 
| 2320 | 2351 |  | 
| 2352 | +    # Remote execution configuration data (server URL), to be used by the remote sandbox.
 | |
| 2353 | +    #
 | |
| 2354 | +    def __extract_remote_execution_config(self, meta):
 | |
| 2355 | +        if self.__is_junction:
 | |
| 2356 | +            return None
 | |
| 2357 | +        else:
 | |
| 2358 | +            project = self._get_project()
 | |
| 2359 | +            project.ensure_fully_loaded()
 | |
| 2360 | +            if project._remote_execution:
 | |
| 2361 | +                rexec_config = _yaml.node_chain_copy(project._remote_execution)
 | |
| 2362 | +                return self.node_get_member(rexec_config, str, 'url')
 | |
| 2363 | +            else:
 | |
| 2364 | +                return None
 | |
| 2365 | + | |
| 2321 | 2366 |      # This makes a special exception for the split rules, which
 | 
| 2322 | 2367 |      # elements may extend but whose defaults are defined in the project.
 | 
| 2323 | 2368 |      #
 | 
| ... | ... | @@ -57,7 +57,8 @@ from buildstream import BuildElement | 
| 57 | 57 |  | 
| 58 | 58 |  # Element implementation for the 'autotools' kind.
 | 
| 59 | 59 |  class AutotoolsElement(BuildElement):
 | 
| 60 | -    pass
 | |
| 60 | +    # Supports virtual directories (required for remote execution)
 | |
| 61 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 61 | 62 |  | 
| 62 | 63 |  | 
| 63 | 64 |  # Plugin entry point
 | 
| ... | ... | @@ -56,7 +56,8 @@ from buildstream import BuildElement | 
| 56 | 56 |  | 
| 57 | 57 |  # Element implementation for the 'cmake' kind.
 | 
| 58 | 58 |  class CMakeElement(BuildElement):
 | 
| 59 | -    pass
 | |
| 59 | +    # Supports virtual directories (required for remote execution)
 | |
| 60 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 60 | 61 |  | 
| 61 | 62 |  | 
| 62 | 63 |  # Plugin entry point
 | 
| ... | ... | @@ -38,7 +38,8 @@ from buildstream import BuildElement | 
| 38 | 38 |  | 
| 39 | 39 |  # Element implementation for the 'make' kind.
 | 
| 40 | 40 |  class MakeElement(BuildElement):
 | 
| 41 | -    pass
 | |
| 41 | +    # Supports virtual directories (required for remote execution)
 | |
| 42 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 42 | 43 |  | 
| 43 | 44 |  | 
| 44 | 45 |  # Plugin entry point
 | 
| ... | ... | @@ -53,7 +53,8 @@ from buildstream import BuildElement | 
| 53 | 53 |  | 
| 54 | 54 |  # Element implementation for the 'meson' kind.
 | 
| 55 | 55 |  class MesonElement(BuildElement):
 | 
| 56 | -    pass
 | |
| 56 | +    # Supports virtual directories (required for remote execution)
 | |
| 57 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 57 | 58 |  | 
| 58 | 59 |  | 
| 59 | 60 |  # Plugin entry point
 | 
| ... | ... | @@ -33,7 +33,8 @@ from buildstream import BuildElement | 
| 33 | 33 |  | 
| 34 | 34 |  # Element implementation for the 'qmake' kind.
 | 
| 35 | 35 |  class QMakeElement(BuildElement):
 | 
| 36 | -    pass
 | |
| 36 | +    # Supports virtual directories (required for remote execution)
 | |
| 37 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 37 | 38 |  | 
| 38 | 39 |  | 
| 39 | 40 |  # Plugin entry point
 | 
| ... | ... | @@ -20,3 +20,4 @@ | 
| 20 | 20 |  from .sandbox import Sandbox, SandboxFlags
 | 
| 21 | 21 |  from ._sandboxchroot import SandboxChroot
 | 
| 22 | 22 |  from ._sandboxbwrap import SandboxBwrap
 | 
| 23 | +from ._sandboxremote import SandboxRemote | 
| 1 | +#!/usr/bin/env python3
 | |
| 2 | +#
 | |
| 3 | +#  Copyright (C) 2018 Bloomberg LP
 | |
| 4 | +#
 | |
| 5 | +#  This program is free software; you can redistribute it and/or
 | |
| 6 | +#  modify it under the terms of the GNU Lesser General Public
 | |
| 7 | +#  License as published by the Free Software Foundation; either
 | |
| 8 | +#  version 2 of the License, or (at your option) any later version.
 | |
| 9 | +#
 | |
| 10 | +#  This library is distributed in the hope that it will be useful,
 | |
| 11 | +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| 12 | +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
 | |
| 13 | +#  Lesser General Public License for more details.
 | |
| 14 | +#
 | |
| 15 | +#  You should have received a copy of the GNU Lesser General Public
 | |
| 16 | +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 17 | +#
 | |
| 18 | +#  Authors:
 | |
| 19 | +#        Jim MacArthur <jim macarthur codethink co uk>
 | |
| 20 | + | |
| 21 | +import os
 | |
| 22 | +import re
 | |
| 23 | +from urllib.parse import urlparse
 | |
| 24 | + | |
| 25 | +import grpc
 | |
| 26 | + | |
| 27 | +from . import Sandbox
 | |
| 28 | +from ..storage._filebaseddirectory import FileBasedDirectory
 | |
| 29 | +from ..storage._casbaseddirectory import CasBasedDirectory
 | |
| 30 | +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 31 | +from .._artifactcache.cascache import CASCache
 | |
| 32 | + | |
| 33 | + | |
| 34 | +class SandboxError(Exception):
 | |
| 35 | +    pass
 | |
| 36 | + | |
| 37 | + | |
| 38 | +# SandboxRemote()
 | |
| 39 | +#
 | |
| 40 | +# This isn't really a sandbox, it's a stub which sends all the sources and build
 | |
| 41 | +# commands to a remote server and retrieves the results from it.
 | |
| 42 | +#
 | |
| 43 | +class SandboxRemote(Sandbox):
 | |
| 44 | + | |
| 45 | +    def __init__(self, *args, **kwargs):
 | |
| 46 | +        super().__init__(*args, **kwargs)
 | |
| 47 | +        self.cascache = None
 | |
| 48 | + | |
| 49 | +        url = urlparse(kwargs['server_url'])
 | |
| 50 | +        if not url.scheme or not url.hostname or not url.port:
 | |
| 51 | +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
 | |
| 52 | +                               .format(self.server_url) +
 | |
| 53 | +                               "It should be of the form <protocol>://<domain name>:<port>.")
 | |
| 54 | +        elif url.scheme != 'http':
 | |
| 55 | +            raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
 | |
| 56 | +                               "Only plain HTTP is currenlty supported (no HTTPS).")
 | |
| 57 | + | |
| 58 | +        self.server_url = '{}:{}'.format(url.hostname, url.port)
 | |
| 59 | + | |
| 60 | +    def _get_cascache(self):
 | |
| 61 | +        if self.cascache is None:
 | |
| 62 | +            self.cascache = CASCache(self._get_context())
 | |
| 63 | +            self.cascache.setup_remotes(use_config=True)
 | |
| 64 | +        return self.cascache
 | |
| 65 | + | |
| 66 | +    def run_remote_command(self, command, input_root_digest, working_directory, environment):
 | |
| 67 | +        # Sends an execution request to the remote execution server.
 | |
| 68 | +        #
 | |
| 69 | +        # This function blocks until it gets a response from the server.
 | |
| 70 | +        #
 | |
| 71 | +        environment_variables = [remote_execution_pb2.Command.
 | |
| 72 | +                                 EnvironmentVariable(name=k, value=v)
 | |
| 73 | +                                 for (k, v) in environment.items()]
 | |
| 74 | + | |
| 75 | +        # Create and send the Command object.
 | |
| 76 | +        remote_command = remote_execution_pb2.Command(arguments=command,
 | |
| 77 | +                                                      working_directory=working_directory,
 | |
| 78 | +                                                      environment_variables=environment_variables,
 | |
| 79 | +                                                      output_files=[],
 | |
| 80 | +                                                      output_directories=[self._output_directory],
 | |
| 81 | +                                                      platform=None)
 | |
| 82 | + | |
| 83 | +        cascache = self._get_cascache()
 | |
| 84 | +        # Upload the Command message to the remote CAS server
 | |
| 85 | +        command_digest = cascache.push_message(self._get_project(), remote_command)
 | |
| 86 | +        if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
 | |
| 87 | +            # Command push failed
 | |
| 88 | +            return None
 | |
| 89 | + | |
| 90 | +        # Create and send the action.
 | |
| 91 | +        action = remote_execution_pb2.Action(command_digest=command_digest,
 | |
| 92 | +                                             input_root_digest=input_root_digest,
 | |
| 93 | +                                             timeout=None,
 | |
| 94 | +                                             do_not_cache=False)
 | |
| 95 | + | |
| 96 | +        # Upload the Action message to the remote CAS server
 | |
| 97 | +        action_digest = cascache.push_message(self._get_project(), action)
 | |
| 98 | +        if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
 | |
| 99 | +            # Action push failed
 | |
| 100 | +            return None
 | |
| 101 | + | |
| 102 | +        # Next, try to create a communication channel to the BuildGrid server.
 | |
| 103 | +        channel = grpc.insecure_channel(self.server_url)
 | |
| 104 | +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
 | |
| 105 | +        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
 | |
| 106 | +                                                      skip_cache_lookup=False)
 | |
| 107 | +        try:
 | |
| 108 | +            operation_iterator = stub.Execute(request)
 | |
| 109 | +        except grpc.RpcError:
 | |
| 110 | +            return None
 | |
| 111 | + | |
| 112 | +        operation = None
 | |
| 113 | +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
 | |
| 114 | +            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
 | |
| 115 | +            # which will check the server is actually contactable. However, calling it when the
 | |
| 116 | +            # server is available seems to cause .code() to hang forever.
 | |
| 117 | +            for operation in operation_iterator:
 | |
| 118 | +                if operation.done:
 | |
| 119 | +                    break
 | |
| 120 | + | |
| 121 | +        return operation
 | |
| 122 | + | |
| 123 | +    def process_job_output(self, output_directories, output_files):
 | |
| 124 | +        # Reads the remote execution server response to an execution request.
 | |
| 125 | +        #
 | |
| 126 | +        # output_directories is an array of OutputDirectory objects.
 | |
| 127 | +        # output_files is an array of OutputFile objects.
 | |
| 128 | +        #
 | |
| 129 | +        # We only specify one output_directory, so it's an error
 | |
| 130 | +        # for there to be any output files or more than one directory at the moment.
 | |
| 131 | +        #
 | |
| 132 | +        if output_files:
 | |
| 133 | +            raise SandboxError("Output files were returned when we didn't request any.")
 | |
| 134 | +        elif not output_directories:
 | |
| 135 | +            error_text = "No output directory was returned from the build server."
 | |
| 136 | +            raise SandboxError(error_text)
 | |
| 137 | +        elif len(output_directories) > 1:
 | |
| 138 | +            error_text = "More than one output directory was returned from the build server: {}."
 | |
| 139 | +            raise SandboxError(error_text.format(output_directories))
 | |
| 140 | + | |
| 141 | +        tree_digest = output_directories[0].tree_digest
 | |
| 142 | +        if tree_digest is None or not tree_digest.hash:
 | |
| 143 | +            raise SandboxError("Output directory structure had no digest attached.")
 | |
| 144 | + | |
| 145 | +        cascache = self._get_cascache()
 | |
| 146 | +        # Now do a pull to ensure we have the necessary parts.
 | |
| 147 | +        dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
 | |
| 148 | +        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
 | |
| 149 | +            raise SandboxError("Output directory structure pulling from remote failed.")
 | |
| 150 | + | |
| 151 | +        path_components = os.path.split(self._output_directory)
 | |
| 152 | + | |
| 153 | +        # Now what we have is a digest for the output. Once we return, the calling process will
 | |
| 154 | +        # attempt to descend into our directory and find that directory, so we need to overwrite
 | |
| 155 | +        # that.
 | |
| 156 | + | |
| 157 | +        if not path_components:
 | |
| 158 | +            # The artifact wants the whole directory; we could just return the returned hash in its
 | |
| 159 | +            # place, but we don't have a means to do that yet.
 | |
| 160 | +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
 | |
| 161 | + | |
| 162 | +        # At the moment, we will get the whole directory back in the first directory argument and we need
 | |
| 163 | +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
 | |
| 164 | +        # from another hash will be interesting, though...
 | |
| 165 | + | |
| 166 | +        new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
 | |
| 167 | +        self._set_virtual_directory(new_dir)
 | |
| 168 | + | |
| 169 | +    def run(self, command, flags, *, cwd=None, env=None):
 | |
| 170 | +        # Upload sources
 | |
| 171 | +        upload_vdir = self.get_virtual_directory()
 | |
| 172 | + | |
| 173 | +        if isinstance(upload_vdir, FileBasedDirectory):
 | |
| 174 | +            # Make a new temporary directory to put source in
 | |
| 175 | +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
 | |
| 176 | +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
 | |
| 177 | + | |
| 178 | +        upload_vdir.recalculate_hash()
 | |
| 179 | + | |
| 180 | +        cascache = self._get_cascache()
 | |
| 181 | +        # Now, push that key (without necessarily needing a ref) to the remote.
 | |
| 182 | +        vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
 | |
| 183 | +        if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
 | |
| 184 | +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
 | |
| 185 | + | |
| 186 | +        # Set up environment and working directory
 | |
| 187 | +        if cwd is None:
 | |
| 188 | +            cwd = self._get_work_directory()
 | |
| 189 | + | |
| 190 | +        if cwd is None:
 | |
| 191 | +            cwd = '/'
 | |
| 192 | + | |
| 193 | +        if env is None:
 | |
| 194 | +            env = self._get_environment()
 | |
| 195 | + | |
| 196 | +        # We want command args as a list of strings
 | |
| 197 | +        if isinstance(command, str):
 | |
| 198 | +            command = [command]
 | |
| 199 | + | |
| 200 | +        # Now transmit the command to execute
 | |
| 201 | +        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
 | |
| 202 | + | |
| 203 | +        if operation is None:
 | |
| 204 | +            # Failure of remote execution, usually due to an error in BuildStream
 | |
| 205 | +            # NB This error could be raised in __run_remote_command
 | |
| 206 | +            raise SandboxError("No response returned from server")
 | |
| 207 | + | |
| 208 | +        assert(not operation.HasField('error') and operation.HasField('response'))
 | |
| 209 | + | |
| 210 | +        execution_response = remote_execution_pb2.ExecuteResponse()
 | |
| 211 | +        # The response is expected to be an ExecuteResponse message
 | |
| 212 | +        assert(operation.response.Is(execution_response.DESCRIPTOR))
 | |
| 213 | + | |
| 214 | +        operation.response.Unpack(execution_response)
 | |
| 215 | + | |
| 216 | +        if execution_response.status.code != 0:
 | |
| 217 | +            # A normal error during the build: the remote execution system
 | |
| 218 | +            # has worked correctly but the command failed.
 | |
| 219 | +            # execution_response.status also contains 'message' (str) and
 | |
| 220 | +            # 'details' (iterator of Any) which we ignore at the moment.
 | |
| 221 | +            return execution_response.status.code
 | |
| 222 | + | |
| 223 | +        action_result = execution_response.result
 | |
| 224 | + | |
| 225 | +        self.process_job_output(action_result.output_directories, action_result.output_files)
 | |
| 226 | + | |
| 227 | +        return 0 | 
| ... | ... | @@ -99,9 +99,11 @@ class Sandbox(): | 
| 99 | 99 |          self.__stdout = kwargs['stdout']
 | 
| 100 | 100 |          self.__stderr = kwargs['stderr']
 | 
| 101 | 101 |  | 
| 102 | -        # Setup the directories. Root should be available to subclasses, hence
 | |
| 103 | -        # being single-underscore. The others are private to this class.
 | |
| 102 | +        # Setup the directories. Root and output_directory should be
 | |
| 103 | +        # available to subclasses, hence being single-underscore. The
 | |
| 104 | +        # others are private to this class.
 | |
| 104 | 105 |          self._root = os.path.join(directory, 'root')
 | 
| 106 | +        self._output_directory = None
 | |
| 105 | 107 |          self.__directory = directory
 | 
| 106 | 108 |          self.__scratch = os.path.join(self.__directory, 'scratch')
 | 
| 107 | 109 |          for directory_ in [self._root, self.__scratch]:
 | 
| ... | ... | @@ -144,11 +146,17 @@ class Sandbox(): | 
| 144 | 146 |                  self._vdir = FileBasedDirectory(self._root)
 | 
| 145 | 147 |          return self._vdir
 | 
| 146 | 148 |  | 
| 149 | +    def _set_virtual_directory(self, virtual_directory):
 | |
| 150 | +        """ Sets virtual directory. Useful after remote execution
 | |
| 151 | +        has rewritten the working directory.
 | |
| 152 | +        """
 | |
| 153 | +        self._vdir = virtual_directory
 | |
| 154 | + | |
| 147 | 155 |      def set_environment(self, environment):
 | 
| 148 | 156 |          """Sets the environment variables for the sandbox
 | 
| 149 | 157 |  | 
| 150 | 158 |          Args:
 | 
| 151 | -           directory (dict): The environment variables to use in the sandbox
 | |
| 159 | +           environment (dict): The environment variables to use in the sandbox
 | |
| 152 | 160 |          """
 | 
| 153 | 161 |          self.__env = environment
 | 
| 154 | 162 |  | 
| ... | ... | @@ -160,6 +168,15 @@ class Sandbox(): | 
| 160 | 168 |          """
 | 
| 161 | 169 |          self.__cwd = directory
 | 
| 162 | 170 |  | 
| 171 | +    def set_output_directory(self, directory):
 | |
| 172 | +        """Sets the output directory - the directory which is preserved
 | |
| 173 | +        as an artifact after assembly.
 | |
| 174 | + | |
| 175 | +        Args:
 | |
| 176 | +           directory (str): An absolute path within the sandbox
 | |
| 177 | +        """
 | |
| 178 | +        self._output_directory = directory
 | |
| 179 | + | |
| 163 | 180 |      def mark_directory(self, directory, *, artifact=False):
 | 
| 164 | 181 |          """Marks a sandbox directory and ensures it will exist
 | 
| 165 | 182 |  | 
| ... | ... | @@ -543,6 +543,15 @@ class CasBasedDirectory(Directory): | 
| 543 | 543 |                  filelist.append(k)
 | 
| 544 | 544 |          return filelist
 | 
| 545 | 545 |  | 
| 546 | +    def recalculate_hash(self):
 | |
| 547 | +        """ Recalculates the hash for this directory and stores the results in
 | |
| 548 | +        the cache. If this directory has a parent, tell it to
 | |
| 549 | +        recalculate (since changing this directory changes an entry in
 | |
| 550 | +        the parent). Hashes for subdirectories also get recalculated.
 | |
| 551 | +        """
 | |
| 552 | +        self._recalculate_recursing_up()
 | |
| 553 | +        self._recalculate_recursing_down()
 | |
| 554 | + | |
| 546 | 555 |      def _get_identifier(self):
 | 
| 547 | 556 |          path = ""
 | 
| 548 | 557 |          if self.parent:
 | 
| ... | ... | @@ -204,6 +204,24 @@ with an artifact share. | 
| 204 | 204 |  You can also specify a list of caches here; earlier entries in the list
 | 
| 205 | 205 |  will have higher priority than later ones.
 | 
| 206 | 206 |  | 
| 207 | +Remote execution
 | |
| 208 | +~~~~~~~~~~~~~~~~
 | |
| 209 | +BuildStream supports remote execution using the Google Remote Execution API
 | |
| 210 | +(REAPI). A description of how remote execution works is beyond the scope
 | |
| 211 | +of this document, but you can specify a remote server complying with the REAPI
 | |
| 212 | +using the `remote-execution` option:
 | |
| 213 | + | |
| 214 | +.. code:: yaml
 | |
| 215 | + | |
| 216 | +  remote-execution:
 | |
| 217 | + | |
| 218 | +    # A url defining a remote execution server
 | |
| 219 | +    url: http://buildserver.example.com:50051
 | |
| 220 | + | |
| 221 | +The url should contain a hostname and port separated by ':'. Only plain HTTP is
 | |
| 222 | +currently supported (no HTTPS).
 | |
| 223 | + | |
| 224 | +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
 | |
| 207 | 225 |  | 
| 208 | 226 |  .. _project_essentials_mirrors:
 | 
| 209 | 227 |  | 
| 1 | +kind: compose
 | |
| 2 | + | |
| 3 | +depends:
 | |
| 4 | +- filename: import-bin.bst
 | |
| 5 | +  type: build
 | |
| 6 | +- filename: import-dev.bst
 | |
| 7 | +  type: build
 | |
| 8 | + | |
| 9 | +config:
 | |
| 10 | +  # Don't try running the sandbox, we don't have a
 | |
| 11 | +  # runtime to run anything in this context.
 | |
| 12 | +  integrate: False | 
| 1 | +kind: import
 | |
| 2 | +sources:
 | |
| 3 | +- kind: local
 | |
| 4 | +  path: files/bin-files | 
| 1 | +kind: import
 | |
| 2 | +sources:
 | |
| 3 | +- kind: local
 | |
| 4 | +  path: files/dev-files | 
| 1 | +kind: stack
 | |
| 2 | +description: |
 | |
| 3 | + | |
| 4 | +  Main stack target for the bst build test
 | |
| 5 | + | |
| 6 | +depends:
 | |
| 7 | +- import-bin.bst
 | |
| 8 | +- import-dev.bst
 | |
| 9 | +- compose-all.bst | 
| 1 | +#!/bin/bash
 | |
| 2 | + | |
| 3 | +echo "Hello !" | 
| 1 | +#ifndef __PONY_H__
 | |
| 2 | +#define __PONY_H__
 | |
| 3 | + | |
| 4 | +#define PONY_BEGIN "Once upon a time, there was a pony."
 | |
| 5 | +#define PONY_END "And they lived happily ever after, the end."
 | |
| 6 | + | |
| 7 | +#define MAKE_PONY(story)  \
 | |
| 8 | +  PONY_BEGIN \
 | |
| 9 | +  story \
 | |
| 10 | +  PONY_END
 | |
| 11 | + | |
| 12 | +#endif /* __PONY_H__ */ | 
| 1 | +# Project config for frontend build test
 | |
| 2 | +name: test
 | |
| 3 | + | |
| 4 | +element-path: elements | 
| 1 | +import hashlib
 | |
| 2 | +import os
 | |
| 3 | +import pytest
 | |
| 4 | + | |
| 5 | +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
 | |
| 6 | +from buildstream._artifactcache.cascache import CASCache
 | |
| 7 | +from buildstream._context import Context
 | |
| 8 | +from buildstream._project import Project
 | |
| 9 | +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 10 | + | |
| 11 | +from tests.testutils import cli, create_artifact_share
 | |
| 12 | + | |
| 13 | + | |
| 14 | +# Project directory
 | |
| 15 | +DATA_DIR = os.path.join(
 | |
| 16 | +    os.path.dirname(os.path.realpath(__file__)),
 | |
| 17 | +    "project",
 | |
| 18 | +)
 | |
| 19 | + | |
| 20 | + | |
| 21 | +# Handle messages from the pipeline
 | |
| 22 | +def message_handler(message, context):
 | |
| 23 | +    pass
 | |
| 24 | + | |
| 25 | + | |
| 26 | +def tree_maker(cas, tree, directory):
 | |
| 27 | +    if tree.root.ByteSize() == 0:
 | |
| 28 | +        tree.root.CopyFrom(directory)
 | |
| 29 | + | |
| 30 | +    for directory_node in directory.directories:
 | |
| 31 | +        child_directory = tree.children.add()
 | |
| 32 | + | |
| 33 | +        with open(cas.objpath(directory_node.digest), 'rb') as f:
 | |
| 34 | +            child_directory.ParseFromString(f.read())
 | |
| 35 | + | |
| 36 | +        tree_maker(cas, tree, child_directory)
 | |
| 37 | + | |
| 38 | + | |
| 39 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 40 | +def test_pull(cli, tmpdir, datafiles):
 | |
| 41 | +    project_dir = str(datafiles)
 | |
| 42 | + | |
| 43 | +    # Set up an artifact cache.
 | |
| 44 | +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
 | |
| 45 | +        # Configure artifact share
 | |
| 46 | +        cli.configure({
 | |
| 47 | +            'scheduler': {
 | |
| 48 | +                'pushers': 1
 | |
| 49 | +            },
 | |
| 50 | +            'artifacts': {
 | |
| 51 | +                'url': share.repo,
 | |
| 52 | +                'push': True,
 | |
| 53 | +            }
 | |
| 54 | +        })
 | |
| 55 | + | |
| 56 | +        # First build the project with the artifact cache configured
 | |
| 57 | +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
 | |
| 58 | +        result.assert_success()
 | |
| 59 | + | |
| 60 | +        # Assert that we are now cached locally
 | |
| 61 | +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
 | |
| 62 | +        # Assert that we shared/pushed the cached artifact
 | |
| 63 | +        element_key = cli.get_element_key(project_dir, 'target.bst')
 | |
| 64 | +        assert share.has_artifact('test', 'target.bst', element_key)
 | |
| 65 | + | |
| 66 | +        # Delete the artifact locally
 | |
| 67 | +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
 | |
| 68 | + | |
| 69 | +        # Assert that we are not cached locally anymore
 | |
| 70 | +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
 | |
| 71 | + | |
| 72 | +        # Fake minimal context
 | |
| 73 | +        context = Context()
 | |
| 74 | +        context.set_message_handler(message_handler)
 | |
| 75 | +        context.sched_pushers = 1
 | |
| 76 | +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
 | |
| 77 | +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
 | |
| 78 | +                                                          push=True)]
 | |
| 79 | + | |
| 80 | +        # Load the project and CAS cache
 | |
| 81 | +        project = Project(project_dir, context)
 | |
| 82 | +        project.ensure_fully_loaded()
 | |
| 83 | +        cas = CASCache(context)
 | |
| 84 | + | |
| 85 | +        # Assert that the element's artifact is **not** cached
 | |
| 86 | +        element = project.load_elements(['target.bst'], cas)[0]
 | |
| 87 | +        element_key = cli.get_element_key(project_dir, 'target.bst')
 | |
| 88 | +        assert not cas.contains(element, element_key)
 | |
| 89 | + | |
| 90 | +        # Manually setup the CAS remote
 | |
| 91 | +        cas.setup_remotes(use_config=True)
 | |
| 92 | +        cas.initialize_remotes()
 | |
| 93 | +        assert cas.has_push_remotes()
 | |
| 94 | + | |
| 95 | +        # Pull the artifact
 | |
| 96 | +        pulled = cas.pull(element, element_key)
 | |
| 97 | +        assert pulled is True
 | |
| 98 | +        assert cas.contains(element, element_key)
 | |
| 99 | + | |
| 100 | +        # Finally, close the opened gRPC channels properly!
 | |
| 101 | +        for remote in cas._remotes[project]:
 | |
| 102 | +            if remote.channel:
 | |
| 103 | +                remote.channel.close()
 | |
| 104 | + | |
| 105 | + | |
| 106 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 107 | +def test_pull_tree(cli, tmpdir, datafiles):
 | |
| 108 | +    project_dir = str(datafiles)
 | |
| 109 | + | |
| 110 | +    # Set up an artifact cache.
 | |
| 111 | +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
 | |
| 112 | +        # Configure artifact share
 | |
| 113 | +        cli.configure({
 | |
| 114 | +            'scheduler': {
 | |
| 115 | +                'pushers': 1
 | |
| 116 | +            },
 | |
| 117 | +            'artifacts': {
 | |
| 118 | +                'url': share.repo,
 | |
| 119 | +                'push': True,
 | |
| 120 | +            }
 | |
| 121 | +        })
 | |
| 122 | + | |
| 123 | +        # First build the project with the artifact cache configured
 | |
| 124 | +        result = cli.run(project=project_dir, args=['build', 'target.bst'])
 | |
| 125 | +        result.assert_success()
 | |
| 126 | + | |
| 127 | +        # Assert that we are now cached locally
 | |
| 128 | +        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
 | |
| 129 | +        # Assert that we shared/pushed the cached artifact
 | |
| 130 | +        element_key = cli.get_element_key(project_dir, 'target.bst')
 | |
| 131 | +        assert share.has_artifact('test', 'target.bst', element_key)
 | |
| 132 | + | |
| 133 | +        # Fake minimal context
 | |
| 134 | +        context = Context()
 | |
| 135 | +        context.set_message_handler(message_handler)
 | |
| 136 | +        context.sched_pushers = 1
 | |
| 137 | +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
 | |
| 138 | +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
 | |
| 139 | +                                                          push=True)]
 | |
| 140 | + | |
| 141 | +        # Load the project and CAS cache
 | |
| 142 | +        project = Project(project_dir, context)
 | |
| 143 | +        project.ensure_fully_loaded()
 | |
| 144 | +        cas = CASCache(context)
 | |
| 145 | + | |
| 146 | +        # Assert that the element's artifact is cached
 | |
| 147 | +        element = project.load_elements(['target.bst'], cas)[0]
 | |
| 148 | +        element_key = cli.get_element_key(project_dir, 'target.bst')
 | |
| 149 | +        assert cas.contains(element, element_key)
 | |
| 150 | + | |
| 151 | +        # Manually setup the CAS remote
 | |
| 152 | +        cas.setup_remotes(use_config=True)
 | |
| 153 | +        cas.initialize_remotes()
 | |
| 154 | +        assert cas.has_push_remotes(element=element)
 | |
| 155 | + | |
| 156 | +        # Retrieve the Directory object from the cached artifact
 | |
| 157 | +        artifact_ref = cas.get_artifact_fullname(element, element_key)
 | |
| 158 | +        artifact_digest = cas.resolve_ref(artifact_ref)
 | |
| 159 | + | |
| 160 | +        directory = remote_execution_pb2.Directory()
 | |
| 161 | + | |
| 162 | +        with open(cas.objpath(artifact_digest), 'rb') as f:
 | |
| 163 | +            directory.ParseFromString(f.read())
 | |
| 164 | + | |
| 165 | +        # Build the Tree object while we are still cached
 | |
| 166 | +        tree = remote_execution_pb2.Tree()
 | |
| 167 | +        tree_maker(cas, tree, directory)
 | |
| 168 | + | |
| 169 | +        # Push the Tree as a regular message
 | |
| 170 | +        tree_digest = cas.push_message(project, tree)
 | |
| 171 | + | |
| 172 | +        # Now delete the artifact locally
 | |
| 173 | +        cli.remove_artifact_from_cache(project_dir, 'target.bst')
 | |
| 174 | + | |
| 175 | +        # Assert that we are not cached locally anymore
 | |
| 176 | +        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
 | |
| 177 | + | |
| 178 | +        # Pull the artifact using the Tree object
 | |
| 179 | +        directory_digest = cas.pull_tree(project, tree_digest)
 | |
| 180 | +        assert directory_digest == artifact_digest
 | |
| 181 | + | |
| 182 | +        # Ensure the entire Tree structure has been pulled
 | |
| 183 | +        assert os.path.exists(cas.objpath(directory_digest))
 | |
| 184 | +        for child_directory in tree.children:
 | |
| 185 | +            child_blob = child_directory.SerializeToString()
 | |
| 186 | + | |
| 187 | +            child_digest = remote_execution_pb2.Digest()
 | |
| 188 | +            child_digest.hash = hashlib.sha256(child_blob).hexdigest()
 | |
| 189 | +            child_digest.size_bytes = len(child_blob)
 | |
| 190 | + | |
| 191 | +            assert os.path.exists(cas.objpath(child_digest))
 | |
| 192 | + | |
| 193 | +        # Finally, close the opened gRPC channels properly!
 | |
| 194 | +        for remote in cas._remotes[project]:
 | |
| 195 | +            if remote.channel:
 | |
| 196 | +                remote.channel.close() | 
| 1 | +import os
 | |
| 2 | +import pytest
 | |
| 3 | + | |
| 4 | +from pluginbase import PluginBase
 | |
| 5 | +from buildstream._artifactcache.artifactcache import ArtifactCacheSpec
 | |
| 6 | +from buildstream._artifactcache.cascache import CASCache
 | |
| 7 | +from buildstream._context import Context
 | |
| 8 | +from buildstream._project import Project
 | |
| 9 | +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 10 | +from buildstream.storage._casbaseddirectory import CasBasedDirectory
 | |
| 11 | + | |
| 12 | +from tests.testutils import cli, create_artifact_share
 | |
| 13 | + | |
| 14 | + | |
| 15 | +# Project directory
 | |
| 16 | +DATA_DIR = os.path.join(
 | |
| 17 | +    os.path.dirname(os.path.realpath(__file__)),
 | |
| 18 | +    "project",
 | |
| 19 | +)
 | |
| 20 | + | |
| 21 | + | |
| 22 | +# Handle messages from the pipeline
 | |
| 23 | +def message_handler(message, context):
 | |
| 24 | +    pass
 | |
| 25 | + | |
| 26 | + | |
| 27 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 28 | +def test_push(cli, tmpdir, datafiles):
 | |
| 29 | +    project_dir = str(datafiles)
 | |
| 30 | + | |
| 31 | +    # First build the project without the artifact cache configured
 | |
| 32 | +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
 | |
| 33 | +    result.assert_success()
 | |
| 34 | + | |
| 35 | +    # Assert that we are now cached locally
 | |
| 36 | +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
 | |
| 37 | + | |
| 38 | +    # Set up an artifact cache.
 | |
| 39 | +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
 | |
| 40 | +        # Fake minimal context
 | |
| 41 | +        context = Context()
 | |
| 42 | +        context.set_message_handler(message_handler)
 | |
| 43 | +        context.sched_pushers = 1
 | |
| 44 | +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
 | |
| 45 | +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
 | |
| 46 | +                                                          push=True)]
 | |
| 47 | + | |
| 48 | +        # Load the project and CAS cache
 | |
| 49 | +        project = Project(project_dir, context)
 | |
| 50 | +        project.ensure_fully_loaded()
 | |
| 51 | +        cas = CASCache(context)
 | |
| 52 | + | |
| 53 | +        # Assert that the element's artifact is cached
 | |
| 54 | +        element = project.load_elements(['target.bst'], cas)[0]
 | |
| 55 | +        element_key = cli.get_element_key(project_dir, 'target.bst')
 | |
| 56 | +        assert cas.contains(element, element_key)
 | |
| 57 | + | |
| 58 | +        # Manually set up the CAS remote
 | |
| 59 | +        cas.setup_remotes(use_config=True)
 | |
| 60 | +        cas.initialize_remotes()
 | |
| 61 | +        assert cas.has_push_remotes(element=element)
 | |
| 62 | + | |
| 63 | +        # Push the element's artifact
 | |
| 64 | +        pushed = cas.push(element, [element_key])
 | |
| 65 | +        assert pushed is True
 | |
| 66 | +        assert share.has_artifact('test', 'target.bst', element_key)
 | |
| 67 | + | |
| 68 | +        # Finally, close the opened gRPC channels properly!
 | |
| 69 | +        for remote in cas._remotes[project]:
 | |
| 70 | +            if remote.channel:
 | |
| 71 | +                remote.channel.close()
 | |
| 72 | + | |
| 73 | + | |
| 74 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 75 | +def test_push_directory(cli, tmpdir, datafiles):
 | |
| 76 | +    project_dir = str(datafiles)
 | |
| 77 | + | |
| 78 | +    # First build the project without the artifact cache configured
 | |
| 79 | +    result = cli.run(project=project_dir, args=['build', 'target.bst'])
 | |
| 80 | +    result.assert_success()
 | |
| 81 | + | |
| 82 | +    # Assert that we are now cached locally
 | |
| 83 | +    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
 | |
| 84 | + | |
| 85 | +    # Set up an artifact cache.
 | |
| 86 | +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
 | |
| 87 | +        # Fake minimal context
 | |
| 88 | +        context = Context()
 | |
| 89 | +        context.set_message_handler(message_handler)
 | |
| 90 | +        context.sched_pushers = 1
 | |
| 91 | +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
 | |
| 92 | +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
 | |
| 93 | +                                                          push=True)]
 | |
| 94 | + | |
| 95 | +        # Load the project and CAS cache
 | |
| 96 | +        project = Project(project_dir, context)
 | |
| 97 | +        project.ensure_fully_loaded()
 | |
| 98 | +        cas = CASCache(context)
 | |
| 99 | + | |
| 100 | +        # Assert that the element's artifact is cached
 | |
| 101 | +        element = project.load_elements(['target.bst'], cas)[0]
 | |
| 102 | +        element_key = cli.get_element_key(project_dir, 'target.bst')
 | |
| 103 | +        assert cas.contains(element, element_key)
 | |
| 104 | + | |
| 105 | +        # Manually set up the CAS remote
 | |
| 106 | +        cas.setup_remotes(use_config=True)
 | |
| 107 | +        cas.initialize_remotes()
 | |
| 108 | +        assert cas.has_push_remotes(element=element)
 | |
| 109 | + | |
| 110 | +        # Recreate the CasBasedDirectory object from the cached artifact
 | |
| 111 | +        artifact_ref = cas.get_artifact_fullname(element, element_key)
 | |
| 112 | +        artifact_digest = cas.resolve_ref(artifact_ref)
 | |
| 113 | + | |
| 114 | +        directory = CasBasedDirectory(context, ref=artifact_digest)
 | |
| 115 | + | |
| 116 | +        # Push the CasBasedDirectory object
 | |
| 117 | +        directory_digest = cas.push_directory(project, directory)
 | |
| 118 | +        assert directory_digest == artifact_digest
 | |
| 119 | +        assert share.has_object(directory_digest)
 | |
| 120 | + | |
| 121 | +        # Finally, close the opened gRPC channels properly!
 | |
| 122 | +        for remote in cas._remotes[project]:
 | |
| 123 | +            if remote.channel:
 | |
| 124 | +                remote.channel.close()
 | |
| 125 | + | |
| 126 | + | |
| 127 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 128 | +def test_push_message(cli, tmpdir, datafiles):
 | |
| 129 | +    project_dir = str(datafiles)
 | |
| 130 | + | |
| 131 | +    # Set up an artifact cache.
 | |
| 132 | +    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
 | |
| 133 | +        # Fake minimal context
 | |
| 134 | +        context = Context()
 | |
| 135 | +        context.set_message_handler(message_handler)
 | |
| 136 | +        context.sched_pushers = 1
 | |
| 137 | +        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
 | |
| 138 | +        context.artifact_cache_specs = [ArtifactCacheSpec(url=share.repo,
 | |
| 139 | +                                                          push=True)]
 | |
| 140 | + | |
| 141 | +        # Load the project and CAS cache
 | |
| 142 | +        project = Project(project_dir, context)
 | |
| 143 | +        project.ensure_fully_loaded()
 | |
| 144 | +        cas = CASCache(context)
 | |
| 145 | + | |
| 146 | +        # Manually set up the CAS remote
 | |
| 147 | +        cas.setup_remotes(use_config=True)
 | |
| 148 | +        cas.initialize_remotes()
 | |
| 149 | +        assert cas.has_push_remotes()
 | |
| 150 | + | |
| 151 | +        # Create an example message object
 | |
| 152 | +        command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
 | |
| 153 | +                                               working_directory='/buildstream-build',
 | |
| 154 | +                                               output_directories=['/buildstream-install'])
 | |
| 155 | + | |
| 156 | +        # Push the message object
 | |
| 157 | +        digest = cas.push_message(project, command)
 | |
| 158 | +        assert digest
 | |
| 159 | +        assert share.has_object(digest)
 | |
| 160 | + | |
| 161 | +        # Finally, close the opened gRPC channels properly!
 | |
| 162 | +        for remote in cas._remotes[project]:
 | |
| 163 | +            if remote.channel:
 | |
| 164 | +                remote.channel.close() | 
| ... | ... | @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache | 
| 15 | 15 |  from buildstream._artifactcache.casserver import create_server
 | 
| 16 | 16 |  from buildstream._context import Context
 | 
| 17 | 17 |  from buildstream._exceptions import ArtifactError
 | 
| 18 | +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 18 | 19 |  | 
| 19 | 20 |  | 
| 20 | 21 |  # ArtifactShare()
 | 
| ... | ... | @@ -87,6 +88,23 @@ class ArtifactShare(): | 
| 87 | 88 |          # Sleep until termination by signal
 | 
| 88 | 89 |          signal.pause()
 | 
| 89 | 90 |  | 
| 91 | +    # has_object():
 | |
| 92 | +    #
 | |
| 93 | +    # Checks whether the object is present in the share
 | |
| 94 | +    #
 | |
| 95 | +    # Args:
 | |
| 96 | +    #    digest (remote_execution_pb2.Digest): The object's digest
 | |
| 97 | +    #
 | |
| 98 | +    # Returns:
 | |
| 99 | +    #    (bool): True if the object exists in the share, otherwise False.
 | |
| 100 | +    def has_object(self, digest):
 | |
| 101 | + | |
| 102 | +        assert isinstance(digest, remote_execution_pb2.Digest)
 | |
| 103 | + | |
| 104 | +        object_path = self.cas.objpath(digest)
 | |
| 105 | + | |
| 106 | +        return os.path.exists(object_path)
 | |
| 107 | + | |
| 90 | 108 |      # has_artifact():
 | 
| 91 | 109 |      #
 | 
| 92 | 110 |      # Checks whether the artifact is present in the share
 | 
