Jim MacArthur pushed to branch master at BuildStream / buildstream
Commits:
-
d2105909
by Raoul Hidalgo Charman at 2018-12-18T11:13:56Z
-
a3bbec23
by Jim MacArthur at 2018-12-18T11:13:56Z
-
89219f61
by Jim MacArthur at 2018-12-18T11:13:56Z
-
3dc20963
by Jim MacArthur at 2018-12-18T11:13:56Z
-
644d8b28
by Jim MacArthur at 2018-12-18T11:45:30Z
3 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/sandbox/_sandboxremote.py
- doc/source/format_project.rst
Changes:
... | ... | @@ -45,7 +45,7 @@ from .. import _yaml |
45 | 45 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
46 | 46 |
|
47 | 47 |
|
48 |
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
|
|
48 |
+class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
|
|
49 | 49 |
|
50 | 50 |
# _new_from_config_node
|
51 | 51 |
#
|
... | ... | @@ -53,7 +53,7 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
53 | 53 |
#
|
54 | 54 |
@staticmethod
|
55 | 55 |
def _new_from_config_node(spec_node, basedir=None):
|
56 |
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
|
|
56 |
+ _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
|
|
57 | 57 |
url = _yaml.node_get(spec_node, str, 'url')
|
58 | 58 |
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
|
59 | 59 |
if not url:
|
... | ... | @@ -61,6 +61,8 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
61 | 61 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
62 | 62 |
"{}: empty artifact cache URL".format(provenance))
|
63 | 63 |
|
64 |
+ instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
|
|
65 |
+ |
|
64 | 66 |
server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
|
65 | 67 |
if server_cert and basedir:
|
66 | 68 |
server_cert = os.path.join(basedir, server_cert)
|
... | ... | @@ -83,10 +85,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
83 | 85 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
84 | 86 |
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
|
85 | 87 |
|
86 |
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
|
|
88 |
+ return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
|
|
87 | 89 |
|
88 | 90 |
|
89 |
-CASRemoteSpec.__new__.__defaults__ = (None, None, None)
|
|
91 |
+CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
|
|
90 | 92 |
|
91 | 93 |
|
92 | 94 |
class BlobNotFound(CASError):
|
... | ... | @@ -248,7 +250,7 @@ class CASCache(): |
248 | 250 |
remote = CASRemote(remote_spec)
|
249 | 251 |
remote.init()
|
250 | 252 |
|
251 |
- request = buildstream_pb2.StatusRequest()
|
|
253 |
+ request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
|
|
252 | 254 |
response = remote.ref_storage.Status(request)
|
253 | 255 |
|
254 | 256 |
if remote_spec.push and not response.allow_updates:
|
... | ... | @@ -284,7 +286,7 @@ class CASCache(): |
284 | 286 |
try:
|
285 | 287 |
remote.init()
|
286 | 288 |
|
287 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
289 |
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
|
|
288 | 290 |
request.key = ref
|
289 | 291 |
response = remote.ref_storage.GetReference(request)
|
290 | 292 |
|
... | ... | @@ -369,7 +371,7 @@ class CASCache(): |
369 | 371 |
# Check whether ref is already on the server in which case
|
370 | 372 |
# there is no need to push the ref
|
371 | 373 |
try:
|
372 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
374 |
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
|
|
373 | 375 |
request.key = ref
|
374 | 376 |
response = remote.ref_storage.GetReference(request)
|
375 | 377 |
|
... | ... | @@ -384,7 +386,7 @@ class CASCache(): |
384 | 386 |
|
385 | 387 |
self._send_directory(remote, tree)
|
386 | 388 |
|
387 |
- request = buildstream_pb2.UpdateReferenceRequest()
|
|
389 |
+ request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
|
|
388 | 390 |
request.keys.append(ref)
|
389 | 391 |
request.digest.hash = tree.hash
|
390 | 392 |
request.digest.size_bytes = tree.size_bytes
|
... | ... | @@ -448,7 +450,7 @@ class CASCache(): |
448 | 450 |
def verify_digest_on_remote(self, remote, digest):
|
449 | 451 |
remote.init()
|
450 | 452 |
|
451 |
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
453 |
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
|
|
452 | 454 |
request.blob_digests.extend([digest])
|
453 | 455 |
|
454 | 456 |
response = remote.cas.FindMissingBlobs(request)
|
... | ... | @@ -908,7 +910,13 @@ class CASCache(): |
908 | 910 |
yield from self._required_blobs(dirnode.digest)
|
909 | 911 |
|
910 | 912 |
def _fetch_blob(self, remote, digest, stream):
|
911 |
- resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
|
913 |
+ resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
|
|
914 |
+ |
|
915 |
+ if remote.spec.instance_name:
|
|
916 |
+ resource_name_components.insert(0, remote.spec.instance_name)
|
|
917 |
+ |
|
918 |
+ resource_name = '/'.join(resource_name_components)
|
|
919 |
+ |
|
912 | 920 |
request = bytestream_pb2.ReadRequest()
|
913 | 921 |
request.resource_name = resource_name
|
914 | 922 |
request.read_offset = 0
|
... | ... | @@ -1064,8 +1072,13 @@ class CASCache(): |
1064 | 1072 |
return dirdigest
|
1065 | 1073 |
|
1066 | 1074 |
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
|
1067 |
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
|
1068 |
- digest.hash, str(digest.size_bytes)])
|
|
1075 |
+ resource_name_components = ['uploads', str(u_uid), 'blobs',
|
|
1076 |
+ digest.hash, str(digest.size_bytes)]
|
|
1077 |
+ |
|
1078 |
+ if remote.spec.instance_name:
|
|
1079 |
+ resource_name_components.insert(0, remote.spec.instance_name)
|
|
1080 |
+ |
|
1081 |
+ resource_name = '/'.join(resource_name_components)
|
|
1069 | 1082 |
|
1070 | 1083 |
def request_stream(resname, instream):
|
1071 | 1084 |
offset = 0
|
... | ... | @@ -1097,7 +1110,7 @@ class CASCache(): |
1097 | 1110 |
missing_blobs = dict()
|
1098 | 1111 |
# Limit size of FindMissingBlobs request
|
1099 | 1112 |
for required_blobs_group in _grouper(required_blobs, 512):
|
1100 |
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
1113 |
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
|
|
1101 | 1114 |
|
1102 | 1115 |
for required_digest in required_blobs_group:
|
1103 | 1116 |
d = request.blob_digests.add()
|
... | ... | @@ -1193,7 +1206,7 @@ class CASRemote(): |
1193 | 1206 |
|
1194 | 1207 |
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
1195 | 1208 |
try:
|
1196 |
- request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
1209 |
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
|
|
1197 | 1210 |
response = self.capabilities.GetCapabilities(request)
|
1198 | 1211 |
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
1199 | 1212 |
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
... | ... | @@ -1206,7 +1219,7 @@ class CASRemote(): |
1206 | 1219 |
# Check whether the server supports BatchReadBlobs()
|
1207 | 1220 |
self.batch_read_supported = False
|
1208 | 1221 |
try:
|
1209 |
- request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1222 |
+ request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
|
|
1210 | 1223 |
response = self.cas.BatchReadBlobs(request)
|
1211 | 1224 |
self.batch_read_supported = True
|
1212 | 1225 |
except grpc.RpcError as e:
|
... | ... | @@ -1216,7 +1229,7 @@ class CASRemote(): |
1216 | 1229 |
# Check whether the server supports BatchUpdateBlobs()
|
1217 | 1230 |
self.batch_update_supported = False
|
1218 | 1231 |
try:
|
1219 |
- request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1232 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
|
|
1220 | 1233 |
response = self.cas.BatchUpdateBlobs(request)
|
1221 | 1234 |
self.batch_update_supported = True
|
1222 | 1235 |
except grpc.RpcError as e:
|
... | ... | @@ -1233,7 +1246,7 @@ class _CASBatchRead(): |
1233 | 1246 |
def __init__(self, remote):
|
1234 | 1247 |
self._remote = remote
|
1235 | 1248 |
self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
1236 |
- self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
1249 |
+ self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
|
|
1237 | 1250 |
self._size = 0
|
1238 | 1251 |
self._sent = False
|
1239 | 1252 |
|
... | ... | @@ -1280,7 +1293,7 @@ class _CASBatchUpdate(): |
1280 | 1293 |
def __init__(self, remote):
|
1281 | 1294 |
self._remote = remote
|
1282 | 1295 |
self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
1283 |
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
1296 |
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
|
|
1284 | 1297 |
self._size = 0
|
1285 | 1298 |
self._sent = False
|
1286 | 1299 |
|
... | ... | @@ -61,15 +61,20 @@ class SandboxRemote(Sandbox): |
61 | 61 |
|
62 | 62 |
self.storage_url = config.storage_service['url']
|
63 | 63 |
self.exec_url = config.exec_service['url']
|
64 |
+ |
|
64 | 65 |
if config.action_service:
|
65 | 66 |
self.action_url = config.action_service['url']
|
66 | 67 |
else:
|
67 | 68 |
self.action_url = None
|
68 | 69 |
|
70 |
+ self.server_instance = config.exec_service.get('instance', None)
|
|
71 |
+ self.storage_instance = config.storage_service.get('instance', None)
|
|
72 |
+ |
|
69 | 73 |
self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
|
70 | 74 |
server_cert=config.storage_service['server-cert'],
|
71 | 75 |
client_key=config.storage_service['client-key'],
|
72 |
- client_cert=config.storage_service['client-cert'])
|
|
76 |
+ client_cert=config.storage_service['client-cert'],
|
|
77 |
+ instance_name=self.storage_instance)
|
|
73 | 78 |
self.operation_name = None
|
74 | 79 |
|
75 | 80 |
def info(self, msg):
|
... | ... | @@ -102,10 +107,10 @@ class SandboxRemote(Sandbox): |
102 | 107 |
['execution-service', 'storage-service', 'url', 'action-cache-service'])
|
103 | 108 |
remote_exec_service_config = require_node(remote_config, 'execution-service')
|
104 | 109 |
remote_exec_storage_config = require_node(remote_config, 'storage-service')
|
105 |
- remote_exec_action_config = remote_config.get('action-cache-service')
|
|
110 |
+ remote_exec_action_config = remote_config.get('action-cache-service', {})
|
|
106 | 111 |
|
107 |
- _yaml.node_validate(remote_exec_service_config, ['url'])
|
|
108 |
- _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
|
|
112 |
+ _yaml.node_validate(remote_exec_service_config, ['url', 'instance'])
|
|
113 |
+ _yaml.node_validate(remote_exec_storage_config, ['url', 'instance'] + tls_keys)
|
|
109 | 114 |
if remote_exec_action_config:
|
110 | 115 |
_yaml.node_validate(remote_exec_action_config, ['url'])
|
111 | 116 |
else:
|
... | ... | @@ -132,7 +137,7 @@ class SandboxRemote(Sandbox): |
132 | 137 |
|
133 | 138 |
spec = RemoteExecutionSpec(remote_config['execution-service'],
|
134 | 139 |
remote_config['storage-service'],
|
135 |
- remote_config['action-cache-service'])
|
|
140 |
+ remote_exec_action_config)
|
|
136 | 141 |
return spec
|
137 | 142 |
|
138 | 143 |
def run_remote_command(self, channel, action_digest):
|
... | ... | @@ -142,7 +147,8 @@ class SandboxRemote(Sandbox): |
142 | 147 |
|
143 | 148 |
# Try to create a communication channel to the BuildGrid server.
|
144 | 149 |
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
|
145 |
- request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
|
|
150 |
+ request = remote_execution_pb2.ExecuteRequest(instance_name=self.server_instance,
|
|
151 |
+ action_digest=action_digest,
|
|
146 | 152 |
skip_cache_lookup=False)
|
147 | 153 |
|
148 | 154 |
def __run_remote_command(stub, execute_request=None, running_operation=None):
|
... | ... | @@ -233,11 +233,13 @@ using the `remote-execution` option: |
233 | 233 |
# A url defining a remote execution server
|
234 | 234 |
execution-service:
|
235 | 235 |
url: http://buildserver.example.com:50051
|
236 |
+ instance-name: development-emea-1
|
|
236 | 237 |
storage-service:
|
237 |
- - url: https://foo.com:11002/
|
|
238 |
+ url: https://foo.com:11002/
|
|
238 | 239 |
server-cert: server.crt
|
239 | 240 |
client-cert: client.crt
|
240 | 241 |
client-key: client.key
|
242 |
+ instance-name: development-emea-1
|
|
241 | 243 |
action-cache-service:
|
242 | 244 |
url: http://bar.action.com:50052
|
243 | 245 |
|
... | ... | @@ -257,6 +259,13 @@ caching. Remote execution cannot work without push access to the |
257 | 259 |
storage endpoint, so you must specify a client certificate and key,
|
258 | 260 |
and a server certificate.
|
259 | 261 |
|
262 |
+Instance name is optional. Instance names separate different shards on
|
|
263 |
+the same endpoint (url). You can supply a different instance name for
|
|
264 |
+`execution-service` and `storage-service`, if needed. The instance
|
|
265 |
+name should be given to you by the service provider of each
|
|
266 |
+service. Not all remote execution and storage services support
|
|
267 |
+instance names.
|
|
268 |
+ |
|
260 | 269 |
The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
|
261 | 270 |
|
262 | 271 |
.. _project_essentials_mirrors:
|