[Notes] [Git][BuildStream/buildstream][master] 3 commits: _artifactcache/casserver.py: Implement BatchUpdateBlobs




Jürg Billeter pushed to branch master at BuildStream / buildstream

Commits:

2 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
                     missing_blobs[d.hash] = d

             # Upload any blobs missing on the server
    -        for blob_digest in missing_blobs.values():
    -            with open(self.objpath(blob_digest), 'rb') as f:
    -                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
    -                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
    +        self._send_blobs(remote, missing_blobs.values(), u_uid)
    +
    +    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    +        batch = _CASBatchUpdate(remote)
    +
    +        for digest in digests:
    +            with open(self.objpath(digest), 'rb') as f:
    +                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +
    +                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                        not remote.batch_update_supported):
    +                    # Too large for batch request, upload in independent request.
    +                    self._send_blob(remote, digest, f, u_uid=u_uid)
    +                else:
    +                    if not batch.add(digest, f):
    +                        # Not enough space left in batch request.
    +                        # Complete pending batch first.
    +                        batch.send()
    +                        batch = _CASBatchUpdate(remote)
    +                        batch.add(digest, f)
    +
    +        # Send final batch
    +        batch.send()


     # Represents a single remote CAS cache.

    @@ -1126,6 +1145,17 @@ class _CASRemote():
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                         raise

    +            # Check whether the server supports BatchUpdateBlobs()
    +            self.batch_update_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +                response = self.cas.BatchUpdateBlobs(request)
    +                self.batch_update_supported = True
    +            except grpc.RpcError as e:
    +                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    +                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
    +                    raise
    +
                 self._initialized = True


    @@ -1173,6 +1203,46 @@ class _CASBatchRead():
                 yield (response.digest, response.data)


    +# Represents a batch of blobs queued for upload.
    +#
    +class _CASBatchUpdate():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest, stream):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        blob_request = self._request.requests.add()
    +        blob_request.digest.hash = digest.hash
    +        blob_request.digest.size_bytes = digest.size_bytes
    +        blob_request.data = stream.read(digest.size_bytes)
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.requests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to upload blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +
    +
     def _grouper(iterable, n):
         while True:
             try:
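
The client-side logic above is a greedy packing scheme: digests are added to the pending batch until the next blob would exceed max_batch_total_size_bytes, at which point the batch is flushed and a fresh one started, while blobs at or above the cap (or remotes without batch support) fall back to the single-blob _send_blob() path. As a minimal standalone sketch of that pattern (the upload_batch()/upload_single() callables and the 4 MiB cap are hypothetical stand-ins, not BuildStream API):

    MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024  # assumed cap, mirroring max_batch_total_size_bytes

    def send_blobs(blobs, upload_batch, upload_single):
        # blobs: iterable of (name, data) pairs; upload_batch()/upload_single()
        # stand in for BatchUpdateBlobs() and the ByteStream fallback.
        batch = []
        batch_size = 0

        for name, data in blobs:
            if len(data) >= MAX_BATCH_TOTAL_SIZE_BYTES:
                # Too large for any batch: upload in an independent request.
                upload_single(name, data)
                continue

            if batch_size + len(data) > MAX_BATCH_TOTAL_SIZE_BYTES:
                # Not enough space left: flush the pending batch, start a new one.
                upload_batch(batch)
                batch, batch_size = [], 0

            batch.append((name, data))
            batch_size += len(data)

        if batch:
            upload_batch(batch)  # send the final, possibly partial batch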
    

  • buildstream/_artifactcache/casserver.py
    @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
             _ByteStreamServicer(artifactcache, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    -        _ContentAddressableStorageServicer(artifactcache), server)
    +        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)

         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
             _CapabilitiesServicer(), server)

    @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


     class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    -    def __init__(self, cas):
    +    def __init__(self, cas, *, enable_push):
             super().__init__()
             self.cas = cas
    +        self.enable_push = enable_push

         def FindMissingBlobs(self, request, context):
             response = remote_execution_pb2.FindMissingBlobsResponse()

    @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

             return response

    +    def BatchUpdateBlobs(self, request, context):
    +        response = remote_execution_pb2.BatchUpdateBlobsResponse()
    +
    +        if not self.enable_push:
    +            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
    +            return response
    +
    +        batch_size = 0
    +
    +        for blob_request in request.requests:
    +            digest = blob_request.digest
    +
    +            batch_size += digest.size_bytes
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    +                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    +                return response
    +
    +            blob_response = response.responses.add()
    +            blob_response.digest.hash = digest.hash
    +            blob_response.digest.size_bytes = digest.size_bytes
    +
    +            if len(blob_request.data) != digest.size_bytes:
    +                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    +                continue
    +
    +            try:
    +                _clean_up_cache(self.cas, digest.size_bytes)
    +
    +                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
    +                    out.write(blob_request.data)
    +                    out.flush()
    +                    server_digest = self.cas.add_object(path=out.name)
    +                    if server_digest.hash != digest.hash:
    +                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
    +
    +            except ArtifactTooLargeException:
    +                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
    +
    +        return response
    +

     class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
         def GetCapabilities(self, request, context):
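
Note that BatchUpdateBlobs() reports failures per blob while the RPC itself still succeeds: a hash or size mismatch yields FAILED_PRECONDITION and a cache that cannot make room yields RESOURCE_EXHAUSTED, so callers must inspect each entry of the response. A rough sketch of how a client might drive this endpoint and collect the failed digests; the cas stub and blobs list are assumptions for illustration, not part of this commit:

    import grpc

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def batch_upload(cas, blobs):
        # cas: assumed ContentAddressableStorage stub (as in _CASRemote.cas);
        # blobs: assumed list of (digest, data) pairs.
        request = remote_execution_pb2.BatchUpdateBlobsRequest()
        for digest, data in blobs:
            blob_request = request.requests.add()
            blob_request.digest.CopyFrom(digest)
            blob_request.data = data

        response = cas.BatchUpdateBlobs(request)

        # The RPC succeeds even when individual blobs fail; each entry
        # carries its own google.rpc.Status code.
        return [r.digest.hash for r in response.responses
                if r.status.code != grpc.StatusCode.OK.value[0]]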
    


