[Notes] [Git][BuildStream/buildstream][finn/refactor-remote-stubs] 3 commits: Refactored reference stub.



Title: GitLab

finn pushed to branch finn/refactor-remote-stubs at BuildStream / buildstream

Commits: (the 3 commits announced in the subject line are not listed in this archived copy)

1 changed file:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -249,7 +249,7 @@ class CASCache():
    249 249
                 remote.init()
    
    250 250
     
    
    251 251
                 request = buildstream_pb2.StatusRequest()
    
    252
    -            response = remote.ref_storage.Status(request)
    
    252
    +            response = remote._ref_storage_stub.Status(request)
    
    253 253
     
    
    254 254
                 if remote_spec.push and not response.allow_updates:
    
    255 255
                     q.put('CAS server does not allow push')
    
    ... ... @@ -284,9 +284,7 @@ class CASCache():
    284 284
             try:
    
    285 285
                 remote.init()
    
    286 286
     
    
    287
    -            request = buildstream_pb2.GetReferenceRequest()
    
    288
    -            request.key = ref
    
    289
    -            response = remote.ref_storage.GetReference(request)
    
    287
    +            response = remote.get_reference(ref)
    
    290 288
     
    
    291 289
                 tree = remote_execution_pb2.Digest()
    
    292 290
                 tree.hash = response.digest.hash
    
    ... ... @@ -369,9 +367,7 @@ class CASCache():
    369 367
                     # Check whether ref is already on the server in which case
    
    370 368
                     # there is no need to push the ref
    
    371 369
                     try:
    
    372
    -                    request = buildstream_pb2.GetReferenceRequest()
    
    373
    -                    request.key = ref
    
    374
    -                    response = remote.ref_storage.GetReference(request)
    
    370
    +                    response = remote.get_reference(ref)
    
    375 371
     
    
    376 372
                         if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    
    377 373
                             # ref is already on the server with the same tree
    
    ... ... @@ -384,11 +380,7 @@ class CASCache():
    384 380
     
    
    385 381
                     self._send_directory(remote, tree)
    
    386 382
     
    
    387
    -                request = buildstream_pb2.UpdateReferenceRequest()
    
    388
    -                request.keys.append(ref)
    
    389
    -                request.digest.hash = tree.hash
    
    390
    -                request.digest.size_bytes = tree.size_bytes
    
    391
    -                remote.ref_storage.UpdateReference(request)
    
    383
    +                remote.update_reference(ref, tree.hash, tree.size_bytes)
    
    392 384
     
    
    393 385
                     skipped_remote = False
    
    394 386
             except grpc.RpcError as e:
    
    ... ... @@ -909,10 +901,8 @@ class CASCache():
    909 901
     
    
    910 902
         def _fetch_blob(self, remote, digest, stream):
    
    911 903
             resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    
    912
    -        request = bytestream_pb2.ReadRequest()
    
    913
    -        request.resource_name = resource_name
    
    914
    -        request.read_offset = 0
    
    915
    -        for response in remote.bytestream.Read(request):
    
    904
    +        for response in remote.read(resource_name=resource_name, read_offset=0):
    
    905
    +            print(response)
    
    916 906
                 stream.write(response.data)
    
    917 907
             stream.flush()
    
    918 908
     
    
    ... ... @@ -1067,27 +1057,7 @@ class CASCache():
    1067 1057
             resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    
    1068 1058
                                       digest.hash, str(digest.size_bytes)])
    
    1069 1059
     
    
    1070
    -        def request_stream(resname, instream):
    
    1071
    -            offset = 0
    
    1072
    -            finished = False
    
    1073
    -            remaining = digest.size_bytes
    
    1074
    -            while not finished:
    
    1075
    -                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    1076
    -                remaining -= chunk_size
    
    1077
    -
    
    1078
    -                request = bytestream_pb2.WriteRequest()
    
    1079
    -                request.write_offset = offset
    
    1080
    -                # max. _MAX_PAYLOAD_BYTES chunks
    
    1081
    -                request.data = instream.read(chunk_size)
    
    1082
    -                request.resource_name = resname
    
    1083
    -                request.finish_write = remaining <= 0
    
    1084
    -
    
    1085
    -                yield request
    
    1086
    -
    
    1087
    -                offset += chunk_size
    
    1088
    -                finished = request.finish_write
    
    1089
    -
    
    1090
    -        response = remote.bytestream.Write(request_stream(resource_name, stream))
    
    1060
    +        response = remote.write(digest, stream, resource_name)
    
    1091 1061
     
    
    1092 1062
             assert response.committed_size == digest.size_bytes
    
    1093 1063
     
    
    ... ... @@ -1097,14 +1067,16 @@ class CASCache():
    1097 1067
             missing_blobs = dict()
    
    1098 1068
             # Limit size of FindMissingBlobs request
    
    1099 1069
             for required_blobs_group in _grouper(required_blobs, 512):
    
    1100
    -            request = remote_execution_pb2.FindMissingBlobsRequest()
    
    1101 1070
     
    
    1071
    +            digests = []
    
    1102 1072
                 for required_digest in required_blobs_group:
    
    1103
    -                d = request.blob_digests.add()
    
    1073
    +                # d = request.blob_digests.add()
    
    1074
    +                d = remote_execution_pb2.Digest()
    
    1104 1075
                     d.hash = required_digest.hash
    
    1105 1076
                     d.size_bytes = required_digest.size_bytes
    
    1077
    +                digests.append(d)
    
    1078
    +            response = remote.find_missing_blobs(digests)
    
    1106 1079
     
    
    1107
    -            response = remote.cas.FindMissingBlobs(request)
    
    1108 1080
                 for missing_digest in response.missing_blob_digests:
    
    1109 1081
                     d = remote_execution_pb2.Digest()
    
    1110 1082
                     d.hash = missing_digest.hash
    
    ... ... @@ -1144,14 +1116,15 @@ class CASRemote():
    1144 1116
             self.spec = spec
    
    1145 1117
             self._initialized = False
    
    1146 1118
             self.channel = None
    
    1147
    -        self.bytestream = None
    
    1148
    -        self.cas = None
    
    1149
    -        self.ref_storage = None
    
    1150 1119
             self.batch_update_supported = None
    
    1151 1120
             self.batch_read_supported = None
    
    1152
    -        self.capabilities = None
    
    1153 1121
             self.max_batch_total_size_bytes = None
    
    1154 1122
     
    
    1123
    +        self._bytestream_stub = None
    
    1124
    +        self._cas_stub = None
    
    1125
    +        self._capabilities_stub = None
    
    1126
    +        self._ref_storage_stub = None
    
    1127
    +
    
    1155 1128
         def init(self):
    
    1156 1129
             if not self._initialized:
    
    1157 1130
                 url = urlparse(self.spec.url)
    
    ... ... @@ -1186,15 +1159,15 @@ class CASRemote():
    1186 1159
                 else:
    
    1187 1160
                     raise CASError("Unsupported URL: {}".format(self.spec.url))
    
    1188 1161
     
    
    1189
    -            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    1190
    -            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    1191
    -            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    
    1192
    -            self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    
    1162
    +            self._bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    1163
    +            self._cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    1164
    +            self._capabilities_stub = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    
    1165
    +            self._ref_storage_stub = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    
    1193 1166
     
    
    1194 1167
                 self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    1195 1168
                 try:
    
    1196 1169
                     request = remote_execution_pb2.GetCapabilitiesRequest()
    
    1197
    -                response = self.capabilities.GetCapabilities(request)
    
    1170
    +                response = self._capabilities_stub.GetCapabilities(request)
    
    1198 1171
                     server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    
    1199 1172
                     if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    
    1200 1173
                         self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    
    ... ... @@ -1207,7 +1180,7 @@ class CASRemote():
    1207 1180
                 self.batch_read_supported = False
    
    1208 1181
                 try:
    
    1209 1182
                     request = remote_execution_pb2.BatchReadBlobsRequest()
    
    1210
    -                response = self.cas.BatchReadBlobs(request)
    
    1183
    +                response = self._cas_stub.BatchReadBlobs(request)
    
    1211 1184
                     self.batch_read_supported = True
    
    1212 1185
                 except grpc.RpcError as e:
    
    1213 1186
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    ... ... @@ -1217,7 +1190,7 @@ class CASRemote():
    1217 1190
                 self.batch_update_supported = False
    
    1218 1191
                 try:
    
    1219 1192
                     request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    1220
    -                response = self.cas.BatchUpdateBlobs(request)
    
    1193
    +                response = self._cas_stub.BatchUpdateBlobs(request)
    
    1221 1194
                     self.batch_update_supported = True
    
    1222 1195
                 except grpc.RpcError as e:
    
    1223 1196
                     if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    ... ... @@ -1226,6 +1199,68 @@ class CASRemote():
    1226 1199
     
    
    1227 1200
                 self._initialized = True
    
    1228 1201
     
    
    1202
    +    def write(self, digest, stream, resource_name):
    
    1203
    +
    
    1204
    +        def __request_stream(resname, instream):
    
    1205
    +            offset = 0
    
    1206
    +            finished = False
    
    1207
    +            remaining = digest.size_bytes
    
    1208
    +            while not finished:
    
    1209
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    1210
    +                remaining -= chunk_size
    
    1211
    +
    
    1212
    +                request = bytestream_pb2.WriteRequest()
    
    1213
    +                request.write_offset = offset
    
    1214
    +                # max. _MAX_PAYLOAD_BYTES chunks
    
    1215
    +                request.data = instream.read(chunk_size)
    
    1216
    +                request.resource_name = resname
    
    1217
    +                request.finish_write = remaining <= 0
    
    1218
    +
    
    1219
    +                yield request
    
    1220
    +
    
    1221
    +                offset += chunk_size
    
    1222
    +                finished = request.finish_write
    
    1223
    +
    
    1224
    +        response = self._bytestream_stub.Write(__request_stream(resource_name, stream))
    
    1225
    +        return response
    
    1226
    +
    
    1227
    +    def read(self, resource_name, read_offset):
    
    1228
    +        request = bytestream_pb2.ReadRequest()
    
    1229
    +        request.resource_name = resource_name
    
    1230
    +        request.read_offset = read_offset
    
    1231
    +        response = self._bytestream_stub.Read(request)
    
    1232
    +        return response
    
    1233
    +
    
    1234
    +    def batch_read_blobs(self, request):
    
    1235
    +        response = self._cas_stub.BatchReadBlobs(request)
    
    1236
    +        return response
    
    1237
    +
    
    1238
    +    def find_missing_blobs(self, digests):
    
    1239
    +        request = remote_execution_pb2.FindMissingBlobsRequest()
    
    1240
    +        request.blob_digests.extend(digests)
    
    1241
    +        response = self._cas_stub.FindMissingBlobs(request)
    
    1242
    +        return response
    
    1243
    +
    
    1244
    +    def batch_update_blobs(self, request):
    
    1245
    +        response = self._cas_stub.BatchUpdateBlobs(request)
    
    1246
    +        return response
    
    1247
    +
    
    1248
    +    def get_capabilities(self):
    
    1249
    +        request = remote_execution_pb2.GetCapabilitiesRequest()
    
    1250
    +        return self._capabilities_stub.GetCapabilities(request)
    
    1251
    +
    
    1252
    +    def get_reference(self, key):
    
    1253
    +        request = buildstream_pb2.GetReferenceRequest()
    
    1254
    +        request.key = key
    
    1255
    +        return self._ref_storage_stub.GetReference(request)
    
    1256
    +
    
    1257
    +    def update_reference(self, keys, digest_hash, digest_size_bytes):
    
    1258
    +        request = buildstream_pb2.UpdateReferenceRequest()
    
    1259
    +        request.keys.append(keys)
    
    1260
    +        request.digest.hash = digest_hash
    
    1261
    +        request.digest.size_bytes = digest_size_bytes
    
    1262
    +        return self._ref_storage_stub.UpdateReference(request)
    
    1263
    +
    
    1229 1264
     
    
    1230 1265
     # Represents a batch of blobs queued for fetching.
    
    1231 1266
     #
    
    ... ... @@ -1258,7 +1293,7 @@ class _CASBatchRead():
    1258 1293
             if not self._request.digests:
    
    1259 1294
                 return
    
    1260 1295
     
    
    1261
    -        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    
    1296
    +        batch_response = self._remote.batch_read_blobs(self._request)
    
    1262 1297
     
    
    1263 1298
             for response in batch_response.responses:
    
    1264 1299
                 if response.status.code == code_pb2.NOT_FOUND:
    
    ... ... @@ -1284,6 +1319,9 @@ class _CASBatchUpdate():
    1284 1319
             self._size = 0
    
    1285 1320
             self._sent = False
    
    1286 1321
     
    
    1322
    +        self._hash = None
    
    1323
    +        self._size_bytes = None
    
    1324
    +
    
    1287 1325
         def add(self, digest, stream):
    
    1288 1326
             assert not self._sent
    
    1289 1327
     
    
    ... ... @@ -1296,7 +1334,7 @@ class _CASBatchUpdate():
    1296 1334
             blob_request.digest.hash = digest.hash
    
    1297 1335
             blob_request.digest.size_bytes = digest.size_bytes
    
    1298 1336
             blob_request.data = stream.read(digest.size_bytes)
    
    1299
    -        self._size = new_batch_size
    
    1337
    +
    
    1300 1338
             return True
    
    1301 1339
     
    
    1302 1340
         def send(self):
    
    ... ... @@ -1306,7 +1344,7 @@ class _CASBatchUpdate():
    1306 1344
             if not self._request.requests:
    
    1307 1345
                 return
    
    1308 1346
     
    
    1309
    -        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
    
    1347
    +        batch_response = self._remote.batch_update_blobs(self._request)
    
    1310 1348
     
    
    1311 1349
             for response in batch_response.responses:
    
    1312 1350
                 if response.status.code != code_pb2.OK:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]