... |
... |
@@ -45,7 +45,7 @@ from .. import _yaml |
45
|
45
|
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
46
|
46
|
|
47
|
47
|
|
48
|
|
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
|
|
48
|
+class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
|
49
|
49
|
|
50
|
50
|
# _new_from_config_node
|
51
|
51
|
#
|
... |
... |
@@ -53,7 +53,7 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
53
|
53
|
#
|
54
|
54
|
@staticmethod
|
55
|
55
|
def _new_from_config_node(spec_node, basedir=None):
|
56
|
|
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
|
|
56
|
+ _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
|
57
|
57
|
url = _yaml.node_get(spec_node, str, 'url')
|
58
|
58
|
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
|
59
|
59
|
if not url:
|
... |
... |
@@ -61,6 +61,8 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
61
|
61
|
raise LoadError(LoadErrorReason.INVALID_DATA,
|
62
|
62
|
"{}: empty artifact cache URL".format(provenance))
|
63
|
63
|
|
|
64
|
+ instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
|
|
65
|
+
|
64
|
66
|
server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
|
65
|
67
|
if server_cert and basedir:
|
66
|
68
|
server_cert = os.path.join(basedir, server_cert)
|
... |
... |
@@ -83,10 +85,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
83
|
85
|
raise LoadError(LoadErrorReason.INVALID_DATA,
|
84
|
86
|
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
|
85
|
87
|
|
86
|
|
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
|
|
88
|
+ return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
|
87
|
89
|
|
88
|
90
|
|
89
|
|
-CASRemoteSpec.__new__.__defaults__ = (None, None, None)
|
|
91
|
+CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
|
90
|
92
|
|
91
|
93
|
|
92
|
94
|
class BlobNotFound(CASError):
|
... |
... |
@@ -248,7 +250,7 @@ class CASCache(): |
248
|
250
|
remote = CASRemote(remote_spec)
|
249
|
251
|
remote.init()
|
250
|
252
|
|
251
|
|
- request = buildstream_pb2.StatusRequest()
|
|
253
|
+ request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
|
252
|
254
|
response = remote.ref_storage.Status(request)
|
253
|
255
|
|
254
|
256
|
if remote_spec.push and not response.allow_updates:
|
... |
... |
@@ -284,7 +286,7 @@ class CASCache(): |
284
|
286
|
try:
|
285
|
287
|
remote.init()
|
286
|
288
|
|
287
|
|
- request = buildstream_pb2.GetReferenceRequest()
|
|
289
|
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
|
288
|
290
|
request.key = ref
|
289
|
291
|
response = remote.ref_storage.GetReference(request)
|
290
|
292
|
|
... |
... |
@@ -369,7 +371,7 @@ class CASCache(): |
369
|
371
|
# Check whether ref is already on the server in which case
|
370
|
372
|
# there is no need to push the ref
|
371
|
373
|
try:
|
372
|
|
- request = buildstream_pb2.GetReferenceRequest()
|
|
374
|
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
|
373
|
375
|
request.key = ref
|
374
|
376
|
response = remote.ref_storage.GetReference(request)
|
375
|
377
|
|
... |
... |
@@ -384,7 +386,7 @@ class CASCache(): |
384
|
386
|
|
385
|
387
|
self._send_directory(remote, tree)
|
386
|
388
|
|
387
|
|
- request = buildstream_pb2.UpdateReferenceRequest()
|
|
389
|
+ request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
|
388
|
390
|
request.keys.append(ref)
|
389
|
391
|
request.digest.hash = tree.hash
|
390
|
392
|
request.digest.size_bytes = tree.size_bytes
|
... |
... |
@@ -448,7 +450,7 @@ class CASCache(): |
448
|
450
|
def verify_digest_on_remote(self, remote, digest):
|
449
|
451
|
remote.init()
|
450
|
452
|
|
451
|
|
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
453
|
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
|
452
|
454
|
request.blob_digests.extend([digest])
|
453
|
455
|
|
454
|
456
|
response = remote.cas.FindMissingBlobs(request)
|
... |
... |
@@ -908,7 +910,10 @@ class CASCache(): |
908
|
910
|
yield from self._required_blobs(dirnode.digest)
|
909
|
911
|
|
910
|
912
|
def _fetch_blob(self, remote, digest, stream):
|
911
|
|
- resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
|
913
|
+ resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
|
|
914
|
+ if remote.spec.instance_name:
|
|
915
|
+ resource_name_components.insert(0, remote.spec.instance_name)
|
|
916
|
+ resource_name = '/'.join(resource_name_components)
|
912
|
917
|
request = bytestream_pb2.ReadRequest()
|
913
|
918
|
request.resource_name = resource_name
|
914
|
919
|
request.read_offset = 0
|
... |
... |
@@ -1064,8 +1069,11 @@ class CASCache(): |
1064
|
1069
|
return dirdigest
|
1065
|
1070
|
|
1066
|
1071
|
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
|
1067
|
|
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
1068
|
|
- digest.hash, str(digest.size_bytes)])
|
|
1072
|
+ resource_name_components = ['uploads', str(u_uid), 'blobs',
|
|
1073
|
+ digest.hash, str(digest.size_bytes)]
|
|
1074
|
+ if remote.spec.instance_name:
|
|
1075
|
+ resource_name_components.insert(0, remote.spec.instance_name)
|
|
1076
|
+ resource_name = '/'.join(resource_name_components)
|
1069
|
1077
|
|
1070
|
1078
|
def request_stream(resname, instream):
|
1071
|
1079
|
offset = 0
|
... |
... |
@@ -1097,7 +1105,7 @@ class CASCache(): |
1097
|
1105
|
missing_blobs = dict()
|
1098
|
1106
|
# Limit size of FindMissingBlobs request
|
1099
|
1107
|
for required_blobs_group in _grouper(required_blobs, 512):
|
1100
|
|
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
1108
|
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
|
1101
|
1109
|
|
1102
|
1110
|
for required_digest in required_blobs_group:
|
1103
|
1111
|
d = request.blob_digests.add()
|
... |
... |
@@ -1193,7 +1201,7 @@ class CASRemote(): |
1193
|
1201
|
|
1194
|
1202
|
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
1195
|
1203
|
try:
|
1196
|
|
- request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
1204
|
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
|
1197
|
1205
|
response = self.capabilities.GetCapabilities(request)
|
1198
|
1206
|
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
1199
|
1207
|
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
... |
... |
@@ -1206,7 +1214,7 @@ class CASRemote(): |
1206
|
1214
|
# Check whether the server supports BatchReadBlobs()
|
1207
|
1215
|
self.batch_read_supported = False
|
1208
|
1216
|
try:
|
1209
|
|
- request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1217
|
+ request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
|
1210
|
1218
|
response = self.cas.BatchReadBlobs(request)
|
1211
|
1219
|
self.batch_read_supported = True
|
1212
|
1220
|
except grpc.RpcError as e:
|
... |
... |
@@ -1216,7 +1224,7 @@ class CASRemote(): |
1216
|
1224
|
# Check whether the server supports BatchUpdateBlobs()
|
1217
|
1225
|
self.batch_update_supported = False
|
1218
|
1226
|
try:
|
1219
|
|
- request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1227
|
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
|
1220
|
1228
|
response = self.cas.BatchUpdateBlobs(request)
|
1221
|
1229
|
self.batch_update_supported = True
|
1222
|
1230
|
except grpc.RpcError as e:
|
... |
... |
@@ -1233,7 +1241,7 @@ class _CASBatchRead(): |
1233
|
1241
|
def __init__(self, remote):
|
1234
|
1242
|
self._remote = remote
|
1235
|
1243
|
self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
1236
|
|
- self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1244
|
+ self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
|
1237
|
1245
|
self._size = 0
|
1238
|
1246
|
self._sent = False
|
1239
|
1247
|
|
... |
... |
@@ -1280,7 +1288,7 @@ class _CASBatchUpdate(): |
1280
|
1288
|
def __init__(self, remote):
|
1281
|
1289
|
self._remote = remote
|
1282
|
1290
|
self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
1283
|
|
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1291
|
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
|
1284
|
1292
|
self._size = 0
|
1285
|
1293
|
self._sent = False
|
1286
|
1294
|
|