[Notes] [Git][BuildStream/buildstream][raoul/627-RE-instance-config] 3 commits: Add remote execution instance option




Jim MacArthur pushed to branch raoul/627-RE-instance-config at BuildStream / buildstream

Commits:

2 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -45,7 +45,7 @@ from .. import _yaml
     _MAX_PAYLOAD_BYTES = 1024 * 1024
     
     
    -class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
    +class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
     
         # _new_from_config_node
         #
    @@ -53,7 +53,7 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
         #
         @staticmethod
         def _new_from_config_node(spec_node, basedir=None):
    -        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
    +        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
             url = _yaml.node_get(spec_node, str, 'url')
             push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
             if not url:
    @@ -61,6 +61,8 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
                 raise LoadError(LoadErrorReason.INVALID_DATA,
                                 "{}: empty artifact cache URL".format(provenance))
     
    +        instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
    +
             server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
             if server_cert and basedir:
                 server_cert = os.path.join(basedir, server_cert)
    @@ -83,10 +85,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
                 raise LoadError(LoadErrorReason.INVALID_DATA,
                                 "{}: 'client-cert' was specified without 'client-key'".format(provenance))
     
    -        return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
    +        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
     
     
    -CASRemoteSpec.__new__.__defaults__ = (None, None, None)
    +CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
     
     
     class BlobNotFound(CASError):
    @@ -248,7 +250,7 @@ class CASCache():
                 remote = CASRemote(remote_spec)
                 remote.init()
     
    -            request = buildstream_pb2.StatusRequest()
    +            request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
                 response = remote.ref_storage.Status(request)
     
                 if remote_spec.push and not response.allow_updates:
    @@ -284,7 +286,7 @@ class CASCache():
             try:
                 remote.init()
     
    -            request = buildstream_pb2.GetReferenceRequest()
    +            request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
                 request.key = ref
                 response = remote.ref_storage.GetReference(request)
     
    @@ -369,7 +371,7 @@ class CASCache():
                     # Check whether ref is already on the server in which case
                     # there is no need to push the ref
                     try:
    -                    request = buildstream_pb2.GetReferenceRequest()
    +                    request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
                         request.key = ref
                         response = remote.ref_storage.GetReference(request)
     
    @@ -384,7 +386,7 @@ class CASCache():
     
                     self._send_directory(remote, tree)
     
    -                request = buildstream_pb2.UpdateReferenceRequest()
    +                request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
                     request.keys.append(ref)
                     request.digest.hash = tree.hash
                     request.digest.size_bytes = tree.size_bytes
    @@ -448,7 +450,7 @@ class CASCache():
         def verify_digest_on_remote(self, remote, digest):
             remote.init()
     
    -        request = remote_execution_pb2.FindMissingBlobsRequest()
    +        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
             request.blob_digests.extend([digest])
     
             response = remote.cas.FindMissingBlobs(request)
    @@ -908,7 +910,10 @@ class CASCache():
                 yield from self._required_blobs(dirnode.digest)
     
         def _fetch_blob(self, remote, digest, stream):
    -        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +        resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
    +        if remote.spec.instance_name:
    +            resource_name_components.insert(0, remote.spec.instance_name)
    +        resource_name = '/'.join(resource_name_components)
             request = bytestream_pb2.ReadRequest()
             request.resource_name = resource_name
             request.read_offset = 0
    @@ -1064,8 +1069,11 @@ class CASCache():
             return dirdigest
     
         def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
    -        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
    -                                  digest.hash, str(digest.size_bytes)])
    +        resource_name_components = ['uploads', str(u_uid), 'blobs',
    +                                    digest.hash, str(digest.size_bytes)]
    +        if remote.spec.instance_name:
    +            resource_name_components.insert(0, remote.spec.instance_name)
    +        resource_name = '/'.join(resource_name_components)
     
             def request_stream(resname, instream):
                 offset = 0
    @@ -1097,7 +1105,7 @@ class CASCache():
             missing_blobs = dict()
             # Limit size of FindMissingBlobs request
             for required_blobs_group in _grouper(required_blobs, 512):
    -            request = remote_execution_pb2.FindMissingBlobsRequest()
    +            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
     
                 for required_digest in required_blobs_group:
                     d = request.blob_digests.add()
    @@ -1193,7 +1201,7 @@ class CASRemote():
     
                 self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
                 try:
    -                request = remote_execution_pb2.GetCapabilitiesRequest()
    +                request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
                     response = self.capabilities.GetCapabilities(request)
                     server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
                     if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    @@ -1206,7 +1214,7 @@ class CASRemote():
                 # Check whether the server supports BatchReadBlobs()
                 self.batch_read_supported = False
                 try:
    -                request = remote_execution_pb2.BatchReadBlobsRequest()
    +                request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
                     response = self.cas.BatchReadBlobs(request)
                     self.batch_read_supported = True
                 except grpc.RpcError as e:
    @@ -1216,7 +1224,7 @@ class CASRemote():
                 # Check whether the server supports BatchUpdateBlobs()
                 self.batch_update_supported = False
                 try:
    -                request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +                request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
                     response = self.cas.BatchUpdateBlobs(request)
                     self.batch_update_supported = True
                 except grpc.RpcError as e:
    @@ -1233,7 +1241,7 @@ class _CASBatchRead():
         def __init__(self, remote):
             self._remote = remote
             self._max_total_size_bytes = remote.max_batch_total_size_bytes
    -        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
             self._size = 0
             self._sent = False
     
    @@ -1280,7 +1288,7 @@ class _CASBatchUpdate():
         def __init__(self, remote):
             self._remote = remote
             self._max_total_size_bytes = remote.max_batch_total_size_bytes
    -        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
    +        self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
             self._size = 0
             self._sent = False

  • buildstream/sandbox/_sandboxremote.py
    @@ -61,15 +61,20 @@ class SandboxRemote(Sandbox):
     
             self.storage_url = config.storage_service['url']
             self.exec_url = config.exec_service['url']
    +
             if config.action_service:
                 self.action_url = config.action_service['url']
             else:
                 self.action_url = None
     
    +        self.server_instance = config.exec_service.get('instance', None)
    +        self.storage_instance = config.storage_service.get('instance', None)
    +
             self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
                                                      server_cert=config.storage_service['server-cert'],
                                                      client_key=config.storage_service['client-key'],
    -                                                 client_cert=config.storage_service['client-cert'])
    +                                                 client_cert=config.storage_service['client-cert'],
    +                                                 instance_name=self.storage_instance)
             self.operation_name = None
     
         def info(self, msg):
    @@ -104,7 +109,7 @@ class SandboxRemote(Sandbox):
             remote_exec_storage_config = require_node(remote_config, 'storage-service')
             remote_exec_action_config = remote_config.get('action-cache-service')
     
    -        _yaml.node_validate(remote_exec_service_config, ['url'])
    +        _yaml.node_validate(remote_exec_service_config, ['url', 'instance'])
             _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
             if remote_exec_action_config:
                 _yaml.node_validate(remote_exec_action_config, ['url'])
    @@ -142,7 +147,8 @@ class SandboxRemote(Sandbox):
     
             # Try to create a communication channel to the BuildGrid server.
             stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    -        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    +        request = remote_execution_pb2.ExecuteRequest(instance_name=self.server_instance,
    +                                                      action_digest=action_digest,
                                                           skip_cache_lookup=False)
     
             def __run_remote_command(stub, execute_request=None, running_operation=None):

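For the cascache.py change above, here is a minimal sketch of what a CAS remote spec using the new option might look like in user or project configuration, assuming the usual artifacts section handled by CASRemoteSpec._new_from_config_node (the URL and instance name are made up). Note that the new key is spelled instance_name with an underscore, matching the node_validate() list in the diff, unlike the hyphenated certificate keys:

    artifacts:
      url: https://cache.example.com:11001
      push: true
      server-cert: server.crt
      instance_name: main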

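For the _sandboxremote.py change, a similarly hedged sketch of a remote-execution section with the new instance key (URLs and the instance name are hypothetical). The storage-service instance is read with .get('instance', None) in the diff, but 'instance' is not yet listed in that service's node_validate() call shown above:

    remote-execution:
      execution-service:
        url: http://execution.example.com:50051
        instance: main
      storage-service:
        url: https://storage.example.com:11002
        server-cert: server.crt
        client-key: client.key
        client-cert: client.crt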

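The _fetch_blob() and _send_blob() hunks prefix ByteStream resource names with the instance name when one is configured. A standalone illustration of the resulting names, mirroring the read-side logic in the diff (the hash, size and instance name are made up):

    # Mirrors the resource-name construction in _fetch_blob(); illustration only.
    def read_resource_name(digest_hash, size_bytes, instance_name=None):
        components = ['blobs', digest_hash, str(size_bytes)]
        if instance_name:
            # A configured instance name simply becomes a leading path component.
            components.insert(0, instance_name)
        return '/'.join(components)

    assert read_resource_name('abc123', 42) == 'blobs/abc123/42'
    assert read_resource_name('abc123', 42, 'main') == 'main/blobs/abc123/42'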