... |
... |
@@ -249,7 +249,7 @@ class CASCache(): |
249
|
249
|
remote.init()
|
250
|
250
|
|
251
|
251
|
request = buildstream_pb2.StatusRequest()
|
252
|
|
- response = remote.ref_storage.Status(request)
|
|
252
|
+ response = remote._ref_storage_stub.Status(request)
|
253
|
253
|
|
254
|
254
|
if remote_spec.push and not response.allow_updates:
|
255
|
255
|
q.put('CAS server does not allow push')
|
... |
... |
@@ -284,9 +284,7 @@ class CASCache(): |
284
|
284
|
try:
|
285
|
285
|
remote.init()
|
286
|
286
|
|
287
|
|
- request = buildstream_pb2.GetReferenceRequest()
|
288
|
|
- request.key = ref
|
289
|
|
- response = remote.ref_storage.GetReference(request)
|
|
287
|
+ response = remote.get_reference(ref)
|
290
|
288
|
|
291
|
289
|
tree = remote_execution_pb2.Digest()
|
292
|
290
|
tree.hash = response.digest.hash
|
... |
... |
@@ -369,9 +367,7 @@ class CASCache(): |
369
|
367
|
# Check whether ref is already on the server in which case
|
370
|
368
|
# there is no need to push the ref
|
371
|
369
|
try:
|
372
|
|
- request = buildstream_pb2.GetReferenceRequest()
|
373
|
|
- request.key = ref
|
374
|
|
- response = remote.ref_storage.GetReference(request)
|
|
370
|
+ response = remote.get_reference(ref)
|
375
|
371
|
|
376
|
372
|
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
377
|
373
|
# ref is already on the server with the same tree
|
... |
... |
@@ -384,11 +380,7 @@ class CASCache(): |
384
|
380
|
|
385
|
381
|
self._send_directory(remote, tree)
|
386
|
382
|
|
387
|
|
- request = buildstream_pb2.UpdateReferenceRequest()
|
388
|
|
- request.keys.append(ref)
|
389
|
|
- request.digest.hash = tree.hash
|
390
|
|
- request.digest.size_bytes = tree.size_bytes
|
391
|
|
- remote.ref_storage.UpdateReference(request)
|
|
383
|
+ remote.update_reference(ref, tree.hash, tree.size_bytes)
|
392
|
384
|
|
393
|
385
|
skipped_remote = False
|
394
|
386
|
except grpc.RpcError as e:
|
... |
... |
@@ -909,10 +901,8 @@ class CASCache(): |
909
|
901
|
|
910
|
902
|
def _fetch_blob(self, remote, digest, stream):
|
911
|
903
|
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
912
|
|
- request = bytestream_pb2.ReadRequest()
|
913
|
|
- request.resource_name = resource_name
|
914
|
|
- request.read_offset = 0
|
915
|
|
- for response in remote.bytestream.Read(request):
|
|
904
|
+ for response in remote.read(resource_name=resource_name, read_offset=0):
|
|
905
|
+ print(response)
|
916
|
906
|
stream.write(response.data)
|
917
|
907
|
stream.flush()
|
918
|
908
|
|
... |
... |
@@ -1067,27 +1057,7 @@ class CASCache(): |
1067
|
1057
|
resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
1068
|
1058
|
digest.hash, str(digest.size_bytes)])
|
1069
|
1059
|
|
1070
|
|
- def request_stream(resname, instream):
|
1071
|
|
- offset = 0
|
1072
|
|
- finished = False
|
1073
|
|
- remaining = digest.size_bytes
|
1074
|
|
- while not finished:
|
1075
|
|
- chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
1076
|
|
- remaining -= chunk_size
|
1077
|
|
-
|
1078
|
|
- request = bytestream_pb2.WriteRequest()
|
1079
|
|
- request.write_offset = offset
|
1080
|
|
- # max. _MAX_PAYLOAD_BYTES chunks
|
1081
|
|
- request.data = instream.read(chunk_size)
|
1082
|
|
- request.resource_name = resname
|
1083
|
|
- request.finish_write = remaining <= 0
|
1084
|
|
-
|
1085
|
|
- yield request
|
1086
|
|
-
|
1087
|
|
- offset += chunk_size
|
1088
|
|
- finished = request.finish_write
|
1089
|
|
-
|
1090
|
|
- response = remote.bytestream.Write(request_stream(resource_name, stream))
|
|
1060
|
+ response = remote.write(digest, stream, resource_name)
|
1091
|
1061
|
|
1092
|
1062
|
assert response.committed_size == digest.size_bytes
|
1093
|
1063
|
|
... |
... |
@@ -1097,14 +1067,16 @@ class CASCache(): |
1097
|
1067
|
missing_blobs = dict()
|
1098
|
1068
|
# Limit size of FindMissingBlobs request
|
1099
|
1069
|
for required_blobs_group in _grouper(required_blobs, 512):
|
1100
|
|
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
1101
|
1070
|
|
|
1071
|
+ digests = []
|
1102
|
1072
|
for required_digest in required_blobs_group:
|
1103
|
|
- d = request.blob_digests.add()
|
|
1073
|
+ # d = request.blob_digests.add()
|
|
1074
|
+ d = remote_execution_pb2.Digest()
|
1104
|
1075
|
d.hash = required_digest.hash
|
1105
|
1076
|
d.size_bytes = required_digest.size_bytes
|
|
1077
|
+ digests.append(d)
|
|
1078
|
+ response = remote.find_missing_blobs(digests)
|
1106
|
1079
|
|
1107
|
|
- response = remote.cas.FindMissingBlobs(request)
|
1108
|
1080
|
for missing_digest in response.missing_blob_digests:
|
1109
|
1081
|
d = remote_execution_pb2.Digest()
|
1110
|
1082
|
d.hash = missing_digest.hash
|
... |
... |
@@ -1144,14 +1116,15 @@ class CASRemote(): |
1144
|
1116
|
self.spec = spec
|
1145
|
1117
|
self._initialized = False
|
1146
|
1118
|
self.channel = None
|
1147
|
|
- self.bytestream = None
|
1148
|
|
- self.cas = None
|
1149
|
|
- self.ref_storage = None
|
1150
|
1119
|
self.batch_update_supported = None
|
1151
|
1120
|
self.batch_read_supported = None
|
1152
|
|
- self.capabilities = None
|
1153
|
1121
|
self.max_batch_total_size_bytes = None
|
1154
|
1122
|
|
|
1123
|
+ self._bytestream_stub = None
|
|
1124
|
+ self._cas_stub = None
|
|
1125
|
+ self._capabilities_stub = None
|
|
1126
|
+ self._ref_storage_stub = None
|
|
1127
|
+
|
1155
|
1128
|
def init(self):
|
1156
|
1129
|
if not self._initialized:
|
1157
|
1130
|
url = urlparse(self.spec.url)
|
... |
... |
@@ -1186,15 +1159,15 @@ class CASRemote(): |
1186
|
1159
|
else:
|
1187
|
1160
|
raise CASError("Unsupported URL: {}".format(self.spec.url))
|
1188
|
1161
|
|
1189
|
|
- self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
1190
|
|
- self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
1191
|
|
- self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
1192
|
|
- self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
|
1162
|
+ self._bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
|
1163
|
+ self._cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
|
1164
|
+ self._capabilities_stub = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
|
1165
|
+ self._ref_storage_stub = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
1193
|
1166
|
|
1194
|
1167
|
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
1195
|
1168
|
try:
|
1196
|
1169
|
request = remote_execution_pb2.GetCapabilitiesRequest()
|
1197
|
|
- response = self.capabilities.GetCapabilities(request)
|
|
1170
|
+ response = self._capabilities_stub.GetCapabilities(request)
|
1198
|
1171
|
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
1199
|
1172
|
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
1200
|
1173
|
self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
|
... |
... |
@@ -1207,7 +1180,7 @@ class CASRemote(): |
1207
|
1180
|
self.batch_read_supported = False
|
1208
|
1181
|
try:
|
1209
|
1182
|
request = remote_execution_pb2.BatchReadBlobsRequest()
|
1210
|
|
- response = self.cas.BatchReadBlobs(request)
|
|
1183
|
+ response = self._cas_stub.BatchReadBlobs(request)
|
1211
|
1184
|
self.batch_read_supported = True
|
1212
|
1185
|
except grpc.RpcError as e:
|
1213
|
1186
|
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
... |
... |
@@ -1217,7 +1190,7 @@ class CASRemote(): |
1217
|
1190
|
self.batch_update_supported = False
|
1218
|
1191
|
try:
|
1219
|
1192
|
request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
1220
|
|
- response = self.cas.BatchUpdateBlobs(request)
|
|
1193
|
+ response = self._cas_stub.BatchUpdateBlobs(request)
|
1221
|
1194
|
self.batch_update_supported = True
|
1222
|
1195
|
except grpc.RpcError as e:
|
1223
|
1196
|
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
... |
... |
@@ -1226,6 +1199,68 @@ class CASRemote(): |
1226
|
1199
|
|
1227
|
1200
|
self._initialized = True
|
1228
|
1201
|
|
|
1202
|
def write(self, digest, stream, resource_name):
    """Upload the contents of a stream through the ByteStream stub.

    The data is sent as a sequence of WriteRequest messages, each carrying
    at most _MAX_PAYLOAD_BYTES bytes read from `stream`. Exactly
    `digest.size_bytes` bytes are requested from the stream in total, and
    the last message is flagged with `finish_write`.

    Args:
        digest: Object whose `size_bytes` gives the number of bytes to send.
        stream: Readable object; `stream.read(n)` supplies each chunk.
        resource_name: Resource name placed in every WriteRequest.

    Returns:
        The value returned by the ByteStream stub's Write call.
    """

    def _chunked_requests():
        written = 0
        left = digest.size_bytes
        while True:
            size = min(left, _MAX_PAYLOAD_BYTES)
            left -= size

            chunk = bytestream_pb2.WriteRequest()
            chunk.write_offset = written
            # Never more than _MAX_PAYLOAD_BYTES per message
            chunk.data = stream.read(size)
            chunk.resource_name = resource_name
            chunk.finish_write = left <= 0

            yield chunk

            written += size
            # A zero-length blob still produces one (empty, final) request,
            # because the exit test only runs after the first yield.
            if chunk.finish_write:
                return

    return self._bytestream_stub.Write(_chunked_requests())
|
|
1226
|
+
|
|
1227
|
def read(self, resource_name, read_offset):
    """Issue a ByteStream Read for a resource.

    Args:
        resource_name: Value for the request's `resource_name` field.
        read_offset: Value for the request's `read_offset` field.

    Returns:
        The value returned by the ByteStream stub's Read call, unmodified.
    """
    query = bytestream_pb2.ReadRequest(resource_name=resource_name,
                                       read_offset=read_offset)
    return self._bytestream_stub.Read(query)
|
|
1233
|
+
|
|
1234
|
def batch_read_blobs(self, request):
    """Forward a prepared request to the CAS stub's BatchReadBlobs call.

    Args:
        request: The request message, passed through unchanged.

    Returns:
        The value returned by the stub.
    """
    return self._cas_stub.BatchReadBlobs(request)
|
|
1237
|
+
|
|
1238
|
def find_missing_blobs(self, digests):
    """Ask the CAS stub which of the given digests it reports as missing.

    Args:
        digests: Iterable of digest messages to place in the request's
            `blob_digests` field.

    Returns:
        The value returned by the stub's FindMissingBlobs call.
    """
    query = remote_execution_pb2.FindMissingBlobsRequest()
    query.blob_digests.extend(digests)
    return self._cas_stub.FindMissingBlobs(query)
|
|
1243
|
+
|
|
1244
|
def batch_update_blobs(self, request):
    """Forward a prepared request to the CAS stub's BatchUpdateBlobs call.

    Args:
        request: The request message, passed through unchanged.

    Returns:
        The value returned by the stub.
    """
    return self._cas_stub.BatchUpdateBlobs(request)
|
|
1247
|
+
|
|
1248
|
def get_capabilities(self):
    """Send an empty GetCapabilitiesRequest through the capabilities stub.

    Returns:
        The value returned by the stub's GetCapabilities call.
    """
    return self._capabilities_stub.GetCapabilities(
        remote_execution_pb2.GetCapabilitiesRequest())
|
|
1251
|
+
|
|
1252
|
def get_reference(self, key):
    """Look up a reference by key through the reference storage stub.

    Args:
        key: Value for the request's `key` field.

    Returns:
        The value returned by the stub's GetReference call.
    """
    lookup = buildstream_pb2.GetReferenceRequest(key=key)
    return self._ref_storage_stub.GetReference(lookup)
|
|
1256
|
+
|
|
1257
|
def update_reference(self, keys, digest_hash, digest_size_bytes):
    """Associate one or more reference keys with a digest.

    Args:
        keys: Either a single key (str or bytes) or an iterable of keys.
            A single key behaves exactly as before; an iterable adds every
            element to the request's `keys` field.
        digest_hash: Value for the request digest's `hash` field.
        digest_size_bytes: Value for the request digest's `size_bytes` field.

    Returns:
        The value returned by the reference storage stub's UpdateReference
        call.
    """
    request = buildstream_pb2.UpdateReferenceRequest()
    if isinstance(keys, (str, bytes)):
        # A lone key must be appended as a unit; extending with it would
        # split it into individual characters.
        request.keys.append(keys)
    else:
        request.keys.extend(keys)
    request.digest.hash = digest_hash
    request.digest.size_bytes = digest_size_bytes
    return self._ref_storage_stub.UpdateReference(request)
|
|
1263
|
+
|
1229
|
1264
|
|
1230
|
1265
|
# Represents a batch of blobs queued for fetching.
|
1231
|
1266
|
#
|
... |
... |
@@ -1258,7 +1293,7 @@ class _CASBatchRead(): |
1258
|
1293
|
if not self._request.digests:
|
1259
|
1294
|
return
|
1260
|
1295
|
|
1261
|
|
- batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
|
1296
|
+ batch_response = self._remote.batch_read_blobs(self._request)
|
1262
|
1297
|
|
1263
|
1298
|
for response in batch_response.responses:
|
1264
|
1299
|
if response.status.code == code_pb2.NOT_FOUND:
|
... |
... |
@@ -1284,6 +1319,9 @@ class _CASBatchUpdate(): |
1284
|
1319
|
self._size = 0
|
1285
|
1320
|
self._sent = False
|
1286
|
1321
|
|
|
1322
|
+ self._hash = None
|
|
1323
|
+ self._size_bytes = None
|
|
1324
|
+
|
1287
|
1325
|
def add(self, digest, stream):
|
1288
|
1326
|
assert not self._sent
|
1289
|
1327
|
|
... |
... |
@@ -1296,7 +1334,7 @@ class _CASBatchUpdate(): |
1296
|
1334
|
blob_request.digest.hash = digest.hash
|
1297
|
1335
|
blob_request.digest.size_bytes = digest.size_bytes
|
1298
|
1336
|
blob_request.data = stream.read(digest.size_bytes)
|
1299
|
|
- self._size = new_batch_size
|
|
1337
|
+
|
1300
|
1338
|
return True
|
1301
|
1339
|
|
1302
|
1340
|
def send(self):
|
... |
... |
@@ -1306,7 +1344,7 @@ class _CASBatchUpdate(): |
1306
|
1344
|
if not self._request.requests:
|
1307
|
1345
|
return
|
1308
|
1346
|
|
1309
|
|
- batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
|
1347
|
+ batch_response = self._remote.batch_update_blobs(self._request)
|
1310
|
1348
|
|
1311
|
1349
|
for response in batch_response.responses:
|
1312
|
1350
|
if response.status.code != code_pb2.OK:
|