[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] 2 commits: _cas: Rename artifactcache folder and move that to a root module



Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

13 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py → buildstream/_artifactcache.py

    @@ -23,14 +23,14 @@ import signal
     import string
     from collections.abc import Mapping

    -from ..types import _KeyStrength
    -from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    -from .._message import Message, MessageType
    -from .. import _signals
    -from .. import utils
    -from .. import _yaml
    -
    -from .cascache import CASRemote, CASRemoteSpec
    +from .types import _KeyStrength
    +from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    +from ._message import Message, MessageType
    +from . import _signals
    +from . import utils
    +from . import _yaml
    +
    +from ._cas.casremote import CASRemote, CASRemoteSpec


     CACHE_SIZE_FILE = "cache_size"
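
    Aside for reviewers: taken together, the moves in this push change
    import paths as follows (a summary assembled from the diffs in this
    mail, not an exhaustive list):

        from buildstream._artifactcache.cascache import CASCache
        #   becomes: from buildstream._cas.cascache import CASCache
        from buildstream._artifactcache.cascache import CASRemote, CASRemoteSpec
        #   becomes: from buildstream._cas.casremote import CASRemote, CASRemoteSpec
        from buildstream._artifactcache.casserver import create_server
        #   becomes: from buildstream._cas.casserver import create_server
        from buildstream._artifactcache.artifactcache import _configured_remote_artifact_cache_specs
        #   becomes: from buildstream._artifactcache import _configured_remote_artifact_cache_specs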
    

  • buildstream/_artifactcache/__init__.py deleted

    @@ -1,20 +0,0 @@
    -#
    -#  Copyright (C) 2017-2018 Codethink Limited
    -#
    -#  This program is free software; you can redistribute it and/or
    -#  modify it under the terms of the GNU Lesser General Public
    -#  License as published by the Free Software Foundation; either
    -#  version 2 of the License, or (at your option) any later version.
    -#
    -#  This library is distributed in the hope that it will be useful,
    -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    -#  Lesser General Public License for more details.
    -#
    -#  You should have received a copy of the GNU Lesser General Public
    -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    -#
    -#  Authors:
    -#        Tristan Van Berkom <tristan vanberkom codethink co uk>
    -
    -from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE

  • buildstream/_cas/__init__.py

  • buildstream/_artifactcache/cascache.py → buildstream/_cas/cascache.py

    @@ -17,7 +17,6 @@
     #  Authors:
     #        Jürg Billeter <juerg billeter codethink co uk>

    -from collections import namedtuple
     import hashlib
     import itertools
     import io

    @@ -26,74 +25,17 @@ import stat
     import tempfile
     import uuid
     import contextlib
    -from urllib.parse import urlparse

     import grpc

    -from .._protos.google.rpc import code_pb2
    -from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    -from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    +from .._protos.google.bytestream import bytestream_pb2
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    +from .._protos.buildstream.v2 import buildstream_pb2

     from .. import utils
    -from .._exceptions import CASError, LoadError, LoadErrorReason
    -from .. import _yaml
    +from .._exceptions import CASError

    -
    -# The default limit for gRPC messages is 4 MiB.
    -# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    -_MAX_PAYLOAD_BYTES = 1024 * 1024
    -
    -
    -class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
    -
    -    # _new_from_config_node
    -    #
    -    # Creates an CASRemoteSpec() from a YAML loaded node
    -    #
    -    @staticmethod
    -    def _new_from_config_node(spec_node, basedir=None):
    -        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
    -        url = _yaml.node_get(spec_node, str, 'url')
    -        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    -        if not url:
    -            provenance = _yaml.node_get_provenance(spec_node, 'url')
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}: empty artifact cache URL".format(provenance))
    -
    -        server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    -        if server_cert and basedir:
    -            server_cert = os.path.join(basedir, server_cert)
    -
    -        client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
    -        if client_key and basedir:
    -            client_key = os.path.join(basedir, client_key)
    -
    -        client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
    -        if client_cert and basedir:
    -            client_cert = os.path.join(basedir, client_cert)
    -
    -        if client_key and not client_cert:
    -            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
    -
    -        if client_cert and not client_key:
    -            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    -                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    -
    -        return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
    -
    -
    -CASRemoteSpec.__new__.__defaults__ = (None, None, None)
    -
    -
    -class BlobNotFound(CASError):
    -
    -    def __init__(self, blob, msg):
    -        self.blob = blob
    -        super().__init__(msg)
    +from .casremote import CASRemote, BlobNotFound, _CASBatchRead, _CASBatchUpdate, _MAX_PAYLOAD_BYTES


     # A CASCache manages a CAS repository as specified in the Remote Execution API.

    @@ -1137,183 +1079,6 @@ class CASCache():
             batch.send()


    -# Represents a single remote CAS cache.
    -#
    -class CASRemote():
    -    def __init__(self, spec):
    -        self.spec = spec
    -        self._initialized = False
    -        self.channel = None
    -        self.bytestream = None
    -        self.cas = None
    -        self.ref_storage = None
    -        self.batch_update_supported = None
    -        self.batch_read_supported = None
    -        self.capabilities = None
    -        self.max_batch_total_size_bytes = None
    -
    -    def init(self):
    -        if not self._initialized:
    -            url = urlparse(self.spec.url)
    -            if url.scheme == 'http':
    -                port = url.port or 80
    -                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
    -            elif url.scheme == 'https':
    -                port = url.port or 443
    -
    -                if self.spec.server_cert:
    -                    with open(self.spec.server_cert, 'rb') as f:
    -                        server_cert_bytes = f.read()
    -                else:
    -                    server_cert_bytes = None
    -
    -                if self.spec.client_key:
    -                    with open(self.spec.client_key, 'rb') as f:
    -                        client_key_bytes = f.read()
    -                else:
    -                    client_key_bytes = None
    -
    -                if self.spec.client_cert:
    -                    with open(self.spec.client_cert, 'rb') as f:
    -                        client_cert_bytes = f.read()
    -                else:
    -                    client_cert_bytes = None
    -
    -                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
    -                                                           private_key=client_key_bytes,
    -                                                           certificate_chain=client_cert_bytes)
    -                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    -            else:
    -                raise CASError("Unsupported URL: {}".format(self.spec.url))
    -
    -            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    -            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    -            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    -            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    -
    -            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    -            try:
    -                request = remote_execution_pb2.GetCapabilitiesRequest()
    -                response = self.capabilities.GetCapabilities(request)
    -                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    -                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    -                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    -            except grpc.RpcError as e:
    -                # Simply use the defaults for servers that don't implement GetCapabilities()
    -                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    -                    raise
    -
    -            # Check whether the server supports BatchReadBlobs()
    -            self.batch_read_supported = False
    -            try:
    -                request = remote_execution_pb2.BatchReadBlobsRequest()
    -                response = self.cas.BatchReadBlobs(request)
    -                self.batch_read_supported = True
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    -                    raise
    -
    -            # Check whether the server supports BatchUpdateBlobs()
    -            self.batch_update_supported = False
    -            try:
    -                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    -                response = self.cas.BatchUpdateBlobs(request)
    -                self.batch_update_supported = True
    -            except grpc.RpcError as e:
    -                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    -                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    -                    raise
    -
    -            self._initialized = True
    -
    -
    -# Represents a batch of blobs queued for fetching.
    -#
    -class _CASBatchRead():
    -    def __init__(self, remote):
    -        self._remote = remote
    -        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    -        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    -        self._size = 0
    -        self._sent = False
    -
    -    def add(self, digest):
    -        assert not self._sent
    -
    -        new_batch_size = self._size + digest.size_bytes
    -        if new_batch_size > self._max_total_size_bytes:
    -            # Not enough space left in current batch
    -            return False
    -
    -        request_digest = self._request.digests.add()
    -        request_digest.hash = digest.hash
    -        request_digest.size_bytes = digest.size_bytes
    -        self._size = new_batch_size
    -        return True
    -
    -    def send(self):
    -        assert not self._sent
    -        self._sent = True
    -
    -        if not self._request.digests:
    -            return
    -
    -        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    -
    -        for response in batch_response.responses:
    -            if response.status.code == code_pb2.NOT_FOUND:
    -                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    -                    response.digest.hash, response.status.code))
    -            if response.status.code != code_pb2.OK:
    -                raise CASError("Failed to download blob {}: {}".format(
    -                    response.digest.hash, response.status.code))
    -            if response.digest.size_bytes != len(response.data):
    -                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    -                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    -
    -            yield (response.digest, response.data)
    -
    -
    -# Represents a batch of blobs queued for upload.
    -#
    -class _CASBatchUpdate():
    -    def __init__(self, remote):
    -        self._remote = remote
    -        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    -        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    -        self._size = 0
    -        self._sent = False
    -
    -    def add(self, digest, stream):
    -        assert not self._sent
    -
    -        new_batch_size = self._size + digest.size_bytes
    -        if new_batch_size > self._max_total_size_bytes:
    -            # Not enough space left in current batch
    -            return False
    -
    -        blob_request = self._request.requests.add()
    -        blob_request.digest.hash = digest.hash
    -        blob_request.digest.size_bytes = digest.size_bytes
    -        blob_request.data = stream.read(digest.size_bytes)
    -        self._size = new_batch_size
    -        return True
    -
    -    def send(self):
    -        assert not self._sent
    -        self._sent = True
    -
    -        if not self._request.requests:
    -            return
    -
    -        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    -
    -        for response in batch_response.responses:
    -            if response.status.code != code_pb2.OK:
    -                raise CASError("Failed to upload blob {}: {}".format(
    -                    response.digest.hash, response.status.code))
    -
    -
     def _grouper(iterable, n):
         while True:
             try:
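
    The _CASBatchRead/_CASBatchUpdate helpers that cascache.py now imports
    follow an add-until-full protocol: add() returns False once the next
    blob would push the batch past max_batch_total_size_bytes, at which
    point the caller is expected to send() and start a fresh batch. A
    minimal sketch of that calling pattern (fetch_blobs is a hypothetical
    helper, not code from this commit, and assumes each blob fits in a
    single batch):

        def fetch_blobs(remote, digests):
            # Drain 'digests' through successive batches, yielding
            # (digest, data) pairs as _CASBatchRead.send() produces them.
            remote.init()
            batch = _CASBatchRead(remote)
            for digest in digests:
                if not batch.add(digest):
                    # Batch full: flush it and start a new one
                    yield from batch.send()
                    batch = _CASBatchRead(remote)
                    assert batch.add(digest)
            yield from batch.send()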
    

  • buildstream/_cas/casremote.py

    @@ -0,0 +1,245 @@
    +from collections import namedtuple
    +import os
    +from urllib.parse import urlparse
    +
    +import grpc
    +
    +from .. import _yaml
    +from .._protos.google.rpc import code_pb2
    +from .._protos.google.bytestream import bytestream_pb2_grpc
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from .._protos.buildstream.v2 import buildstream_pb2_grpc
    +
    +from .._exceptions import CASError, LoadError, LoadErrorReason
    +
    +# The default limit for gRPC messages is 4 MiB.
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    +
    +
    +class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
    +
    +    # _new_from_config_node
    +    #
    +    # Creates an CASRemoteSpec() from a YAML loaded node
    +    #
    +    @staticmethod
    +    def _new_from_config_node(spec_node, basedir=None):
    +        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
    +        url = _yaml.node_get(spec_node, str, 'url')
    +        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
    +        if not url:
    +            provenance = _yaml.node_get_provenance(spec_node, 'url')
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            "{}: empty artifact cache URL".format(provenance))
    +
    +        server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
    +        if server_cert and basedir:
    +            server_cert = os.path.join(basedir, server_cert)
    +
    +        client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
    +        if client_key and basedir:
    +            client_key = os.path.join(basedir, client_key)
    +
    +        client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
    +        if client_cert and basedir:
    +            client_cert = os.path.join(basedir, client_cert)
    +
    +        if client_key and not client_cert:
    +            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
    +
    +        if client_cert and not client_key:
    +            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    +                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
    +
    +        return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
    +
    +
    +CASRemoteSpec.__new__.__defaults__ = (None, None, None)
    +
    +
    +class BlobNotFound(CASError):
    +
    +    def __init__(self, blob, msg):
    +        self.blob = blob
    +        super().__init__(msg)
    +
    +
    +# Represents a single remote CAS cache.
    +#
    +class CASRemote():
    +    def __init__(self, spec):
    +        self.spec = spec
    +        self._initialized = False
    +        self.channel = None
    +        self.bytestream = None
    +        self.cas = None
    +        self.ref_storage = None
    +        self.batch_update_supported = None
    +        self.batch_read_supported = None
    +        self.capabilities = None
    +        self.max_batch_total_size_bytes = None
    +
    +    def init(self):
    +        if not self._initialized:
    +            url = urlparse(self.spec.url)
    +            if url.scheme == 'http':
    +                port = url.port or 80
    +                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
    +            elif url.scheme == 'https':
    +                port = url.port or 443
    +
    +                if self.spec.server_cert:
    +                    with open(self.spec.server_cert, 'rb') as f:
    +                        server_cert_bytes = f.read()
    +                else:
    +                    server_cert_bytes = None
    +
    +                if self.spec.client_key:
    +                    with open(self.spec.client_key, 'rb') as f:
    +                        client_key_bytes = f.read()
    +                else:
    +                    client_key_bytes = None
    +
    +                if self.spec.client_cert:
    +                    with open(self.spec.client_cert, 'rb') as f:
    +                        client_cert_bytes = f.read()
    +                else:
    +                    client_cert_bytes = None
    +
    +                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
    +                                                           private_key=client_key_bytes,
    +                                                           certificate_chain=client_cert_bytes)
    +                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
    +            else:
    +                raise CASError("Unsupported URL: {}".format(self.spec.url))
    +
    +            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    +            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    +            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    +
    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    +            try:
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    +                response = self.capabilities.GetCapabilities(request)
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    +            except grpc.RpcError as e:
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchReadBlobs()
    +            self.batch_read_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    +                response = self.cas.BatchReadBlobs(request)
    +                self.batch_read_supported = True
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchUpdateBlobs()
    +            self.batch_update_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +                response = self.cas.BatchUpdateBlobs(request)
    +                self.batch_update_supported = True
    +            except grpc.RpcError as e:
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    +                    raise
    +
    +            self._initialized = True
    +
    +
    +# Represents a batch of blobs queued for fetching.
    +#
    +class _CASBatchRead():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        request_digest = self._request.digests.add()
    +        request_digest.hash = digest.hash
    +        request_digest.size_bytes = digest.size_bytes
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if not self._request.digests:
    +            return
    +
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code == code_pb2.NOT_FOUND:
    +                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +            if response.status.code != code_pb2.OK:
    +                raise CASError("Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +            if response.digest.size_bytes != len(response.data):
    +                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    +
    +            yield (response.digest, response.data)
    +
    +
    +# Represents a batch of blobs queued for upload.
    +#
    +class _CASBatchUpdate():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest, stream):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        blob_request = self._request.requests.add()
    +        blob_request.digest.hash = digest.hash
    +        blob_request.digest.size_bytes = digest.size_bytes
    +        blob_request.data = stream.read(digest.size_bytes)
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if not self._request.requests:
    +            return
    +
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != code_pb2.OK:
    +                raise CASError("Failed to upload blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
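
    Since CASRemoteSpec._new_from_config_node() validates exactly the keys
    url, push, server-cert, client-key and client-cert, the YAML node it
    expects looks roughly like this (hostname and file names are made-up
    examples):

        url: https://cache.example.com:11002
        push: true
        server-cert: server.crt
        client-key: client.key
        client-cert: client.crt

    Certificate paths are joined onto basedir when one is passed, and
    client-key/client-cert must be given together or loading fails with
    LoadErrorReason.INVALID_DATA.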

  • buildstream/_artifactcache/casserver.py → buildstream/_cas/casserver.py

  • buildstream/_context.py
    @@ -31,7 +31,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
     from ._message import Message, MessageType
     from ._profile import Topics, profile_start, profile_end
     from ._artifactcache import ArtifactCache
    -from ._artifactcache.cascache import CASCache
    +from ._cas.cascache import CASCache
     from ._workspaces import Workspaces
     from .plugin import _plugin_lookup

    

  • buildstream/sandbox/_sandboxremote.py
    @@ -38,7 +38,7 @@ from .._protos.google.rpc import code_pb2
     from .._exceptions import SandboxError
     from .. import _yaml
     from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    -from .._artifactcache.cascache import CASRemote, CASRemoteSpec
    +from .._cas.casremote import CASRemote, CASRemoteSpec


     class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
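
    For consumers like this one, the intended construction pattern is to
    build a spec, wrap it in a CASRemote, and call init() before use; a
    small sketch (the URL is a made-up example):

        from buildstream._cas.casremote import CASRemote, CASRemoteSpec

        # server_cert, client_key and client_cert default to None
        # via CASRemoteSpec.__new__.__defaults__
        spec = CASRemoteSpec('https://cache.example.com:11002', push=True)
        remote = CASRemote(spec)
        remote.init()  # opens the gRPC channel and probes batch support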
    

  • doc/source/using_configuring_artifact_server.rst
    @@ -94,7 +94,7 @@ requiring BuildStream's more exigent dependencies by setting the
     Command reference
     ~~~~~~~~~~~~~~~~~

    -.. click:: buildstream._artifactcache.casserver:server_main
    +.. click:: buildstream._cas.casserver:server_main
        :prog: bst-artifact-server

     
    

  • tests/artifactcache/config.py
    @@ -3,8 +3,7 @@ import pytest
     import itertools
     import os

    -from buildstream._artifactcache import ArtifactCacheSpec
    -from buildstream._artifactcache.artifactcache import _configured_remote_artifact_cache_specs
    +from buildstream._artifactcache import ArtifactCacheSpec, _configured_remote_artifact_cache_specs
     from buildstream._context import Context
     from buildstream._project import Project
     from buildstream.utils import _deduplicate
    

  • tests/sandboxes/storage-tests.py
    @@ -3,7 +3,7 @@ import pytest

     from buildstream._exceptions import ErrorDomain

    -from buildstream._artifactcache.cascache import CASCache
    +from buildstream._cas.cascache import CASCache
     from buildstream.storage._casbaseddirectory import CasBasedDirectory
     from buildstream.storage._filebaseddirectory import FileBasedDirectory

     
    

  • tests/storage/virtual_directory_import.py
    @@ -8,7 +8,7 @@ from tests.testutils import cli
     from buildstream.storage._casbaseddirectory import CasBasedDirectory
     from buildstream.storage._filebaseddirectory import FileBasedDirectory
     from buildstream._artifactcache import ArtifactCache
    -from buildstream._artifactcache.cascache import CASCache
    +from buildstream._cas.cascache import CASCache
     from buildstream import utils

    
    14 14
     
    

  • tests/testutils/artifactshare.py
    @@ -11,8 +11,8 @@ from multiprocessing import Process, Queue
     import pytest_cov

     from buildstream import _yaml
    -from buildstream._artifactcache.cascache import CASCache
    -from buildstream._artifactcache.casserver import create_server
    +from buildstream._cas.cascache import CASCache
    +from buildstream._cas.casserver import create_server
     from buildstream._exceptions import CASError
     from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

     
    


