[Notes] [Git][BuildStream/buildstream][tristan/element-processing-order] 17 commits: .gitlab-ci.yml: Add tests for python 3.7



Tristan Van Berkom pushed to branch tristan/element-processing-order at BuildStream / buildstream

Commits:

27 changed files:

Changes:

  • .gitlab-ci.yml

    @@ -60,6 +60,16 @@ tests-ubuntu-18.04:
       image: buildstream/testsuite-ubuntu:18.04-5da27168-32c47d1c
       <<: *tests

    +tests-python-3.7-stretch:
    +  image: buildstream/testsuite-python:3.7-stretch-a60f0c39
    +  <<: *tests
    +
    +  variables:
    +    # Note that we explicitly specify TOXENV in this case because this
    +    # image has both 3.6 and 3.7 versions. python3.6 cannot be removed because
    +    # some of our base dependencies declare it as their runtime dependency.
    +    TOXENV: py37
    +
     overnight-fedora-28-aarch64:
       image: buildstream/testsuite-fedora:aarch64-28-5da27168-32c47d1c
       tags:

  • buildstream/_artifactcache/artifactcache.py → buildstream/_artifactcache.py

    @@ -19,18 +19,16 @@

     import multiprocessing
     import os
    -import signal
     import string
     from collections.abc import Mapping

    -from ..types import _KeyStrength
    -from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    -from .._message import Message, MessageType
    -from .. import _signals
    -from .. import utils
    -from .. import _yaml
    +from .types import _KeyStrength
    +from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    +from ._message import Message, MessageType
    +from . import utils
    +from . import _yaml

    -from .cascache import CASRemote, CASRemoteSpec
    +from ._cas import CASRemote, CASRemoteSpec


     CACHE_SIZE_FILE = "cache_size"

    @@ -249,7 +247,7 @@ class ArtifactCache():
                     # FIXME: Asking the user what to do may be neater
                     default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
                                                 'buildstream.conf')
    -                detail = ("There is not enough space to build the given element.\n"
    +                detail = ("There is not enough space to complete the build.\n"
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))

    @@ -375,20 +373,8 @@ class ArtifactCache():
             remotes = {}
             q = multiprocessing.Queue()
             for remote_spec in remote_specs:
    -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
    -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    -            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))

    -            try:
    -                # Keep SIGINT blocked in the child process
    -                with _signals.blocked([signal.SIGINT], ignore=False):
    -                    p.start()
    -
    -                error = q.get()
    -                p.join()
    -            except KeyboardInterrupt:
    -                utils._kill_process_tree(p.pid)
    -                raise
    +            error = CASRemote.check_remote(remote_spec, q)

                 if error and on_failure:
                     on_failure(remote_spec.url, error)

    @@ -747,7 +733,7 @@ class ArtifactCache():
                                     "servers are configured as push remotes.")

             for remote in push_remotes:
    -            message_digest = self.cas.push_message(remote, message)
    +            message_digest = remote.push_message(message)

             return message_digest
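
    The net effect of the last two hunks is that ArtifactCache no longer spawns the checking
    subprocess itself or routes message pushes through CASCache: each remote spec is handed to
    CASRemote.check_remote(), and messages are pushed via the remote object directly. A rough
    sketch of the resulting calling pattern follows; the probe_remotes() wrapper is hypothetical
    and only illustrates the call sites shown above, it is not part of this commit.

    # Illustrative sketch, mirroring the post-refactor call sites in this diff.
    import multiprocessing

    from buildstream._cas import CASRemote


    def probe_remotes(remote_specs, on_failure=None):
        # A single queue is shared with the short-lived child processes that
        # CASRemote.check_remote() spawns to keep gRPC threads out of this process.
        q = multiprocessing.Queue()
        remotes = {}

        for remote_spec in remote_specs:
            error = CASRemote.check_remote(remote_spec, q)

            if error and on_failure:
                on_failure(remote_spec.url, error)
            elif not error:
                remotes[remote_spec.url] = CASRemote(remote_spec)

        return remotes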
     
    

  • buildstream/_artifactcache/__init__.py → buildstream/_cas/__init__.py

    @@ -17,4 +17,5 @@
     #  Authors:
     #        Tristan Van Berkom <tristan vanberkom codethink co uk>

    -from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE
    +from .cascache import CASCache
    +from .casremote import CASRemote, CASRemoteSpec

  • buildstream/_artifactcache/cascache.py → buildstream/_cas/cascache.py

    @@ -17,85 +17,23 @@
     #  Authors:
     #        Jürg Billeter <juerg billeter codethink co uk>

    -from collections import namedtuple
     import hashlib
     import itertools
    -import io
     import os
     import stat
     import tempfile
     import uuid
     import contextlib
    -from urllib.parse import urlparse

     import grpc

    -from .._protos.google.rpc import code_pb2
    -from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    -from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    +from .._protos.buildstream.v2 import buildstream_pb2

     from .. import utils
    -from .._exceptions import CASError, LoadError, LoadErrorReason
    -from .. import _yaml
    +from .._exceptions import CASCacheError

    -
    -# The default limit for gRPC messages is 4 MiB.
    -# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    -_MAX_PAYLOAD_BYTES = 1024 * 1024
    -
    -
    -class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
    -
    -    # _new_from_config_node
    -    #
    -    # Creates an CASRemoteSpec() from a YAML loaded node
    -    #
    -    @staticmethod
    -    def _new_from_config_node(spec_node, basedir=None):
    -        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name'])
    -        url = _yaml.node_get(spec_node, str, 'url')
    -        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    -        if not url:
    -            provenance = _yaml.node_get_provenance(spec_node, 'url')
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}: empty artifact cache URL".format(provenance))
    -
    -        instance_name = _yaml.node_get(spec_node, str, 'instance-name', default_value=None)
    -
    -        server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    -        if server_cert and basedir:
    -            server_cert = os.path.join(basedir, server_cert)
    -
    -        client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
    -        if client_key and basedir:
    -            client_key = os.path.join(basedir, client_key)
    -
    -        client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
    -        if client_cert and basedir:
    -            client_cert = os.path.join(basedir, client_cert)
    -
    -        if client_key and not client_cert:
    -            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
    -
    -        if client_cert and not client_key:
    -            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    -
    -        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
    -
    -
    -CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
    -
    -
    -class BlobNotFound(CASError):
    -
    -    def __init__(self, blob, msg):
    -        self.blob = blob
    -        super().__init__(msg)
    +from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate


     # A CASCache manages a CAS repository as specified in the Remote Execution API.

    @@ -120,7 +58,7 @@ class CASCache():
             headdir = os.path.join(self.casdir, 'refs', 'heads')
             objdir = os.path.join(self.casdir, 'objects')
             if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
    -            raise CASError("CAS repository check failed for '{}'".format(self.casdir))
    +            raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))

         # contains():
         #

    @@ -169,7 +107,7 @@ class CASCache():
         #     subdir (str): Optional specific dir to extract
         #
         # Raises:
    -    #     CASError: In cases there was an OSError, or if the ref did not exist.
    +    #     CASCacheError: In cases there was an OSError, or if the ref did not exist.
         #
         # Returns: path to extracted directory
         #

    @@ -201,7 +139,7 @@ class CASCache():
                     # Another process beat us to rename
                     pass
                 except OSError as e:
    -                raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
    +                raise CASCacheError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e

             return originaldest

    @@ -245,29 +183,6 @@ class CASCache():

             return modified, removed, added

    -    def initialize_remote(self, remote_spec, q):
    -        try:
    -            remote = CASRemote(remote_spec)
    -            remote.init()
    -
    -            request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
    -            response = remote.ref_storage.Status(request)
    -
    -            if remote_spec.push and not response.allow_updates:
    -                q.put('CAS server does not allow push')
    -            else:
    -                # No error
    -                q.put(None)
    -
    -        except grpc.RpcError as e:
    -            # str(e) is too verbose for errors reported to the user
    -            q.put(e.details())
    -
    -        except Exception as e:               # pylint: disable=broad-except
    -            # Whatever happens, we need to return it to the calling process
    -            #
    -            q.put(str(e))
    -
         # pull():
         #
         # Pull a ref from a remote repository.

    @@ -306,7 +221,7 @@ class CASCache():
                 return True
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.NOT_FOUND:
    -                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    +                raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e
                 else:
                     return False
             except BlobNotFound as e:

    @@ -360,7 +275,7 @@ class CASCache():
         #   (bool): True if any remote was updated, False if no pushes were required
         #
         # Raises:
    -    #   (CASError): if there was an error
    +    #   (CASCacheError): if there was an error
         #
         def push(self, refs, remote):
             skipped_remote = True

    @@ -395,7 +310,7 @@ class CASCache():
                     skipped_remote = False
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
    +                raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e

             return not skipped_remote

    @@ -408,57 +323,13 @@ class CASCache():
         #     directory (Directory): A virtual directory object to push.
         #
         # Raises:
    -    #     (CASError): if there was an error
    +    #     (CASCacheError): if there was an error
         #
         def push_directory(self, remote, directory):
             remote.init()

             self._send_directory(remote, directory.ref)

    -    # push_message():
    -    #
    -    # Push the given protobuf message to a remote.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to push to
    -    #     message (Message): A protobuf message to push.
    -    #
    -    # Raises:
    -    #     (CASError): if there was an error
    -    #
    -    def push_message(self, remote, message):
    -
    -        message_buffer = message.SerializeToString()
    -        message_digest = utils._message_digest(message_buffer)
    -
    -        remote.init()
    -
    -        with io.BytesIO(message_buffer) as b:
    -            self._send_blob(remote, message_digest, b)
    -
    -        return message_digest
    -
    -    # verify_digest_on_remote():
    -    #
    -    # Check whether the object is already on the server in which case
    -    # there is no need to upload it.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to check
    -    #     digest (Digest): The object digest.
    -    #
    -    def verify_digest_on_remote(self, remote, digest):
    -        remote.init()
    -
    -        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
    -        request.blob_digests.extend([digest])
    -
    -        response = remote.cas.FindMissingBlobs(request)
    -        if digest in response.missing_blob_digests:
    -            return False
    -
    -        return True
    -
         # objpath():
         #
         # Return the path of an object based on its digest.

    @@ -531,7 +402,7 @@ class CASCache():
                 pass

             except OSError as e:
    -            raise CASError("Failed to hash object: {}".format(e)) from e
    +            raise CASCacheError("Failed to hash object: {}".format(e)) from e

             return digest

    @@ -572,7 +443,7 @@ class CASCache():
                     return digest

             except FileNotFoundError as e:
    -            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
    +            raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e

         # update_mtime()
         #

    @@ -585,7 +456,7 @@ class CASCache():
             try:
                 os.utime(self._refpath(ref))
             except FileNotFoundError as e:
    -            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
    +            raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e

         # calculate_cache_size()
         #

    @@ -676,7 +547,7 @@ class CASCache():
             # Remove cache ref
             refpath = self._refpath(ref)
             if not os.path.exists(refpath):
    -            raise CASError("Could not find ref '{}'".format(ref))
    +            raise CASCacheError("Could not find ref '{}'".format(ref))

             os.unlink(refpath)

    @@ -792,7 +663,7 @@ class CASCache():
                     # The process serving the socket can't be cached anyway
                     pass
                 else:
    -                raise CASError("Unsupported file type for {}".format(full_path))
    +                raise CASCacheError("Unsupported file type for {}".format(full_path))

             return self.add_object(digest=dir_digest,
                                    buffer=directory.SerializeToString())

    @@ -811,7 +682,7 @@ class CASCache():
                 if dirnode.name == name:
                     return dirnode.digest

    -        raise CASError("Subdirectory {} not found".format(name))
    +        raise CASCacheError("Subdirectory {} not found".format(name))

         def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
             dir_a = remote_execution_pb2.Directory()

    @@ -909,23 +780,6 @@ class CASCache():
             for dirnode in directory.directories:
                 yield from self._required_blobs(dirnode.digest)

    -    def _fetch_blob(self, remote, digest, stream):
    -        resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
    -
    -        if remote.spec.instance_name:
    -            resource_name_components.insert(0, remote.spec.instance_name)
    -
    -        resource_name = '/'.join(resource_name_components)
    -
    -        request = bytestream_pb2.ReadRequest()
    -        request.resource_name = resource_name
    -        request.read_offset = 0
    -        for response in remote.bytestream.Read(request):
    -            stream.write(response.data)
    -        stream.flush()
    -
    -        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    -
         # _ensure_blob():
         #
         # Fetch and add blob if it's not already local.

    @@ -944,7 +798,7 @@ class CASCache():
                 return objpath

             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -            self._fetch_blob(remote, digest, f)
    +            remote._fetch_blob(digest, f)

                 added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash

    @@ -1051,7 +905,7 @@ class CASCache():
         def _fetch_tree(self, remote, digest):
             # download but do not store the Tree object
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            self._fetch_blob(remote, digest, out)
    +            remote._fetch_blob(digest, out)

                 tree = remote_execution_pb2.Tree()

    @@ -1071,39 +925,6 @@ class CASCache():

             return dirdigest

    -    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    -        resource_name_components = ['uploads', str(u_uid), 'blobs',
    -                                    digest.hash, str(digest.size_bytes)]
    -
    -        if remote.spec.instance_name:
    -            resource_name_components.insert(0, remote.spec.instance_name)
    -
    -        resource_name = '/'.join(resource_name_components)
    -
    -        def request_stream(resname, instream):
    -            offset = 0
    -            finished = False
    -            remaining = digest.size_bytes
    -            while not finished:
    -                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    -                remaining -= chunk_size
    -
    -                request = bytestream_pb2.WriteRequest()
    -                request.write_offset = offset
    -                # max. _MAX_PAYLOAD_BYTES chunks
    -                request.data = instream.read(chunk_size)
    -                request.resource_name = resname
    -                request.finish_write = remaining <= 0
    -
    -                yield request
    -
    -                offset += chunk_size
    -                finished = request.finish_write
    -
    -        response = remote.bytestream.Write(request_stream(resource_name, stream))
    -
    -        assert response.committed_size == digest.size_bytes
    -
         def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
             required_blobs = self._required_blobs(digest)

    @@ -1137,7 +958,7 @@ class CASCache():
                     if (digest.size_bytes >= remote.max_batch_total_size_bytes or
                             not remote.batch_update_supported):
                         # Too large for batch request, upload in independent request.
    -                    self._send_blob(remote, digest, f, u_uid=u_uid)
    +                    remote._send_blob(digest, f, u_uid=u_uid)
                     else:
                         if not batch.add(digest, f):
                             # Not enough space left in batch request.

    @@ -1150,183 +971,6 @@ class CASCache():
             batch.send()


    -# Represents a single remote CAS cache.
    -#
    -class CASRemote():
    -    def __init__(self, spec):
    -        self.spec = spec
    -        self._initialized = False
    -        self.channel = None
    -        self.bytestream = None
    -        self.cas = None
    -        self.ref_storage = None
    -        self.batch_update_supported = None
    -        self.batch_read_supported = None
    -        self.capabilities = None
    -        self.max_batch_total_size_bytes = None
    -
    -    def init(self):
    -        if not self._initialized:
    -            url = urlparse(self.spec.url)
    -            if url.scheme == 'http':
    -                port = url.port or 80
    -                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
    -            elif url.scheme == 'https':
    -                port = url.port or 443
    -
    -                if self.spec.server_cert:
    -                    with open(self.spec.server_cert, 'rb') as f:
    -                        server_cert_bytes = f.read()
    -                else:
    -                    server_cert_bytes = None
    -
    -                if self.spec.client_key:
    -                    with open(self.spec.client_key, 'rb') as f:
    -                        client_key_bytes = f.read()
    -                else:
    -                    client_key_bytes = None
    -
    -                if self.spec.client_cert:
    -                    with open(self.spec.client_cert, 'rb') as f:
    -                        client_cert_bytes = f.read()
    -                else:
    -                    client_cert_bytes = None
    -
    -                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
    -                                                           private_key=client_key_bytes,
    -                                                           certificate_chain=client_cert_bytes)
    -                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    -            else:
    -                raise CASError("Unsupported URL: {}".format(self.spec.url))
    -
    -            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    -            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    -            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    -            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    -
    -            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    -            try:
    -                request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
    -                response = self.capabilities.GetCapabilities(request)
    -                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    -                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    -                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    -            except grpc.RpcError as e:
    -                # Simply use the defaults for servers that don't implement GetCapabilities()
    -                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    -                    raise
    -
    -            # Check whether the server supports BatchReadBlobs()
    -            self.batch_read_supported = False
    -            try:
    -                request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
    -                response = self.cas.BatchReadBlobs(request)
    -                self.batch_read_supported = True
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    -                    raise
    -
    -            # Check whether the server supports BatchUpdateBlobs()
    -            self.batch_update_supported = False
    -            try:
    -                request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
    -                response = self.cas.BatchUpdateBlobs(request)
    -                self.batch_update_supported = True
    -            except grpc.RpcError as e:
    -                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    -                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    -                    raise
    -
    -            self._initialized = True
    -
    -
    -# Represents a batch of blobs queued for fetching.
    -#
    -class _CASBatchRead():
    -    def __init__(self, remote):
    -        self._remote = remote
    -        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    -        self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
    -        self._size = 0
    -        self._sent = False
    -
    -    def add(self, digest):
    -        assert not self._sent
    -
    -        new_batch_size = self._size + digest.size_bytes
    -        if new_batch_size > self._max_total_size_bytes:
    -            # Not enough space left in current batch
    -            return False
    -
    -        request_digest = self._request.digests.add()
    -        request_digest.hash = digest.hash
    -        request_digest.size_bytes = digest.size_bytes
    -        self._size = new_batch_size
    -        return True
    -
    -    def send(self):
    -        assert not self._sent
    -        self._sent = True
    -
    -        if not self._request.digests:
    -            return
    -
    -        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    -
    -        for response in batch_response.responses:
    -            if response.status.code == code_pb2.NOT_FOUND:
    -                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    -                    response.digest.hash, response.status.code))
    -            if response.status.code != code_pb2.OK:
    -                raise CASError("Failed to download blob {}: {}".format(
    -                    response.digest.hash, response.status.code))
    -            if response.digest.size_bytes != len(response.data):
    -                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    -                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    -
    -            yield (response.digest, response.data)
    -
    -
    -# Represents a batch of blobs queued for upload.
    -#
    -class _CASBatchUpdate():
    -    def __init__(self, remote):
    -        self._remote = remote
    -        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    -        self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
    -        self._size = 0
    -        self._sent = False
    -
    -    def add(self, digest, stream):
    -        assert not self._sent
    -
    -        new_batch_size = self._size + digest.size_bytes
    -        if new_batch_size > self._max_total_size_bytes:
    -            # Not enough space left in current batch
    -            return False
    -
    -        blob_request = self._request.requests.add()
    -        blob_request.digest.hash = digest.hash
    -        blob_request.digest.size_bytes = digest.size_bytes
    -        blob_request.data = stream.read(digest.size_bytes)
    -        self._size = new_batch_size
    -        return True
    -
    -    def send(self):
    -        assert not self._sent
    -        self._sent = True
    -
    -        if not self._request.requests:
    -            return
    -
    -        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    -
    -        for response in batch_response.responses:
    -            if response.status.code != code_pb2.OK:
    -                raise CASError("Failed to upload blob {}: {}".format(
    -                    response.digest.hash, response.status.code))
    -
    -
     def _grouper(iterable, n):
         while True:
             try:

  • buildstream/_cas/casremote.py
    1
    +from collections import namedtuple
    
    2
    +import io
    
    3
    +import os
    
    4
    +import multiprocessing
    
    5
    +import signal
    
    6
    +from urllib.parse import urlparse
    
    7
    +import uuid
    
    8
    +
    
    9
    +import grpc
    
    10
    +
    
    11
    +from .. import _yaml
    
    12
    +from .._protos.google.rpc import code_pb2
    
    13
    +from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    14
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    15
    +from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    16
    +
    
    17
    +from .._exceptions import CASRemoteError, LoadError, LoadErrorReason
    
    18
    +from .. import _signals
    
    19
    +from .. import utils
    
    20
    +
    
    21
    +# The default limit for gRPC messages is 4 MiB.
    
    22
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    23
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    
    24
    +
    
    25
    +
    
    26
    +class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
    
    27
    +
    
    28
    +    # _new_from_config_node
    
    29
    +    #
    
    30
    +    # Creates an CASRemoteSpec() from a YAML loaded node
    
    31
    +    #
    
    32
    +    @staticmethod
    
    33
    +    def _new_from_config_node(spec_node, basedir=None):
    
    34
    +        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
    
    35
    +        url = _yaml.node_get(spec_node, str, 'url')
    
    36
    +        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    
    37
    +        if not url:
    
    38
    +            provenance = _yaml.node_get_provenance(spec_node, 'url')
    
    39
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    40
    +                            "{}: empty artifact cache URL".format(provenance))
    
    41
    +
    
    42
    +        instance_name = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    
    43
    +
    
    44
    +        server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    
    45
    +        if server_cert and basedir:
    
    46
    +            server_cert = os.path.join(basedir, server_cert)
    
    47
    +
    
    48
    +        client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
    
    49
    +        if client_key and basedir:
    
    50
    +            client_key = os.path.join(basedir, client_key)
    
    51
    +
    
    52
    +        client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
    
    53
    +        if client_cert and basedir:
    
    54
    +            client_cert = os.path.join(basedir, client_cert)
    
    55
    +
    
    56
    +        if client_key and not client_cert:
    
    57
    +            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
    
    58
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    59
    +                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
    
    60
    +
    
    61
    +        if client_cert and not client_key:
    
    62
    +            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
    
    63
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    64
    +                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    
    65
    +
    
    66
    +        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
    
    67
    +
    
    68
    +
    
    69
    +CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
    
    70
    +
    
    71
    +
    
    72
    +class BlobNotFound(CASRemoteError):
    
    73
    +
    
    74
    +    def __init__(self, blob, msg):
    
    75
    +        self.blob = blob
    
    76
    +        super().__init__(msg)
    
    77
    +
    
    78
    +
    
    79
    +# Represents a single remote CAS cache.
    
    80
    +#
    
    81
    +class CASRemote():
    
    82
    +    def __init__(self, spec):
    
    83
    +        self.spec = spec
    
    84
    +        self._initialized = False
    
    85
    +        self.channel = None
    
    86
    +        self.bytestream = None
    
    87
    +        self.cas = None
    
    88
    +        self.ref_storage = None
    
    89
    +        self.batch_update_supported = None
    
    90
    +        self.batch_read_supported = None
    
    91
    +        self.capabilities = None
    
    92
    +        self.max_batch_total_size_bytes = None
    
    93
    +
    
    94
    +    def init(self):
    
    95
    +        if not self._initialized:
    
    96
    +            url = urlparse(self.spec.url)
    
    97
    +            if url.scheme == 'http':
    
    98
    +                port = url.port or 80
    
    99
    +                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
    
    100
    +            elif url.scheme == 'https':
    
    101
    +                port = url.port or 443
    
    102
    +
    
    103
    +                if self.spec.server_cert:
    
    104
    +                    with open(self.spec.server_cert, 'rb') as f:
    
    105
    +                        server_cert_bytes = f.read()
    
    106
    +                else:
    
    107
    +                    server_cert_bytes = None
    
    108
    +
    
    109
    +                if self.spec.client_key:
    
    110
    +                    with open(self.spec.client_key, 'rb') as f:
    
    111
    +                        client_key_bytes = f.read()
    
    112
    +                else:
    
    113
    +                    client_key_bytes = None
    
    114
    +
    
    115
    +                if self.spec.client_cert:
    
    116
    +                    with open(self.spec.client_cert, 'rb') as f:
    
    117
    +                        client_cert_bytes = f.read()
    
    118
    +                else:
    
    119
    +                    client_cert_bytes = None
    
    120
    +
    
    121
    +                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
    
    122
    +                                                           private_key=client_key_bytes,
    
    123
    +                                                           certificate_chain=client_cert_bytes)
    
    124
    +                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    
    125
    +            else:
    
    126
    +                raise CASRemoteError("Unsupported URL: {}".format(self.spec.url))
    
    127
    +
    
    128
    +            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    129
    +            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    130
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    
    131
    +            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    
    132
    +
    
    133
    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    134
    +            try:
    
    135
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    
    136
    +                response = self.capabilities.GetCapabilities(request)
    
    137
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    
    138
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    
    139
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    
    140
    +            except grpc.RpcError as e:
    
    141
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    
    142
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    143
    +                    raise
    
    144
    +
    
    145
    +            # Check whether the server supports BatchReadBlobs()
    
    146
    +            self.batch_read_supported = False
    
    147
    +            try:
    
    148
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    
    149
    +                response = self.cas.BatchReadBlobs(request)
    
    150
    +                self.batch_read_supported = True
    
    151
    +            except grpc.RpcError as e:
    
    152
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    153
    +                    raise
    
    154
    +
    
    155
    +            # Check whether the server supports BatchUpdateBlobs()
    
    156
    +            self.batch_update_supported = False
    
    157
    +            try:
    
    158
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    159
    +                response = self.cas.BatchUpdateBlobs(request)
    
    160
    +                self.batch_update_supported = True
    
    161
    +            except grpc.RpcError as e:
    
    162
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    163
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    
    164
    +                    raise
    
    165
    +
    
    166
    +            self._initialized = True
    
    167
    +
    
    168
    +    # check_remote
    
    169
    +    #
    
    170
    +    # Used when checking whether remote_specs work in the buildstream main
    
    171
    +    # thread, runs this in a seperate process to avoid creation of gRPC threads
    
    172
    +    # in the main BuildStream process
    
    173
    +    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
    
    174
    +    @classmethod
    
    175
    +    def check_remote(cls, remote_spec, q):
    
    176
    +
    
    177
    +        def __check_remote():
    
    178
    +            try:
    
    179
    +                remote = cls(remote_spec)
    
    180
    +                remote.init()
    
    181
    +
    
    182
    +                request = buildstream_pb2.StatusRequest()
    
    183
    +                response = remote.ref_storage.Status(request)
    
    184
    +
    
    185
    +                if remote_spec.push and not response.allow_updates:
    
    186
    +                    q.put('CAS server does not allow push')
    
    187
    +                else:
    
    188
    +                    # No error
    
    189
    +                    q.put(None)
    
    190
    +
    
    191
    +            except grpc.RpcError as e:
    
    192
    +                # str(e) is too verbose for errors reported to the user
    
    193
    +                q.put(e.details())
    
    194
    +
    
    195
    +            except Exception as e:               # pylint: disable=broad-except
    
    196
    +                # Whatever happens, we need to return it to the calling process
    
    197
    +                #
    
    198
    +                q.put(str(e))
    
    199
    +
    
    200
    +        p = multiprocessing.Process(target=__check_remote)
    
    201
    +
    
    202
    +        try:
    
    203
    +            # Keep SIGINT blocked in the child process
    
    204
    +            with _signals.blocked([signal.SIGINT], ignore=False):
    
    205
    +                p.start()
    
    206
    +
    
    207
    +            error = q.get()
    
    208
    +            p.join()
    
    209
    +        except KeyboardInterrupt:
    
    210
    +            utils._kill_process_tree(p.pid)
    
    211
    +            raise
    
    212
    +
    
    213
    +        return error
    
    214
    +
    
    215
    +    # verify_digest_on_remote():
    
    216
    +    #
    
    217
    +    # Check whether the object is already on the server in which case
    
    218
    +    # there is no need to upload it.
    
    219
    +    #
    
    220
    +    # Args:
    
    221
    +    #     digest (Digest): The object digest.
    
    222
    +    #
    
    223
    +    def verify_digest_on_remote(self, digest):
    
    224
    +        self.init()
    
    225
    +
    
    226
    +        request = remote_execution_pb2.FindMissingBlobsRequest()
    
    227
    +        request.blob_digests.extend([digest])
    
    228
    +
    
    229
    +        response = self.cas.FindMissingBlobs(request)
    
    230
    +        if digest in response.missing_blob_digests:
    
    231
    +            return False
    
    232
    +
    
    233
    +        return True
    
    234
    +
    
    235
    +    # push_message():
    
    236
    +    #
    
    237
    +    # Push the given protobuf message to a remote.
    
    238
    +    #
    
    239
    +    # Args:
    
    240
    +    #     message (Message): A protobuf message to push.
    
    241
    +    #
    
    242
    +    # Raises:
    
    243
    +    #     (CASRemoteError): if there was an error
    
    244
    +    #
    
    245
    +    def push_message(self, message):
    
    246
    +
    
    247
    +        message_buffer = message.SerializeToString()
    
    248
    +        message_digest = utils._message_digest(message_buffer)
    
    249
    +
    
    250
    +        self.init()
    
    251
    +
    
    252
    +        with io.BytesIO(message_buffer) as b:
    
    253
    +            self._send_blob(message_digest, b)
    
    254
    +
    
    255
    +        return message_digest
    
    256
    +
    
    257
    +    ################################################
    
    258
    +    #             Local Private Methods            #
    
    259
    +    ################################################
    
    260
    +    def _fetch_blob(self, digest, stream):
    
    261
    +        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    
    262
    +        request = bytestream_pb2.ReadRequest()
    
    263
    +        request.resource_name = resource_name
    
    264
    +        request.read_offset = 0
    
    265
    +        for response in self.bytestream.Read(request):
    
    266
    +            stream.write(response.data)
    
    267
    +        stream.flush()
    
    268
    +
    
    269
    +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    
    270
    +
    
    271
    +    def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
    
    272
    +        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    
    273
    +                                  digest.hash, str(digest.size_bytes)])
    
    274
    +
    
    275
    +        def request_stream(resname, instream):
    
    276
    +            offset = 0
    
    277
    +            finished = False
    
    278
    +            remaining = digest.size_bytes
    
    279
    +            while not finished:
    
    280
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    281
    +                remaining -= chunk_size
    
    282
    +
    
    283
    +                request = bytestream_pb2.WriteRequest()
    
    284
    +                request.write_offset = offset
    
    285
    +                # max. _MAX_PAYLOAD_BYTES chunks
    
    286
    +                request.data = instream.read(chunk_size)
    
    287
    +                request.resource_name = resname
    
    288
    +                request.finish_write = remaining <= 0
    
    289
    +
    
    290
    +                yield request
    
    291
    +
    
    292
    +                offset += chunk_size
    
    293
    +                finished = request.finish_write
    
    294
    +
    
    295
    +        response = self.bytestream.Write(request_stream(resource_name, stream))
    
    296
    +
    
    297
    +        assert response.committed_size == digest.size_bytes
    
    298
    +
    
    299
    +
    
    300
    +# Represents a batch of blobs queued for fetching.
    
    301
    +#
    
    302
    +class _CASBatchRead():
    
    303
    +    def __init__(self, remote):
    
    304
    +        self._remote = remote
    
    305
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    306
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    
    307
    +        self._size = 0
    
    308
    +        self._sent = False
    
    309
    +
    
    310
    +    def add(self, digest):
    
    311
    +        assert not self._sent
    
    312
    +
    
    313
    +        new_batch_size = self._size + digest.size_bytes
    
    314
    +        if new_batch_size > self._max_total_size_bytes:
    
    315
    +            # Not enough space left in current batch
    
    316
    +            return False
    
    317
    +
    
    318
    +        request_digest = self._request.digests.add()
    
    319
    +        request_digest.hash = digest.hash
    
    320
    +        request_digest.size_bytes = digest.size_bytes
    
    321
    +        self._size = new_batch_size
    
    322
    +        return True
    
    323
    +
    
    324
    +    def send(self):
    
    325
    +        assert not self._sent
    
    326
    +        self._sent = True
    
    327
    +
    
    328
    +        if not self._request.digests:
    
    329
    +            return
    
    330
    +
    
    331
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    
    332
    +
    
    333
    +        for response in batch_response.responses:
    
    334
    +            if response.status.code == code_pb2.NOT_FOUND:
    
    335
    +                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    
    336
    +                    response.digest.hash, response.status.code))
    
    337
    +            if response.status.code != code_pb2.OK:
    
    338
    +                raise CASRemoteError("Failed to download blob {}: {}".format(
    
    339
    +                    response.digest.hash, response.status.code))
    
    340
    +            if response.digest.size_bytes != len(response.data):
    
    341
    +                raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    342
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    
    343
    +
    
    344
    +            yield (response.digest, response.data)
    
    345
    +
    
    346
    +
    
    347
    +# Represents a batch of blobs queued for upload.
    
    348
    +#
    
    349
    +class _CASBatchUpdate():
    
    350
    +    def __init__(self, remote):
    
    351
    +        self._remote = remote
    
    352
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    353
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    354
    +        self._size = 0
    
    355
    +        self._sent = False
    
    356
    +
    
    357
    +    def add(self, digest, stream):
    
    358
    +        assert not self._sent
    
    359
    +
    
    360
    +        new_batch_size = self._size + digest.size_bytes
    
    361
    +        if new_batch_size > self._max_total_size_bytes:
    
    362
    +            # Not enough space left in current batch
    
    363
    +            return False
    
    364
    +
    
    365
    +        blob_request = self._request.requests.add()
    
    366
    +        blob_request.digest.hash = digest.hash
    
    367
    +        blob_request.digest.size_bytes = digest.size_bytes
    
    368
    +        blob_request.data = stream.read(digest.size_bytes)
    
    369
    +        self._size = new_batch_size
    
    370
    +        return True
    
    371
    +
    
    372
    +    def send(self):
    
    373
    +        assert not self._sent
    
    374
    +        self._sent = True
    
    375
    +
    
    376
    +        if not self._request.requests:
    
    377
    +            return
    
    378
    +
    
    379
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    
    380
    +
    
    381
    +        for response in batch_response.responses:
    
    382
    +            if response.status.code != code_pb2.OK:
    
    383
    +                raise CASRemoteError("Failed to upload blob {}: {}".format(
    
    384
    +                    response.digest.hash, response.status.code))

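    The two batch helpers above cap each request at remote.max_batch_total_size_bytes: callers add() digests until the batch reports it is full, flush it with send(), then start a new batch. A minimal sketch of that driver loop, assuming a `remote` like the one used above and a hypothetical store_blob() callback; blobs larger than the batch limit would still need the ByteStream path shown earlier:

        def fetch_in_batches(remote, digests, store_blob):
            # store_blob(digest, data) is a hypothetical local-storage hook,
            # not part of this commit.
            batch = _CASBatchRead(remote)
            for digest in digests:
                if batch.add(digest):
                    continue
                # Current batch is full: flush it and start a fresh one
                for received_digest, data in batch.send():
                    store_blob(received_digest, data)
                batch = _CASBatchRead(remote)
                if not batch.add(digest):
                    # Blob exceeds max_batch_total_size_bytes on its own;
                    # fall back to a ByteStream Read instead (not shown).
                    raise NotImplementedError("bytestream fallback required")
            # Flush whatever remains in the final batch
            for received_digest, data in batch.send():
                store_blob(received_digest, data)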
  • buildstream/_artifactcache/casserver.py → buildstream/_cas/casserver.py

  • buildstream/_context.py
    ... ... @@ -31,7 +31,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
    31 31
     from ._message import Message, MessageType
    
    32 32
     from ._profile import Topics, profile_start, profile_end
    
    33 33
     from ._artifactcache import ArtifactCache
    
    34
    -from ._artifactcache.cascache import CASCache
    
    34
    +from ._cas import CASCache
    
    35 35
     from ._workspaces import Workspaces, WorkspaceProjectCache, WORKSPACE_PROJECT_FILE
    
    36 36
     from .plugin import _plugin_lookup
    
    37 37
     from .sandbox import SandboxRemote
    
    ... ... @@ -317,11 +317,18 @@ class Context():
    317 317
         # invoked with as opposed to a junctioned subproject.
    
    318 318
         #
    
    319 319
         # Returns:
    
    320
    -    #    (list): The list of projects
    
    320
    +    #    (Project): The Project object
    
    321 321
         #
    
    322 322
         def get_toplevel_project(self):
    
    323 323
             return self._projects[0]
    
    324 324
     
    
    325
    +    # get_workspaces():
    
    326
    +    #
    
    327
    +    # Return a Workspaces object containing a list of workspaces.
    
    328
    +    #
    
    329
    +    # Returns:
    
    330
    +    #    (Workspaces): The Workspaces object
    
    331
    +    #
    
    325 332
         def get_workspaces(self):
    
    326 333
             return self._workspaces
    
    327 334
     
    

  • buildstream/_exceptions.py
    ... ... @@ -284,6 +284,21 @@ class CASError(BstError):
    284 284
             super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
    
    285 285
     
    
    286 286
     
    
    287
    +# CASRemoteError
    
    288
    +#
    
    289
    +# Raised when errors are encountered in the remote CAS
    
    290
    +class CASRemoteError(CASError):
    
    291
    +    pass
    
    292
    +
    
    293
    +
    
    294
    +# CASCacheError
    
    295
    +#
    
    296
    +# Raised when errors are encountered in the local CAS cache
    
    297
    +#
    
    298
    +class CASCacheError(CASError):
    
    299
    +    pass
    
    300
    +
    
    301
    +
    
    287 302
     # PipelineError
    
    288 303
     #
    
    289 304
     # Raised from pipeline operations
    

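    The two new subclasses let callers tell remote failures apart from local cache failures while still handling generic CAS errors in one place. A minimal sketch of the intended catch pattern, assuming BuildStream is importable; do_remote_fetch() is a hypothetical stand-in for a real CAS operation:

        from buildstream._exceptions import CASError, CASRemoteError, CASCacheError

        def do_remote_fetch():
            # Hypothetical operation standing in for a real remote CAS call
            raise CASRemoteError("remote CAS endpoint unavailable")

        try:
            do_remote_fetch()
        except CASRemoteError as e:
            print("remote CAS problem, possibly transient:", e)
        except CASCacheError as e:
            print("local CAS cache problem:", e)
        except CASError as e:
            print("other CAS error:", e)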
  • buildstream/_gitsourcebase.py
    ... ... @@ -296,18 +296,24 @@ class GitMirror(SourceFetcher):
    296 296
                 shallow = set()
    
    297 297
                 for _, commit_ref, _ in self.tags:
    
    298 298
     
    
    299
    -                _, out = self.source.check_output([self.source.host_git, 'rev-list',
    
    300
    -                                                   '--boundary', '{}..{}'.format(commit_ref, self.ref)],
    
    301
    -                                                  fail="Failed to get git history {}..{} in directory: {}"
    
    302
    -                                                  .format(commit_ref, self.ref, fullpath),
    
    303
    -                                                  fail_temporarily=True,
    
    304
    -                                                  cwd=self.mirror)
    
    305
    -                for line in out.splitlines():
    
    306
    -                    rev = line.lstrip('-')
    
    307
    -                    if line[0] == '-':
    
    308
    -                        shallow.add(rev)
    
    309
    -                    else:
    
    310
    -                        included.add(rev)
    
    299
    +                if commit_ref == self.ref:
    
    300
    +                    # rev-list does not work in case of same rev
    
    301
    +                    shallow.add(self.ref)
    
    302
    +                else:
    
    303
    +                    _, out = self.source.check_output([self.source.host_git, 'rev-list',
    
    304
    +                                                       '--ancestry-path', '--boundary',
    
    305
    +                                                       '{}..{}'.format(commit_ref, self.ref)],
    
    306
    +                                                      fail="Failed to get git history {}..{} in directory: {}"
    
    307
    +                                                      .format(commit_ref, self.ref, fullpath),
    
    308
    +                                                      fail_temporarily=True,
    
    309
    +                                                      cwd=self.mirror)
    
    310
    +                    self.source.warn("refs {}..{}: {}".format(commit_ref, self.ref, out.splitlines()))
    
    311
    +                    for line in out.splitlines():
    
    312
    +                        rev = line.lstrip('-')
    
    313
    +                        if line[0] == '-':
    
    314
    +                            shallow.add(rev)
    
    315
    +                        else:
    
    316
    +                            included.add(rev)
    
    311 317
     
    
    312 318
                 shallow -= included
    
    313 319
                 included |= shallow
    

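    The rewritten history walk special-cases commit_ref == self.ref and otherwise relies on git rev-list --boundary marking excluded boundary commits with a leading '-'. A small self-contained sketch of that parsing step, run against canned rev-list output rather than a real repository (the hashes are made up):

        # '--boundary' prefixes boundary commits with '-'; those are the points
        # at which history may be cut (shallow), everything else is included.
        sample_output = (
            "c3d94e1a\n"
            "9f2b7c40\n"
            "-51a0e377\n"
        )

        shallow = set()
        included = set()
        for line in sample_output.splitlines():
            rev = line.lstrip('-')
            if line[0] == '-':
                shallow.add(rev)
            else:
                included.add(rev)

        shallow -= included
        included |= shallow
        print(sorted(shallow), sorted(included))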
  • buildstream/_scheduler/jobs/cachesizejob.py
    ... ... @@ -34,8 +34,8 @@ class CacheSizeJob(Job):
    34 34
             if status == JobStatus.OK:
    
    35 35
                 self._artifacts.set_cache_size(result)
    
    36 36
     
    
    37
    -            if self._complete_cb:
    
    38
    -                self._complete_cb(result)
    
    37
    +        if self._complete_cb:
    
    38
    +            self._complete_cb(status, result)
    
    39 39
     
    
    40 40
         def child_process_data(self):
    
    41 41
             return {}

  • buildstream/_scheduler/jobs/cleanupjob.py
    ... ... @@ -20,8 +20,9 @@ from .job import Job, JobStatus
    20 20
     
    
    21 21
     
    
    22 22
     class CleanupJob(Job):
    
    23
    -    def __init__(self, *args, **kwargs):
    
    23
    +    def __init__(self, *args, complete_cb, **kwargs):
    
    24 24
             super().__init__(*args, **kwargs)
    
    25
    +        self._complete_cb = complete_cb
    
    25 26
     
    
    26 27
             context = self._scheduler.context
    
    27 28
             self._artifacts = context.artifactcache
    
    ... ... @@ -32,3 +33,6 @@ class CleanupJob(Job):
    32 33
         def parent_complete(self, status, result):
    
    33 34
             if status == JobStatus.OK:
    
    34 35
                 self._artifacts.set_cache_size(result)
    
    36
    +
    
    37
    +        if self._complete_cb:
    
    38
    +            self._complete_cb(status, result)

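    Both helper jobs now invoke their completion callback unconditionally and pass the job status through, so the owner can react to failures as well as successes. A minimal sketch of the revised callback contract, assuming a live scheduler instance as used elsewhere in this series; the callback body is illustrative:

        def on_cleanup_complete(status, cache_size):
            if status == JobStatus.OK:
                print("cleanup finished, cache size is now", cache_size)
            else:
                print("cleanup did not complete, status:", status)

        job = CleanupJob(scheduler, 'cleanup', 'cleanup/cleanup',
                         complete_cb=on_cleanup_complete)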
  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -85,28 +85,11 @@ class Process(multiprocessing.Process):
    85 85
     #    action_name (str): The queue action name
    
    86 86
     #    logfile (str): A template string that points to the logfile
    
    87 87
     #                   that should be used - should contain {pid}.
    
    88
    -#    resources (iter(ResourceType)) - A set of resources this job
    
    89
    -#                                     wants to use.
    
    90
    -#    exclusive_resources (iter(ResourceType)) - A set of resources
    
    91
    -#                                               this job wants to use
    
    92
    -#                                               exclusively.
    
    93 88
     #    max_retries (int): The maximum number of retries
    
    94 89
     #
    
    95 90
     class Job():
    
    96 91
     
    
    97
    -    def __init__(self, scheduler, action_name, logfile, *,
    
    98
    -                 resources=None, exclusive_resources=None, max_retries=0):
    
    99
    -
    
    100
    -        if resources is None:
    
    101
    -            resources = set()
    
    102
    -        else:
    
    103
    -            resources = set(resources)
    
    104
    -        if exclusive_resources is None:
    
    105
    -            exclusive_resources = set()
    
    106
    -        else:
    
    107
    -            exclusive_resources = set(resources)
    
    108
    -
    
    109
    -        assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
    
    92
    +    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
    
    110 93
     
    
    111 94
             #
    
    112 95
             # Public members
    
    ... ... @@ -114,12 +97,6 @@ class Job():
    114 97
             self.action_name = action_name   # The action name for the Queue
    
    115 98
             self.child_data = None           # Data to be sent to the main process
    
    116 99
     
    
    117
    -        # The resources this job wants to access
    
    118
    -        self.resources = resources
    
    119
    -        # Resources this job needs to access exclusively, i.e., no
    
    120
    -        # other job should be allowed to access them
    
    121
    -        self.exclusive_resources = exclusive_resources
    
    122
    -
    
    123 100
             #
    
    124 101
             # Private members
    
    125 102
             #
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -57,11 +57,10 @@ class BuildQueue(Queue):
    57 57
                               logfile=logfile)
    
    58 58
                 job = ElementJob(self._scheduler, self.action_name,
    
    59 59
                                  logfile, element=element, queue=self,
    
    60
    -                             resources=self.resources,
    
    61 60
                                  action_cb=self.process,
    
    62 61
                                  complete_cb=self._job_done,
    
    63 62
                                  max_retries=self._max_retries)
    
    64
    -            self._done_queue.append(job)
    
    63
    +            self._done_queue.append(element)
    
    65 64
                 self.failed_elements.append(element)
    
    66 65
                 self._scheduler._job_complete_callback(job, False)
    
    67 66
     
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -72,8 +72,9 @@ class Queue():
    72 72
             # Private members
    
    73 73
             #
    
    74 74
             self._scheduler = scheduler
    
    75
    -        self._wait_queue = deque()
    
    76
    -        self._done_queue = deque()
    
    75
    +        self._resources = scheduler.resources  # Shared resource pool
    
    76
    +        self._wait_queue = deque()             # Ready / Waiting elements
    
    77
    +        self._done_queue = deque()             # Processed / Skipped elements
    
    77 78
             self._max_retries = 0
    
    78 79
     
    
    79 80
             # Assert the subclass has setup class data
    
    ... ... @@ -115,16 +116,6 @@ class Queue():
    115 116
         def status(self, element):
    
    116 117
             return QueueStatus.READY
    
    117 118
     
    
    118
    -    # prepare()
    
    119
    -    #
    
    120
    -    # Abstract method for handling job preparation in the main process.
    
    121
    -    #
    
    122
    -    # Args:
    
    123
    -    #    element (Element): The element which is scheduled
    
    124
    -    #
    
    125
    -    def prepare(self, element):
    
    126
    -        pass
    
    127
    -
    
    128 119
         # done()
    
    129 120
         #
    
    130 121
         # Abstract method for handling a successful job completion.
    
    ... ... @@ -153,26 +144,18 @@ class Queue():
    153 144
             if not elts:
    
    154 145
                 return
    
    155 146
     
    
    156
    -        # Note: The internal lists work with jobs. This is not
    
    157
    -        #       reflected in any external methods (except
    
    158
    -        #       pop/peek_ready_jobs).
    
    159
    -        def create_job(element):
    
    160
    -            logfile = self._element_log_path(element)
    
    161
    -            return ElementJob(self._scheduler, self.action_name,
    
    162
    -                              logfile, element=element, queue=self,
    
    163
    -                              resources=self.resources,
    
    164
    -                              action_cb=self.process,
    
    165
    -                              complete_cb=self._job_done,
    
    166
    -                              max_retries=self._max_retries)
    
    167
    -
    
    168
    -        # Place skipped elements directly on the done queue
    
    169
    -        jobs = [create_job(elt) for elt in elts]
    
    170
    -        skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
    
    171
    -        wait = [job for job in jobs if job not in skip]
    
    172
    -
    
    173
    -        self.skipped_elements.extend([job.element for job in skip])
    
    174
    -        self._wait_queue.extend(wait)
    
    175
    -        self._done_queue.extend(skip)
    
    147
    +        # Place skipped elements on the done queue right away.
    
    148
    +        #
    
    149
    +        # The remaining ready and waiting elements must remain in the
    
    150
    +        # same queue, and ready status must be determined at the moment
    
    151
    +        # which the scheduler is asking for the next job.
    
    152
    +        #
    
    153
    +        skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
    
    154
    +        wait = [elt for elt in elts if elt not in skip]
    
    155
    +
    
    156
    +        self.skipped_elements.extend(skip)  # Public record of skipped elements
    
    157
    +        self._done_queue.extend(skip)       # Elements to be processed
    
    158
    +        self._wait_queue.extend(wait)       # Elements eligible to be dequeued
    
    176 159
     
    
    177 160
         # dequeue()
    
    178 161
         #
    
    ... ... @@ -184,69 +167,59 @@ class Queue():
    184 167
         #
    
    185 168
         def dequeue(self):
    
    186 169
             while self._done_queue:
    
    187
    -            yield self._done_queue.popleft().element
    
    170
    +            yield self._done_queue.popleft()
    
    188 171
     
    
    189 172
         # dequeue_ready()
    
    190 173
         #
    
    191
    -    # Reports whether there are any elements to dequeue
    
    174
    +    # Reports whether any elements can be promoted to other queues
    
    192 175
         #
    
    193 176
         # Returns:
    
    194
    -    #    (bool): Whether there are elements to dequeue
    
    177
    +    #    (bool): Whether there are elements ready
    
    195 178
         #
    
    196 179
         def dequeue_ready(self):
    
    197 180
             return any(self._done_queue)
    
    198 181
     
    
    199
    -    # pop_ready_jobs()
    
    200
    -    #
    
    201
    -    # Returns:
    
    202
    -    #     ([Job]): A list of jobs to run
    
    182
    +    # harvest_jobs()
    
    203 183
         #
    
    204 184
         # Process elements in the queue, moving elements which were enqueued
    
    205
    -    # into the dequeue pool, and processing them if necessary.
    
    206
    -    #
    
    207
    -    # This will have different results for elements depending
    
    208
    -    # on the Queue.status() implementation.
    
    209
    -    #
    
    210
    -    #   o Elements which are QueueStatus.WAIT will not be affected
    
    185
    +    # into the dequeue pool, and creating as many jobs for which resources
    
    186
    +    # can be reserved.
    
    211 187
         #
    
    212
    -    #   o Elements which are QueueStatus.SKIP will move directly
    
    213
    -    #     to the dequeue pool
    
    214
    -    #
    
    215
    -    #   o For Elements which are QueueStatus.READY a Job will be
    
    216
    -    #     created and returned to the caller, given that the scheduler
    
    217
    -    #     allows the Queue enough resources for the given job
    
    188
    +    # Returns:
    
    189
    +    #     ([Job]): A list of jobs which can be run now
    
    218 190
         #
    
    219
    -    def pop_ready_jobs(self):
    
    191
    +    def harvest_jobs(self):
    
    220 192
             unready = []
    
    221 193
             ready = []
    
    222 194
     
    
    223 195
             while self._wait_queue:
    
    224
    -            job = self._wait_queue.popleft()
    
    225
    -            element = job.element
    
    196
    +            if not self._resources.reserve(self.resources, peek=True):
    
    197
    +                break
    
    226 198
     
    
    199
    +            element = self._wait_queue.popleft()
    
    227 200
                 status = self.status(element)
    
    201
    +
    
    228 202
                 if status == QueueStatus.WAIT:
    
    229
    -                unready.append(job)
    
    230
    -                continue
    
    203
    +                unready.append(element)
    
    231 204
                 elif status == QueueStatus.SKIP:
    
    232
    -                self._done_queue.append(job)
    
    205
    +                self._done_queue.append(element)
    
    233 206
                     self.skipped_elements.append(element)
    
    234
    -                continue
    
    235
    -
    
    236
    -            self.prepare(element)
    
    237
    -            ready.append(job)
    
    207
    +            else:
    
    208
    +                reserved = self._resources.reserve(self.resources)
    
    209
    +                assert reserved
    
    210
    +                ready.append(element)
    
    238 211
     
    
    239
    -        # These were not ready but were in the beginning, give em
    
    240
    -        # first priority again next time around
    
    241 212
             self._wait_queue.extendleft(unready)
    
    242 213
     
    
    243
    -        return ready
    
    244
    -
    
    245
    -    def peek_ready_jobs(self):
    
    246
    -        def ready(job):
    
    247
    -            return self.status(job.element) == QueueStatus.READY
    
    248
    -
    
    249
    -        yield from (job for job in self._wait_queue if ready(job))
    
    214
    +        return [
    
    215
    +            ElementJob(self._scheduler, self.action_name,
    
    216
    +                       self._element_log_path(element),
    
    217
    +                       element=element, queue=self,
    
    218
    +                       action_cb=self.process,
    
    219
    +                       complete_cb=self._job_done,
    
    220
    +                       max_retries=self._max_retries)
    
    221
    +            for element in ready
    
    222
    +        ]
    
    250 223
     
    
    251 224
         #####################################################
    
    252 225
         #                 Private Methods                   #
    
    ... ... @@ -292,6 +265,10 @@ class Queue():
    292 265
         #
    
    293 266
         def _job_done(self, job, element, status, result):
    
    294 267
     
    
    268
    +        # Now release the resources we reserved
    
    269
    +        #
    
    270
    +        self._resources.release(self.resources)
    
    271
    +
    
    295 272
             # Update values that need to be synchronized in the main task
    
    296 273
             # before calling any queue implementation
    
    297 274
             self._update_workspaces(element, job)
    
    ... ... @@ -324,12 +301,8 @@ class Queue():
    324 301
                               detail=traceback.format_exc())
    
    325 302
                 self.failed_elements.append(element)
    
    326 303
             else:
    
    327
    -            #
    
    328
    -            # No exception occured in post processing
    
    329
    -            #
    
    330
    -
    
    331
    -            # All jobs get placed on the done queue for later processing.
    
    332
    -            self._done_queue.append(job)
    
    304
    +            # All elements get placed on the done queue for later processing.
    
    305
    +            self._done_queue.append(element)
    
    333 306
     
    
    334 307
                 # These lists are for bookkeeping purposes for the UI and logging.
    
    335 308
                 if status == JobStatus.SKIPPED:
    

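    The net effect of the queue rework is that readiness is evaluated lazily: harvest_jobs() peeks at the shared resource pool before dequeuing anything, re-checks each element's status at that moment, and only reserves resources for elements it will actually turn into jobs. A toy sketch of that control flow, using simplified stand-ins rather than the real Queue, Resources and ElementJob classes:

        from collections import deque

        class ToyResources:
            # Stands in for the shared scheduler.resources pool
            def __init__(self, slots):
                self.slots = slots

            def reserve(self, *, peek=False):
                if self.slots < 1:
                    return False
                if not peek:
                    self.slots -= 1
                return True

        def toy_status(element):
            # Pretend every third element is SKIP and elements 1 and 4 must WAIT
            if element % 3 == 0:
                return 'SKIP'
            if element in (1, 4):
                return 'WAIT'
            return 'READY'

        wait_queue = deque(range(7))
        resources = ToyResources(slots=2)
        skipped, ready, unready = [], [], []

        while wait_queue:
            if not resources.reserve(peek=True):
                break                       # no free resources, stop dequeuing
            element = wait_queue.popleft()
            status = toy_status(element)
            if status == 'WAIT':
                unready.append(element)
            elif status == 'SKIP':
                skipped.append(element)
            else:
                reserved = resources.reserve()
                assert reserved             # must succeed, we just peeked
                ready.append(element)

        wait_queue.extendleft(unready)      # deferred elements keep priority
        print("ready:", ready, "skipped:", skipped, "waiting:", list(wait_queue))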
  • buildstream/_scheduler/resources.py
    ... ... @@ -34,28 +34,25 @@ class Resources():
    34 34
                 ResourceType.UPLOAD: set()
    
    35 35
             }
    
    36 36
     
    
    37
    -    def clear_job_resources(self, job):
    
    38
    -        for resource in job.exclusive_resources:
    
    39
    -            self._exclusive_resources[resource].remove(hash(job))
    
    37
    +    # reserve()
    
    38
    +    #
    
    39
    +    # Reserves a set of resources
    
    40
    +    #
    
    41
    +    # Args:
    
    42
    +    #    resources (set): A set of ResourceTypes
    
    43
    +    #    exclusive (set): Another set of ResourceTypes
    
    44
    +    #    peek (bool): Whether to only peek at whether the resource is available
    
    45
    +    #
    
    46
    +    # Returns:
    
    47
    +    #    (bool): True if the resources could be reserved
    
    48
    +    #
    
    49
    +    def reserve(self, resources, exclusive=None, *, peek=False):
    
    50
    +        if exclusive is None:
    
    51
    +            exclusive = set()
    
    40 52
     
    
    41
    -        for resource in job.resources:
    
    42
    -            self._used_resources[resource] -= 1
    
    43
    -
    
    44
    -    def reserve_exclusive_resources(self, job):
    
    45
    -        exclusive = job.exclusive_resources
    
    46
    -
    
    47
    -        # The very first thing we do is to register any exclusive
    
    48
    -        # resources this job may want. Even if the job is not yet
    
    49
    -        # allowed to run (because another job is holding the resource
    
    50
    -        # it wants), we can still set this - it just means that any
    
    51
    -        # job *currently* using these resources has to finish first,
    
    52
    -        # and no new jobs wanting these can be launched (except other
    
    53
    -        # exclusive-access jobs).
    
    54
    -        #
    
    55
    -        for resource in exclusive:
    
    56
    -            self._exclusive_resources[resource].add(hash(job))
    
    53
    +        resources = set(resources)
    
    54
    +        exclusive = set(exclusive)
    
    57 55
     
    
    58
    -    def reserve_job_resources(self, job):
    
    59 56
             # First, we check if the job wants to access a resource that
    
    60 57
             # another job wants exclusive access to. If so, it cannot be
    
    61 58
             # scheduled.
    
    ... ... @@ -68,7 +65,8 @@ class Resources():
    68 65
             #        is currently not possible, but may be worth thinking
    
    69 66
             #        about.
    
    70 67
             #
    
    71
    -        for resource in job.resources - job.exclusive_resources:
    
    68
    +        for resource in resources - exclusive:
    
    69
    +
    
    72 70
                 # If our job wants this resource exclusively, we never
    
    73 71
                 # check this, so we can get away with not (temporarily)
    
    74 72
                 # removing it from the set.
    
    ... ... @@ -84,14 +82,14 @@ class Resources():
    84 82
             # at a time, despite being allowed to be part of the exclusive
    
    85 83
             # set.
    
    86 84
             #
    
    87
    -        for exclusive in job.exclusive_resources:
    
    88
    -            if self._used_resources[exclusive] != 0:
    
    85
    +        for resource in exclusive:
    
    86
    +            if self._used_resources[resource] != 0:
    
    89 87
                     return False
    
    90 88
     
    
    91 89
             # Finally, we check if we have enough of each resource
    
    92 90
             # available. If we don't have enough, the job cannot be
    
    93 91
             # scheduled.
    
    94
    -        for resource in job.resources:
    
    92
    +        for resource in resources:
    
    95 93
                 if (self._max_resources[resource] > 0 and
    
    96 94
                         self._used_resources[resource] >= self._max_resources[resource]):
    
    97 95
                     return False
    
    ... ... @@ -99,7 +97,70 @@ class Resources():
    99 97
             # Now we register the fact that our job is using the resources
    
    100 98
             # it asked for, and tell the scheduler that it is allowed to
    
    101 99
             # continue.
    
    102
    -        for resource in job.resources:
    
    103
    -            self._used_resources[resource] += 1
    
    100
    +        if not peek:
    
    101
    +            for resource in resources:
    
    102
    +                self._used_resources[resource] += 1
    
    104 103
     
    
    105 104
             return True
    
    105
    +
    
    106
    +    # release()
    
    107
    +    #
    
    108
    +    # Release resources previously reserved with Resources.reserve()
    
    109
    +    #
    
    110
    +    # Args:
    
    111
    +    #    resources (set): A set of resources to release
    
    112
    +    #
    
    113
    +    def release(self, resources):
    
    114
    +        for resource in resources:
    
    115
    +            assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
    
    116
    +            self._used_resources[resource] -= 1
    
    117
    +
    
    118
    +    # register_exclusive_interest()
    
    119
    +    #
    
    120
    +    # Inform the resources pool that `source` has an interest in
    
    121
    +    # reserving this resource exclusively.
    
    122
    +    #
    
    123
    +    # The source parameter is used to identify the caller, it
    
    124
    +    # must be ensured to be unique for the time that the
    
    125
    +    # interest is registered.
    
    126
    +    #
    
    127
    +    # This function may be called multiple times, and subsequent
    
    128
    +    # calls will simply have no effect until clear_exclusive_interest()
    
    129
    +    # is used to clear the interest.
    
    130
    +    #
    
    131
    +    # This must be called in advance of reserve()
    
    132
    +    #
    
    133
    +    # Args:
    
    134
    +    #    resources (set): Set of resources to reserve exclusively
    
    135
    +    #    source (any): Source identifier, to be used again when unregistering
    
    136
    +    #                  the interest.
    
    137
    +    #
    
    138
    +    def register_exclusive_interest(self, resources, source):
    
    139
    +
    
    140
    +        # The very first thing we do is to register any exclusive
    
    141
    +        # resources this job may want. Even if the job is not yet
    
    142
    +        # allowed to run (because another job is holding the resource
    
    143
    +        # it wants), we can still set this - it just means that any
    
    144
    +        # job *currently* using these resources has to finish first,
    
    145
    +        # and no new jobs wanting these can be launched (except other
    
    146
    +        # exclusive-access jobs).
    
    147
    +        #
    
    148
    +        for resource in resources:
    
    149
    +            self._exclusive_resources[resource].add(source)
    
    150
    +
    
    151
    +    # unregister_exclusive_interest()
    
    152
    +    #
    
    153
    +    # Clear the exclusive interest in these resources.
    
    154
    +    #
    
    155
    +    # This should be called by the given source which registered
    
    156
    +    # an exclusive interest.
    
    157
    +    #
    
    158
    +    # Args:
    
    159
    +    #    resources (set): Set of resources to reserve exclusively
    
    160
    +    #    source (str): Source identifier, to be used again when unregistering
    
    161
    +    #                  the interest.
    
    162
    +    #
    
    163
    +    def unregister_exclusive_interest(self, resources, source):
    
    164
    +
    
    165
    +        for resource in resources:
    
    166
    +            self._exclusive_resources[resource].remove(source)

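    The reworked Resources API splits the old per-job bookkeeping into three explicit steps: declare an exclusive interest up front, reserve (optionally just peeking), and release on completion. A minimal sketch of the expected call sequence for a cache cleanup, assuming BuildStream is importable; the pool sizes and run_cleanup() are illustrative only:

        from buildstream._scheduler.resources import Resources, ResourceType

        def run_cleanup():
            pass   # hypothetical stand-in for the actual cleanup work

        # Positional arguments mirror how the Scheduler constructs its pool:
        # (builders, fetchers, pushers)
        resources = Resources(4, 10, 4)

        # 1. Declare that 'cache-cleanup' wants CACHE exclusively; running jobs
        #    finish, but no new CACHE users are admitted in the meantime.
        resources.register_exclusive_interest([ResourceType.CACHE], 'cache-cleanup')

        # 2. Reserve CACHE and PROCESS, with CACHE held exclusively.
        if resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
                             [ResourceType.CACHE]):
            try:
                run_cleanup()
            finally:
                # 3. Always release what was reserved ...
                resources.release([ResourceType.CACHE, ResourceType.PROCESS])
                # 4. ... and drop the exclusive interest once it is no longer needed.
                resources.unregister_exclusive_interest([ResourceType.CACHE],
                                                        'cache-cleanup')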
  • buildstream/_scheduler/scheduler.py
    ... ... @@ -28,7 +28,7 @@ from contextlib import contextmanager
    28 28
     
    
    29 29
     # Local imports
    
    30 30
     from .resources import Resources, ResourceType
    
    31
    -from .jobs import CacheSizeJob, CleanupJob
    
    31
    +from .jobs import JobStatus, CacheSizeJob, CleanupJob
    
    32 32
     
    
    33 33
     
    
    34 34
     # A decent return code for Scheduler.run()
    
    ... ... @@ -38,14 +38,10 @@ class SchedStatus():
    38 38
         TERMINATED = 1
    
    39 39
     
    
    40 40
     
    
    41
    -# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
    
    42
    -# which we launch dynamically, they have the property of being
    
    43
    -# meaningless to queue if one is already queued, and it also
    
    44
    -# doesnt make sense to run them in parallel
    
    41
    +# Some action names for the internal jobs we launch
    
    45 42
     #
    
    46 43
     _ACTION_NAME_CLEANUP = 'cleanup'
    
    47 44
     _ACTION_NAME_CACHE_SIZE = 'cache_size'
    
    48
    -_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
    
    49 45
     
    
    50 46
     
    
    51 47
     # Scheduler()
    
    ... ... @@ -81,8 +77,6 @@ class Scheduler():
    81 77
             #
    
    82 78
             # Public members
    
    83 79
             #
    
    84
    -        self.active_jobs = []       # Jobs currently being run in the scheduler
    
    85
    -        self.waiting_jobs = []      # Jobs waiting for resources
    
    86 80
             self.queues = None          # Exposed for the frontend to print summaries
    
    87 81
             self.context = context      # The Context object shared with Queues
    
    88 82
             self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
    
    ... ... @@ -95,15 +89,23 @@ class Scheduler():
    95 89
             #
    
    96 90
             # Private members
    
    97 91
             #
    
    92
    +        self._active_jobs = []                # Jobs currently being run in the scheduler
    
    93
    +        self._starttime = start_time          # Initial application start time
    
    94
    +        self._suspendtime = None              # Session time compensation for suspended state
    
    95
    +        self._queue_jobs = True               # Whether we should continue to queue jobs
    
    96
    +
    
    97
    +        # State of cache management related jobs
    
    98
    +        self._cache_size_scheduled = False    # Whether we have a cache size job scheduled
    
    99
    +        self._cache_size_running = None       # A running CacheSizeJob, or None
    
    100
    +        self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
    
    101
    +        self._cleanup_running = None          # A running CleanupJob, or None
    
    102
    +
    
    103
    +        # Callbacks to report back to the Scheduler owner
    
    98 104
             self._interrupt_callback = interrupt_callback
    
    99 105
             self._ticker_callback = ticker_callback
    
    100 106
             self._job_start_callback = job_start_callback
    
    101 107
             self._job_complete_callback = job_complete_callback
    
    102 108
     
    
    103
    -        self._starttime = start_time
    
    104
    -        self._suspendtime = None
    
    105
    -        self._queue_jobs = True      # Whether we should continue to queue jobs
    
    106
    -
    
    107 109
             # Whether our exclusive jobs, like 'cleanup' are currently already
    
    108 110
             # waiting or active.
    
    109 111
             #
    
    ... ... @@ -113,9 +115,9 @@ class Scheduler():
    113 115
             self._exclusive_waiting = set()
    
    114 116
             self._exclusive_active = set()
    
    115 117
     
    
    116
    -        self._resources = Resources(context.sched_builders,
    
    117
    -                                    context.sched_fetchers,
    
    118
    -                                    context.sched_pushers)
    
    118
    +        self.resources = Resources(context.sched_builders,
    
    119
    +                                   context.sched_fetchers,
    
    120
    +                                   context.sched_pushers)
    
    119 121
     
    
    120 122
         # run()
    
    121 123
         #
    
    ... ... @@ -150,7 +152,7 @@ class Scheduler():
    150 152
             self._connect_signals()
    
    151 153
     
    
    152 154
             # Run the queues
    
    153
    -        self._schedule_queue_jobs()
    
    155
    +        self._sched()
    
    154 156
             self.loop.run_forever()
    
    155 157
             self.loop.close()
    
    156 158
     
    
    ... ... @@ -240,12 +242,14 @@ class Scheduler():
    240 242
         #    status (JobStatus): The status of the completed job
    
    241 243
         #
    
    242 244
         def job_completed(self, job, status):
    
    243
    -        self._resources.clear_job_resources(job)
    
    244
    -        self.active_jobs.remove(job)
    
    245
    -        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    246
    -            self._exclusive_active.remove(job.action_name)
    
    245
    +
    
    246
    +        # Remove from the active jobs list
    
    247
    +        self._active_jobs.remove(job)
    
    248
    +
    
    249
    +        # Scheduler owner facing callback
    
    247 250
             self._job_complete_callback(job, status)
    
    248
    -        self._schedule_queue_jobs()
    
    251
    +
    
    252
    +        # Now check for more jobs
    
    249 253
             self._sched()
    
    250 254
     
    
    251 255
         # check_cache_size():
    
    ... ... @@ -255,78 +259,104 @@ class Scheduler():
    255 259
         # if needed.
    
    256 260
         #
    
    257 261
         def check_cache_size(self):
    
    258
    -        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    
    259
    -                           'cache_size/cache_size',
    
    260
    -                           resources=[ResourceType.CACHE,
    
    261
    -                                      ResourceType.PROCESS],
    
    262
    -                           complete_cb=self._run_cleanup)
    
    263
    -        self._schedule_jobs([job])
    
    262
    +
    
    263
    +        # Here we assume we are called in response to a job
    
    264
    +        # completion callback, or before entering the scheduler.
    
    265
    +        #
    
    266
    +        # As such there is no need to call `_sched()` from here,
    
    267
    +        # and we prefer to run it once at the last moment.
    
    268
    +        #
    
    269
    +        self._cache_size_scheduled = True
    
    264 270
     
    
    265 271
         #######################################################
    
    266 272
         #                  Local Private Methods              #
    
    267 273
         #######################################################
    
    268 274
     
    
    269
    -    # _sched()
    
    275
    +    # _spawn_job()
    
    270 276
         #
    
    271
    -    # The main driving function of the scheduler, it will be called
    
    272
    -    # automatically when Scheduler.run() is called initially,
    
    277
    +    # Spawns a job
    
    273 278
         #
    
    274
    -    def _sched(self):
    
    275
    -        for job in self.waiting_jobs:
    
    276
    -            self._resources.reserve_exclusive_resources(job)
    
    279
    +    # Args:
    
    280
    +    #    job (Job): The job to spawn
    
    281
    +    #
    
    282
    +    def _spawn_job(self, job):
    
    283
    +        job.spawn()
    
    284
    +        self._active_jobs.append(job)
    
    285
    +        if self._job_start_callback:
    
    286
    +            self._job_start_callback(job)
    
    277 287
     
    
    278
    -        for job in self.waiting_jobs:
    
    279
    -            if not self._resources.reserve_job_resources(job):
    
    280
    -                continue
    
    288
    +    # Callback for the cache size job
    
    289
    +    def _cache_size_job_complete(self, status, cache_size):
    
    281 290
     
    
    282
    -            # Postpone these jobs if one is already running
    
    283
    -            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
    
    284
    -               job.action_name in self._exclusive_active:
    
    285
    -                continue
    
    291
    +        # Deallocate cache size job resources
    
    292
    +        self._cache_size_running = None
    
    293
    +        self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
    
    286 294
     
    
    287
    -            job.spawn()
    
    288
    -            self.waiting_jobs.remove(job)
    
    289
    -            self.active_jobs.append(job)
    
    295
    +        # Schedule a cleanup job if we've hit the threshold
    
    296
    +        if status != JobStatus.OK:
    
    297
    +            return
    
    290 298
     
    
    291
    -            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    292
    -                self._exclusive_waiting.remove(job.action_name)
    
    293
    -                self._exclusive_active.add(job.action_name)
    
    299
    +        context = self.context
    
    300
    +        artifacts = context.artifactcache
    
    294 301
     
    
    295
    -            if self._job_start_callback:
    
    296
    -                self._job_start_callback(job)
    
    302
    +        if artifacts.has_quota_exceeded():
    
    303
    +            self._cleanup_scheduled = True
    
    297 304
     
    
    298
    -        # If nothings ticking, time to bail out
    
    299
    -        if not self.active_jobs and not self.waiting_jobs:
    
    300
    -            self.loop.stop()
    
    305
    +    # Callback for the cleanup job
    
    306
    +    def _cleanup_job_complete(self, status, cache_size):
    
    301 307
     
    
    302
    -    # _schedule_jobs()
    
    303
    -    #
    
    304
    -    # The main entry point for jobs to be scheduled.
    
    305
    -    #
    
    306
    -    # This is called either as a result of scanning the queues
    
    307
    -    # in _schedule_queue_jobs(), or directly by the Scheduler
    
    308
    -    # to insert special jobs like cleanups.
    
    308
    +        # Deallocate cleanup job resources
    
    309
    +        self._cleanup_running = None
    
    310
    +        self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
    
    311
    +
    
    312
    +        # Unregister the exclusive interest when we're done with it
    
    313
    +        if not self._cleanup_scheduled:
    
    314
    +            self.resources.unregister_exclusive_interest(
    
    315
    +                [ResourceType.CACHE], 'cache-cleanup'
    
    316
    +            )
    
    317
    +
    
    318
    +    # _sched_cleanup_job()
    
    309 319
         #
    
    310
    -    # Args:
    
    311
    -    #     jobs ([Job]): A list of jobs to schedule
    
    320
    +    # Runs a cleanup job if one is scheduled to run now and
    
    321
    +    # sufficient resources are available.
    
    312 322
         #
    
    313
    -    def _schedule_jobs(self, jobs):
    
    314
    -        for job in jobs:
    
    323
    +    def _sched_cleanup_job(self):
    
    315 324
     
    
    316
    -            # Special treatment of our redundant exclusive jobs
    
    317
    -            #
    
    318
    -            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    325
    +        if self._cleanup_scheduled and self._cleanup_running is None:
    
    326
    +
    
    327
    +            # Ensure we have an exclusive interest in the resources
    
    328
    +            self.resources.register_exclusive_interest(
    
    329
    +                [ResourceType.CACHE], 'cache-cleanup'
    
    330
    +            )
    
    331
    +
    
    332
    +            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
    
    333
    +                                      [ResourceType.CACHE]):
    
    319 334
     
    
    320
    -                # Drop the job if one is already queued
    
    321
    -                if job.action_name in self._exclusive_waiting:
    
    322
    -                    continue
    
    335
    +                # Update state and launch
    
    336
    +                self._cleanup_scheduled = False
    
    337
    +                self._cleanup_running = \
    
    338
    +                    CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
    
    339
    +                               complete_cb=self._cleanup_job_complete)
    
    340
    +                self._spawn_job(self._cleanup_running)
    
    323 341
     
    
    324
    -                # Mark this action type as queued
    
    325
    -                self._exclusive_waiting.add(job.action_name)
    
    342
    +    # _sched_cache_size_job()
    
    343
    +    #
    
    344
    +    # Runs a cache size job if one is scheduled to run now and
    
    345
    +    # sufficient resources are available.
    
    346
    +    #
    
    347
    +    def _sched_cache_size_job(self):
    
    348
    +
    
    349
    +        if self._cache_size_scheduled and not self._cache_size_running:
    
    326 350
     
    
    327
    -            self.waiting_jobs.append(job)
    
    351
    +            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
    
    352
    +                self._cache_size_scheduled = False
    
    353
    +                self._cache_size_running = \
    
    354
    +                    CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    
    355
    +                                 'cache_size/cache_size',
    
    356
    +                                 complete_cb=self._cache_size_job_complete)
    
    357
    +                self._spawn_job(self._cache_size_running)
    
    328 358
     
    
    329
    -    # _schedule_queue_jobs()
    
    359
    +    # _sched_queue_jobs()
    
    330 360
         #
    
    331 361
         # Ask the queues what jobs they want to schedule and schedule
    
    332 362
         # them. This is done here so we can ask for new jobs when jobs
    
    ... ... @@ -335,7 +365,7 @@ class Scheduler():
    335 365
         # This will process the Queues, pull elements through the Queues
    
    336 366
         # and process anything that is ready.
    
    337 367
         #
    
    338
    -    def _schedule_queue_jobs(self):
    
    368
    +    def _sched_queue_jobs(self):
    
    339 369
             ready = []
    
    340 370
             process_queues = True
    
    341 371
     
    
    ... ... @@ -344,10 +374,7 @@ class Scheduler():
    344 374
                 # Pull elements forward through queues
    
    345 375
                 elements = []
    
    346 376
                 for queue in self.queues:
    
    347
    -                # Enqueue elements complete from the last queue
    
    348 377
                     queue.enqueue(elements)
    
    349
    -
    
    350
    -                # Dequeue processed elements for the next queue
    
    351 378
                     elements = list(queue.dequeue())
    
    352 379
     
    
    353 380
                 # Kickoff whatever processes can be processed at this time
    
    ... ... @@ -362,41 +389,51 @@ class Scheduler():
    362 389
                 # thus need all the pulls to complete before ever starting
    
    363 390
                 # a build
    
    364 391
                 ready.extend(chain.from_iterable(
    
    365
    -                queue.pop_ready_jobs() for queue in reversed(self.queues)
    
    392
    +                q.harvest_jobs() for q in reversed(self.queues)
    
    366 393
                 ))
    
    367 394
     
    
    368
    -            # pop_ready_jobs() may have skipped jobs, adding them to
    
    369
    -            # the done_queue.  Pull these skipped elements forward to
    
    370
    -            # the next queue and process them.
    
    395
    +            # harvest_jobs() may have decided to skip some jobs, making
    
    396
    +            # them eligible for promotion to the next queue as a side effect.
    
    397
    +            #
    
    398
    +            # If that happens, do another round.
    
    371 399
                 process_queues = any(q.dequeue_ready() for q in self.queues)
    
    372 400
     
    
    373
    -        self._schedule_jobs(ready)
    
    374
    -        self._sched()
    
    401
    +        # Spawn the jobs
    
    402
    +        #
    
    403
    +        for job in ready:
    
    404
    +            self._spawn_job(job)
    
    375 405
     
    
    376
    -    # _run_cleanup()
    
    377
    -    #
    
    378
    -    # Schedules the cache cleanup job if the passed size
    
    379
    -    # exceeds the cache quota.
    
    406
    +    # _sched()
    
    380 407
         #
    
    381
    -    # Args:
    
    382
    -    #    cache_size (int): The calculated cache size (ignored)
    
    408
    +    # Run any jobs which are ready to run, or quit the main loop
    
    409
    +    # when nothing is running or is ready to run.
    
    383 410
         #
    
    384
    -    # NOTE: This runs in response to completion of the cache size
    
    385
    -    #       calculation job lauched by Scheduler.check_cache_size(),
    
    386
    -    #       which will report the calculated cache size.
    
    411
    +    # This is the main driving function of the scheduler, it is called
    
    412
    +    # initially when we enter Scheduler.run(), and at the end of whenever
    
    413
    +    # any job completes, after any business logic has occurred and before
    
    414
    +    # going back to sleep.
    
    387 415
         #
    
    388
    -    def _run_cleanup(self, cache_size):
    
    389
    -        context = self.context
    
    390
    -        artifacts = context.artifactcache
    
    416
    +    def _sched(self):
    
    391 417
     
    
    392
    -        if not artifacts.has_quota_exceeded():
    
    393
    -            return
    
    418
    +        if not self.terminated:
    
    419
    +
    
    420
    +            #
    
    421
    +            # Try the cache management jobs
    
    422
    +            #
    
    423
    +            self._sched_cleanup_job()
    
    424
    +            self._sched_cache_size_job()
    
    425
    +
    
    426
    +            #
    
    427
    +            # Run as many jobs as the queues can handle for the
    
    428
    +            # available resources
    
    429
    +            #
    
    430
    +            self._sched_queue_jobs()
    
    394 431
     
    
    395
    -        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
    
    396
    -                         resources=[ResourceType.CACHE,
    
    397
    -                                    ResourceType.PROCESS],
    
    398
    -                         exclusive_resources=[ResourceType.CACHE])
    
    399
    -        self._schedule_jobs([job])
    
    432
    +        #
    
    433
    +        # If nothing is ticking then bail out
    
    434
    +        #
    
    435
    +        if not self._active_jobs:
    
    436
    +            self.loop.stop()
    
    400 437
     
    
    401 438
         # _suspend_jobs()
    
    402 439
         #
    
    ... ... @@ -406,7 +443,7 @@ class Scheduler():
    406 443
             if not self.suspended:
    
    407 444
                 self._suspendtime = datetime.datetime.now()
    
    408 445
                 self.suspended = True
    
    409
    -            for job in self.active_jobs:
    
    446
    +            for job in self._active_jobs:
    
    410 447
                     job.suspend()
    
    411 448
     
    
    412 449
         # _resume_jobs()
    
    ... ... @@ -415,7 +452,7 @@ class Scheduler():
    415 452
         #
    
    416 453
         def _resume_jobs(self):
    
    417 454
             if self.suspended:
    
    418
    -            for job in self.active_jobs:
    
    455
    +            for job in self._active_jobs:
    
    419 456
                     job.resume()
    
    420 457
                 self.suspended = False
    
    421 458
                 self._starttime += (datetime.datetime.now() - self._suspendtime)
    
    ... ... @@ -488,19 +525,16 @@ class Scheduler():
    488 525
             wait_limit = 20.0
    
    489 526
     
    
    490 527
             # First tell all jobs to terminate
    
    491
    -        for job in self.active_jobs:
    
    528
    +        for job in self._active_jobs:
    
    492 529
                 job.terminate()
    
    493 530
     
    
    494 531
             # Now wait for them to really terminate
    
    495
    -        for job in self.active_jobs:
    
    532
    +        for job in self._active_jobs:
    
    496 533
                 elapsed = datetime.datetime.now() - wait_start
    
    497 534
                 timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
    
    498 535
                 if not job.terminate_wait(timeout):
    
    499 536
                     job.kill()
    
    500 537
     
    
    501
    -        # Clear out the waiting jobs
    
    502
    -        self.waiting_jobs = []
    
    503
    -
    
    504 538
         # Regular timeout for driving status in the UI
    
    505 539
         def _tick(self):
    
    506 540
             elapsed = self.elapsed_time()
    

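    With the waiting-jobs list gone, check_cache_size() no longer spawns anything directly: it only records that a cache size job is wanted, and the next _sched() pass launches it once the resource pool allows. A toy illustration of that flag-then-launch pattern (plain Python, not the real Scheduler or Resources classes):

        class ToyScheduler:
            def __init__(self, free_slots):
                self.free_slots = free_slots        # stands in for the Resources pool
                self.cache_size_scheduled = False
                self.cache_size_running = False

            def check_cache_size(self):
                # Only record the request; _sched() decides when it can run
                self.cache_size_scheduled = True

            def _sched(self):
                if self.cache_size_scheduled and not self.cache_size_running:
                    if self.free_slots > 0:         # stands in for resources.reserve()
                        self.free_slots -= 1
                        self.cache_size_scheduled = False
                        self.cache_size_running = True
                        print("cache size job launched")

            def job_completed(self):
                # Some previously running job finished and released its slot
                self.free_slots += 1                # stands in for resources.release()
                self._sched()                       # look for more work, as in the diff

        sched = ToyScheduler(free_slots=0)          # every slot currently in use
        sched.check_cache_size()                    # request recorded, nothing launches
        sched._sched()                              # still blocked on resources
        sched.job_completed()                       # a slot frees up: the job launches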
  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -38,7 +38,7 @@ from .._protos.google.rpc import code_pb2
    38 38
     from .._exceptions import SandboxError
    
    39 39
     from .. import _yaml
    
    40 40
     from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    41
    -from .._artifactcache.cascache import CASRemote, CASRemoteSpec
    
    41
    +from .._cas import CASRemote, CASRemoteSpec
    
    42 42
     
    
    43 43
     
    
    44 44
     class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
    
    ... ... @@ -348,17 +348,17 @@ class SandboxRemote(Sandbox):
    348 348
                 except grpc.RpcError as e:
    
    349 349
                     raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    350 350
     
    
    351
    -            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    351
    +            if not casremote.verify_digest_on_remote(upload_vdir.ref):
    
    352 352
                     raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    353 353
     
    
    354 354
                 # Push command and action
    
    355 355
                 try:
    
    356
    -                cascache.push_message(casremote, command_proto)
    
    356
    +                casremote.push_message(command_proto)
    
    357 357
                 except grpc.RpcError as e:
    
    358 358
                     raise SandboxError("Failed to push command to remote: {}".format(e))
    
    359 359
     
    
    360 360
                 try:
    
    361
    -                cascache.push_message(casremote, action)
    
    361
    +                casremote.push_message(action)
    
    362 362
                 except grpc.RpcError as e:
    
    363 363
                     raise SandboxError("Failed to push action to remote: {}".format(e))
    
    364 364
     
    

  • conftest.py
    ... ... @@ -32,7 +32,7 @@ def pytest_addoption(parser):
    32 32
     
    
    33 33
     
    
    34 34
     def pytest_runtest_setup(item):
    
    35
    -    if item.get_marker('integration') and not item.config.getvalue('integration'):
    
    35
    +    if item.get_closest_marker('integration') and not item.config.getvalue('integration'):
    
    36 36
             pytest.skip('skipping integration test')
    
    37 37
     
    
    38 38
     
    

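    get_marker() was deprecated and later removed from pytest, and get_closest_marker() is its replacement (it also resolves markers inherited from an enclosing class or module). For reference, a test that the hook above would skip unless --integration is passed could look like this; the test name is illustrative:

        import pytest

        @pytest.mark.integration
        def test_full_pipeline():
            # Only runs when the session is invoked with --integration,
            # courtesy of the pytest_runtest_setup() hook shown above.
            assert True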
  • doc/source/using_configuring_artifact_server.rst
    ... ... @@ -94,7 +94,7 @@ requiring BuildStream's more exigent dependencies by setting the
    94 94
     Command reference
    
    95 95
     ~~~~~~~~~~~~~~~~~
    
    96 96
     
    
    97
    -.. click:: buildstream._artifactcache.casserver:server_main
    
    97
    +.. click:: buildstream._cas.casserver:server_main
    
    98 98
        :prog: bst-artifact-server
    
    99 99
     
    
    100 100
     
    

  • tests/artifactcache/config.py
    ... ... @@ -3,8 +3,7 @@ import pytest
    3 3
     import itertools
    
    4 4
     import os
    
    5 5
     
    
    6
    -from buildstream._artifactcache import ArtifactCacheSpec
    
    7
    -from buildstream._artifactcache.artifactcache import _configured_remote_artifact_cache_specs
    
    6
    +from buildstream._artifactcache import ArtifactCacheSpec, _configured_remote_artifact_cache_specs
    
    8 7
     from buildstream._context import Context
    
    9 8
     from buildstream._project import Project
    
    10 9
     from buildstream.utils import _deduplicate
    

  • tests/artifactcache/expiry.py
    ... ... @@ -342,13 +342,13 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
    342 342
             total_space = 10000
    
    343 343
     
    
    344 344
         volume_space_patch = mock.patch(
    
    345
    -        "buildstream._artifactcache.artifactcache.ArtifactCache._get_volume_space_info_for",
    
    345
    +        "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
    
    346 346
             autospec=True,
    
    347 347
             return_value=(free_space, total_space),
    
    348 348
         )
    
    349 349
     
    
    350 350
         cache_size_patch = mock.patch(
    
    351
    -        "buildstream._artifactcache.artifactcache.ArtifactCache.get_cache_size",
    
    351
    +        "buildstream._artifactcache.ArtifactCache.get_cache_size",
    
    352 352
             autospec=True,
    
    353 353
             return_value=0,
    
    354 354
         )
    

  • tests/frontend/order.py
    ... ... @@ -12,7 +12,21 @@ DATA_DIR = os.path.join(
    12 12
     )
    
    13 13
     
    
    14 14
     
    
    15
    -def create_element(repo, name, path, dependencies, ref=None):
    
    15
    +# create_element()
    
    16
    +#
    
    17
    +# Args:
    
    18
    +#    project (str): The project directory where testing is happening
    
    19
    +#    name (str): The element name to create
    
    20
    +#    dependencies (list): The list of dependencies to dump into YAML format
    
    21
    +#
    
    22
    +# Returns:
    
    23
    +#    (Repo): The corresponding git repository created for the element
    
    24
    +def create_element(project, name, dependencies):
    
    25
    +    dev_files_path = os.path.join(project, 'files', 'dev-files')
    
    26
    +    element_path = os.path.join(project, 'elements')
    
    27
    +    repo = create_repo('git', project, "{}-repo".format(name))
    
    28
    +    ref = repo.create(dev_files_path)
    
    29
    +
    
    16 30
         element = {
    
    17 31
             'kind': 'import',
    
    18 32
             'sources': [
    
    ... ... @@ -20,7 +34,9 @@ def create_element(repo, name, path, dependencies, ref=None):
    20 34
             ],
    
    21 35
             'depends': dependencies
    
    22 36
         }
    
    23
    -    _yaml.dump(element, os.path.join(path, name))
    
    37
    +    _yaml.dump(element, os.path.join(element_path, name))
    
    38
    +
    
    39
    +    return repo
    
    24 40
     
    
    25 41
     
    
    26 42
     # This tests a variety of scenarios and checks that the order in
    
    ... ... @@ -59,18 +75,6 @@ def create_element(repo, name, path, dependencies, ref=None):
    59 75
     @pytest.mark.parametrize("operation", [('show'), ('fetch'), ('build')])
    
    60 76
     def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
    
    61 77
         project = os.path.join(datafiles.dirname, datafiles.basename)
    
    62
    -    dev_files_path = os.path.join(project, 'files', 'dev-files')
    
    63
    -    element_path = os.path.join(project, 'elements')
    
    64
    -
    
    65
    -    # FIXME: Remove this when the test passes reliably.
    
    66
    -    #
    
    67
    -    #        There is no reason why the order should not
    
    68
    -    #        be preserved when the builders is set to 1,
    
    69
    -    #        the scheduler queue processing still seems to
    
    70
    -    #        be losing the order.
    
    71
    -    #
    
    72
    -    if operation == 'build':
    
    73
    -        pytest.skip("FIXME: This still only sometimes passes")
    
    74 78
     
    
    75 79
         # Configure to only allow one fetcher at a time, make it easy to
    
    76 80
         # determine what is being planned in what order.
    
    ... ... @@ -84,11 +88,8 @@ def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
    84 88
         # Build the project from the template, make import elements
    
    85 89
         # all with the same repo
    
    86 90
         #
    
    87
    -    repo = create_repo('git', str(tmpdir))
    
    88
    -    ref = repo.create(dev_files_path)
    
    89 91
         for element, dependencies in template.items():
    
    90
    -        create_element(repo, element, element_path, dependencies, ref=ref)
    
    91
    -        repo.add_commit()
    
    92
    +        create_element(project, element, dependencies)
    
    92 93
     
    
    93 94
         # Run test and collect results
    
    94 95
         if operation == 'show':
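
    A short usage sketch of the refactored helper above: create_element() now
    creates and commits its own repository per element, so the test body only
    needs a project directory, an element name and its dependencies (the
    element names below are illustrative):

        # Assuming `project` is the test project directory and create_element()
        # is the helper defined in the hunk above
        repo = create_element(project, 'base.bst', [])
        create_element(project, 'target.bst', ['base.bst'])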
    

  • tests/sandboxes/storage-tests.py
    ... ... @@ -3,7 +3,7 @@ import pytest
    3 3
     
    
    4 4
     from buildstream._exceptions import ErrorDomain
    
    5 5
     
    
    6
    -from buildstream._artifactcache.cascache import CASCache
    
    6
    +from buildstream._cas import CASCache
    
    7 7
     from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    8 8
     from buildstream.storage._filebaseddirectory import FileBasedDirectory
    
    9 9
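
    The CAS classes have moved out of the _artifactcache package into a
    dedicated buildstream._cas package, so the test imports change
    accordingly. The new locations, with the old path kept as a comment:

        # Before: from buildstream._artifactcache.cascache import CASCache, CASRemote, CASRemoteSpec
        from buildstream._cas import CASCache, CASRemote, CASRemoteSpec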
     
    

  • tests/sources/git.py
    ... ... @@ -883,6 +883,195 @@ def test_git_describe(cli, tmpdir, datafiles, ref_storage, tag_type):
    883 883
         assert p.returncode != 0
    
    884 884
     
    
    885 885
     
    
    886
    +@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
    
    887
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
    
    888
    +@pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
    
    889
    +@pytest.mark.parametrize("tag_type", [('annotated'), ('lightweight')])
    
    890
    +def test_git_describe_head_is_tagged(cli, tmpdir, datafiles, ref_storage, tag_type):
    
    891
    +    project = str(datafiles)
    
    892
    +
    
    893
    +    project_config = _yaml.load(os.path.join(project, 'project.conf'))
    
    894
    +    project_config['ref-storage'] = ref_storage
    
    895
    +    _yaml.dump(_yaml.node_sanitize(project_config), os.path.join(project, 'project.conf'))
    
    896
    +
    
    897
    +    repofiles = os.path.join(str(tmpdir), 'repofiles')
    
    898
    +    os.makedirs(repofiles, exist_ok=True)
    
    899
    +    file0 = os.path.join(repofiles, 'file0')
    
    900
    +    with open(file0, 'w') as f:
    
    901
    +        f.write('test\n')
    
    902
    +
    
    903
    +    repo = create_repo('git', str(tmpdir))
    
    904
    +
    
    905
    +    def tag(name):
    
    906
    +        if tag_type == 'annotated':
    
    907
    +            repo.add_annotated_tag(name, name)
    
    908
    +        else:
    
    909
    +            repo.add_tag(name)
    
    910
    +
    
    911
    +    ref = repo.create(repofiles)
    
    912
    +    tag('uselesstag')
    
    913
    +
    
    914
    +    file1 = os.path.join(str(tmpdir), 'file1')
    
    915
    +    with open(file1, 'w') as f:
    
    916
    +        f.write('test\n')
    
    917
    +    repo.add_file(file1)
    
    918
    +
    
    919
    +    file2 = os.path.join(str(tmpdir), 'file2')
    
    920
    +    with open(file2, 'w') as f:
    
    921
    +        f.write('test\n')
    
    922
    +    repo.branch('branch2')
    
    923
    +    repo.add_file(file2)
    
    924
    +
    
    925
    +    repo.checkout('master')
    
    926
    +    file3 = os.path.join(str(tmpdir), 'file3')
    
    927
    +    with open(file3, 'w') as f:
    
    928
    +        f.write('test\n')
    
    929
    +    repo.add_file(file3)
    
    930
    +
    
    931
    +    tagged_ref = repo.merge('branch2')
    
    932
    +    tag('tag')
    
    933
    +
    
    934
    +    config = repo.source_config()
    
    935
    +    config['track'] = repo.latest_commit()
    
    936
    +    config['track-tags'] = True
    
    937
    +
    
    938
    +    # Write out our test target
    
    939
    +    element = {
    
    940
    +        'kind': 'import',
    
    941
    +        'sources': [
    
    942
    +            config
    
    943
    +        ],
    
    944
    +    }
    
    945
    +    element_path = os.path.join(project, 'target.bst')
    
    946
    +    _yaml.dump(element, element_path)
    
    947
    +
    
    948
    +    if ref_storage == 'inline':
    
    949
    +        result = cli.run(project=project, args=['source', 'track', 'target.bst'])
    
    950
    +        result.assert_success()
    
    951
    +    else:
    
    952
    +        result = cli.run(project=project, args=['source', 'track', 'target.bst', '--deps', 'all'])
    
    953
    +        result.assert_success()
    
    954
    +
    
    955
    +    if ref_storage == 'inline':
    
    956
    +        element = _yaml.load(element_path)
    
    957
    +        tags = _yaml.node_sanitize(element['sources'][0]['tags'])
    
    958
    +        assert len(tags) == 1
    
    959
    +        for tag in tags:
    
    960
    +            assert 'tag' in tag
    
    961
    +            assert 'commit' in tag
    
    962
    +            assert 'annotated' in tag
    
    963
    +            assert tag['annotated'] == (tag_type == 'annotated')
    
    964
    +
    
    965
    +        assert set([(tag['tag'], tag['commit']) for tag in tags]) == set([('tag', repo.rev_parse('tag^{commit}'))])
    
    966
    +
    
    967
    +    checkout = os.path.join(str(tmpdir), 'checkout')
    
    968
    +
    
    969
    +    result = cli.run(project=project, args=['build', 'target.bst'])
    
    970
    +    result.assert_success()
    
    971
    +    result = cli.run(project=project, args=['checkout', 'target.bst', checkout])
    
    972
    +    result.assert_success()
    
    973
    +
    
    974
    +    if tag_type == 'annotated':
    
    975
    +        options = []
    
    976
    +    else:
    
    977
    +        options = ['--tags']
    
    978
    +    describe = subprocess.check_output(['git', 'describe'] + options,
    
    979
    +                                       cwd=checkout).decode('ascii')
    
    980
    +    assert describe.startswith('tag')
    
    981
    +
    
    982
    +    tags = subprocess.check_output(['git', 'tag'],
    
    983
    +                                   cwd=checkout).decode('ascii')
    
    984
    +    tags = set(tags.splitlines())
    
    985
    +    assert tags == set(['tag'])
    
    986
    +
    
    987
    +    rev_list = subprocess.check_output(['git', 'rev-list', '--all'],
    
    988
    +                                       cwd=checkout).decode('ascii')
    
    989
    +
    
    990
    +    assert set(rev_list.splitlines()) == set([tagged_ref])
    
    991
    +
    
    992
    +    p = subprocess.run(['git', 'log', repo.rev_parse('uselesstag')],
    
    993
    +                       cwd=checkout)
    
    994
    +    assert p.returncode != 0
    
    995
    +
    
    996
    +
    
    997
    +@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
    
    998
    +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
    
    999
    +def test_git_describe_relevant_history(cli, tmpdir, datafiles):
    
    1000
    +    project = str(datafiles)
    
    1001
    +
    
    1002
    +    project_config = _yaml.load(os.path.join(project, 'project.conf'))
    
    1003
    +    project_config['ref-storage'] = 'project.refs'
    
    1004
    +    _yaml.dump(_yaml.node_sanitize(project_config), os.path.join(project, 'project.conf'))
    
    1005
    +
    
    1006
    +    repofiles = os.path.join(str(tmpdir), 'repofiles')
    
    1007
    +    os.makedirs(repofiles, exist_ok=True)
    
    1008
    +    file0 = os.path.join(repofiles, 'file0')
    
    1009
    +    with open(file0, 'w') as f:
    
    1010
    +        f.write('test\n')
    
    1011
    +
    
    1012
    +    repo = create_repo('git', str(tmpdir))
    
    1013
    +    repo.create(repofiles)
    
    1014
    +
    
    1015
    +    file1 = os.path.join(str(tmpdir), 'file1')
    
    1016
    +    with open(file1, 'w') as f:
    
    1017
    +        f.write('test\n')
    
    1018
    +    repo.add_file(file1)
    
    1019
    +    repo.branch('branch')
    
    1020
    +    repo.checkout('master')
    
    1021
    +
    
    1022
    +    file2 = os.path.join(str(tmpdir), 'file2')
    
    1023
    +    with open(file2, 'w') as f:
    
    1024
    +        f.write('test\n')
    
    1025
    +    repo.add_file(file2)
    
    1026
    +
    
    1027
    +    file3 = os.path.join(str(tmpdir), 'file3')
    
    1028
    +    with open(file3, 'w') as f:
    
    1029
    +        f.write('test\n')
    
    1030
    +    branch_boundary = repo.add_file(file3)
    
    1031
    +
    
    1032
    +    repo.checkout('branch')
    
    1033
    +    file4 = os.path.join(str(tmpdir), 'file4')
    
    1034
    +    with open(file4, 'w') as f:
    
    1035
    +        f.write('test\n')
    
    1036
    +    tagged_ref = repo.add_file(file4)
    
    1037
    +    repo.add_annotated_tag('tag1', 'tag1')
    
    1038
    +
    
    1039
    +    head = repo.merge('master')
    
    1040
    +
    
    1041
    +    config = repo.source_config()
    
    1042
    +    config['track'] = head
    
    1043
    +    config['track-tags'] = True
    
    1044
    +
    
    1045
    +    # Write out our test target
    
    1046
    +    element = {
    
    1047
    +        'kind': 'import',
    
    1048
    +        'sources': [
    
    1049
    +            config
    
    1050
    +        ],
    
    1051
    +    }
    
    1052
    +    element_path = os.path.join(project, 'target.bst')
    
    1053
    +    _yaml.dump(element, element_path)
    
    1054
    +
    
    1055
    +    result = cli.run(project=project, args=['source', 'track', 'target.bst', '--deps', 'all'])
    
    1056
    +    result.assert_success()
    
    1057
    +
    
    1058
    +    checkout = os.path.join(str(tmpdir), 'checkout')
    
    1059
    +
    
    1060
    +    result = cli.run(project=project, args=['build', 'target.bst'])
    
    1061
    +    result.assert_success()
    
    1062
    +    result = cli.run(project=project, args=['checkout', 'target.bst', checkout])
    
    1063
    +    result.assert_success()
    
    1064
    +
    
    1065
    +    describe = subprocess.check_output(['git', 'describe'],
    
    1066
    +                                       cwd=checkout).decode('ascii')
    
    1067
    +    assert describe.startswith('tag1-2-')
    
    1068
    +
    
    1069
    +    rev_list = subprocess.check_output(['git', 'rev-list', '--all'],
    
    1070
    +                                       cwd=checkout).decode('ascii')
    
    1071
    +
    
    1072
    +    assert set(rev_list.splitlines()) == set([head, tagged_ref, branch_boundary])
    
    1073
    +
    
    1074
    +
    
    886 1075
     @pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
    
    887 1076
     @pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
    
    888 1077
     def test_default_do_not_track_tags(cli, tmpdir, datafiles):
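
    The new tests above check the resulting checkout with plain git commands.
    A standalone sketch of that verification pattern (not BuildStream-specific;
    the helper names are illustrative):

        import subprocess

        def describe(checkout, lightweight=False):
            # `git describe` only considers annotated tags unless --tags is given
            options = ['--tags'] if lightweight else []
            return subprocess.check_output(['git', 'describe'] + options,
                                           cwd=checkout).decode('ascii').strip()

        def reachable_commits(checkout):
            # All commits reachable from any ref in the checkout
            out = subprocess.check_output(['git', 'rev-list', '--all'],
                                          cwd=checkout).decode('ascii')
            return set(out.splitlines())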
    

  • tests/storage/virtual_directory_import.py
    ... ... @@ -8,7 +8,7 @@ from tests.testutils import cli
    8 8
     from buildstream.storage._casbaseddirectory import CasBasedDirectory
    
    9 9
     from buildstream.storage._filebaseddirectory import FileBasedDirectory
    
    10 10
     from buildstream._artifactcache import ArtifactCache
    
    11
    -from buildstream._artifactcache.cascache import CASCache
    
    11
    +from buildstream._cas import CASCache
    
    12 12
     from buildstream import utils
    
    13 13
     
    
    14 14
     
    

  • tests/testutils/artifactshare.py
    ... ... @@ -11,8 +11,8 @@ from multiprocessing import Process, Queue
    11 11
     import pytest_cov
    
    12 12
     
    
    13 13
     from buildstream import _yaml
    
    14
    -from buildstream._artifactcache.cascache import CASCache
    
    15
    -from buildstream._artifactcache.casserver import create_server
    
    14
    +from buildstream._cas import CASCache
    
    15
    +from buildstream._cas.casserver import create_server
    
    16 16
     from buildstream._exceptions import CASError
    
    17 17
     from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    18 18
     
    

  • tests/utils/misc.py
    ... ... @@ -23,7 +23,7 @@ def test_parse_size_over_1024T(cli, tmpdir):
    23 23
         _yaml.dump({'name': 'main'}, str(project.join("project.conf")))
    
    24 24
     
    
    25 25
         volume_space_patch = mock.patch(
    
    26
    -        "buildstream._artifactcache.artifactcache.ArtifactCache._get_volume_space_info_for",
    
    26
    +        "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
    
    27 27
             autospec=True,
    
    28 28
             return_value=(1025 * TiB, 1025 * TiB)
    
    29 29
         )
    


