... |
... |
@@ -26,6 +26,8 @@ from functools import partial |
26
|
26
|
|
27
|
27
|
import grpc
|
28
|
28
|
|
|
29
|
+from buildstream._message import Message, MessageType
|
|
30
|
+from buildstream import utils
|
29
|
31
|
from . import Sandbox, SandboxCommandError
|
30
|
32
|
from .sandbox import _SandboxBatch
|
31
|
33
|
from ..storage._filebaseddirectory import FileBasedDirectory
|
... |
... |
@@ -39,7 +41,7 @@ from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc |
39
|
41
|
from .._artifactcache.cascache import CASRemote, CASRemoteSpec
|
40
|
42
|
|
41
|
43
|
|
42
|
|
-class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
|
|
44
|
class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
    # Describes the services used for remote execution.
    #
    # Fields:
    #    exec_service: configuration of the execution service
    #    storage_service: configuration of the storage (CAS) service
    #    action_service: configuration of the action cache service, or
    #                    None when no action cache is configured
    pass


# The action cache is optional, so let the trailing field default to None.
# This keeps two-argument construction working. The namedtuple `defaults=`
# keyword is avoided because it needs Python 3.7.
RemoteExecutionSpec.__new__.__defaults__ = (None,)
|
44
|
46
|
|
45
|
47
|
|
... |
... |
@@ -59,6 +61,10 @@ class SandboxRemote(Sandbox): |
59
|
61
|
|
60
|
62
|
self.storage_url = config.storage_service['url']
|
61
|
63
|
self.exec_url = config.exec_service['url']
|
|
64
|
+ if config.action_service:
|
|
65
|
+ self.action_url = config.action_service['url']
|
|
66
|
+ else:
|
|
67
|
+ self.action_url = None
|
62
|
68
|
|
63
|
69
|
self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
|
64
|
70
|
server_cert=config.storage_service['server-cert'],
|
... |
... |
@@ -66,6 +72,9 @@ class SandboxRemote(Sandbox): |
66
|
72
|
client_cert=config.storage_service['client-cert'])
|
67
|
73
|
self.operation_name = None
|
68
|
74
|
|
|
75
|
def info(self, msg):
    # Emits `msg` as an INFO-level Message through this sandbox's context.
    context = self._get_context()
    message = Message(None, MessageType.INFO, msg)
    context.message(message)
|
|
77
|
+
|
69
|
78
|
@staticmethod
|
70
|
79
|
def specs_from_config_node(config_node, basedir):
|
71
|
80
|
|
... |
... |
@@ -88,12 +97,19 @@ class SandboxRemote(Sandbox): |
88
|
97
|
|
89
|
98
|
tls_keys = ['client-key', 'client-cert', 'server-cert']
|
90
|
99
|
|
91
|
|
- _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url'])
|
|
100
|
+ _yaml.node_validate(
|
|
101
|
+ remote_config,
|
|
102
|
+ ['execution-service', 'storage-service', 'url', 'action-cache-service'])
|
92
|
103
|
remote_exec_service_config = require_node(remote_config, 'execution-service')
|
93
|
104
|
remote_exec_storage_config = require_node(remote_config, 'storage-service')
|
|
105
|
+ remote_exec_action_config = remote_config.get('action-cache-service')
|
94
|
106
|
|
95
|
107
|
_yaml.node_validate(remote_exec_service_config, ['url'])
|
96
|
108
|
_yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
|
|
109
|
+ if remote_exec_action_config:
|
|
110
|
+ _yaml.node_validate(remote_exec_action_config, ['url'])
|
|
111
|
+ else:
|
|
112
|
+ remote_config['action-service'] = None
|
97
|
113
|
|
98
|
114
|
if 'url' in remote_config:
|
99
|
115
|
if 'execution-service' not in remote_config:
|
... |
... |
@@ -114,52 +130,17 @@ class SandboxRemote(Sandbox): |
114
|
130
|
"remote-execution configuration. Your config is missing '{}'."
|
115
|
131
|
.format(str(provenance), tls_keys, key))
|
116
|
132
|
|
117
|
|
- spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
|
|
133
|
+ spec = RemoteExecutionSpec(remote_config['execution-service'],
|
|
134
|
+ remote_config['storage-service'],
|
|
135
|
+ remote_config['action-cache-service'])
|
118
|
136
|
return spec
|
119
|
137
|
|
120
|
|
- def run_remote_command(self, command, input_root_digest, working_directory, environment):
|
|
138
|
+ def run_remote_command(self, channel, action_digest):
|
121
|
139
|
# Sends an execution request to the remote execution server.
|
122
|
140
|
#
|
123
|
141
|
# This function blocks until it gets a response from the server.
|
124
|
|
- #
|
125
|
|
- environment_variables = [remote_execution_pb2.Command.
|
126
|
|
- EnvironmentVariable(name=k, value=v)
|
127
|
|
- for (k, v) in environment.items()]
|
128
|
|
-
|
129
|
|
- # Create and send the Command object.
|
130
|
|
- remote_command = remote_execution_pb2.Command(arguments=command,
|
131
|
|
- working_directory=working_directory,
|
132
|
|
- environment_variables=environment_variables,
|
133
|
|
- output_files=[],
|
134
|
|
- output_directories=[self._output_directory],
|
135
|
|
- platform=None)
|
136
|
|
- context = self._get_context()
|
137
|
|
- cascache = context.get_cascache()
|
138
|
|
- casremote = CASRemote(self.storage_remote_spec)
|
139
|
|
-
|
140
|
|
- # Upload the Command message to the remote CAS server
|
141
|
|
- command_digest = cascache.push_message(casremote, remote_command)
|
142
|
|
-
|
143
|
|
- # Create and send the action.
|
144
|
|
- action = remote_execution_pb2.Action(command_digest=command_digest,
|
145
|
|
- input_root_digest=input_root_digest,
|
146
|
|
- timeout=None,
|
147
|
|
- do_not_cache=False)
|
148
|
|
-
|
149
|
|
- # Upload the Action message to the remote CAS server
|
150
|
|
- action_digest = cascache.push_message(casremote, action)
|
151
|
|
-
|
152
|
|
- # Next, try to create a communication channel to the BuildGrid server.
|
153
|
|
- url = urlparse(self.exec_url)
|
154
|
|
- if not url.port:
|
155
|
|
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
|
156
|
|
- "for example: http://buildservice:50051.")
|
157
|
|
- if url.scheme == 'http':
|
158
|
|
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
|
159
|
|
- else:
|
160
|
|
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
|
161
|
|
- "and '{}' was supplied.".format(url.scheme))
|
162
|
142
|
|
|
143
|
+ # Try to create a communication channel to the BuildGrid server.
|
163
|
144
|
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
|
164
|
145
|
request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
|
165
|
146
|
skip_cache_lookup=False)
|
... |
... |
@@ -279,13 +260,12 @@ class SandboxRemote(Sandbox): |
279
|
260
|
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
|
280
|
261
|
# from another hash will be interesting, though...
|
281
|
262
|
|
282
|
|
- new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
|
|
263
|
+ new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
|
283
|
264
|
self._set_virtual_directory(new_dir)
|
284
|
265
|
|
285
|
266
|
def _run(self, command, flags, *, cwd, env):
|
286
|
|
- # Upload sources
|
|
267
|
+ # set up virtual directory
|
287
|
268
|
upload_vdir = self.get_virtual_directory()
|
288
|
|
-
|
289
|
269
|
cascache = self._get_context().get_cascache()
|
290
|
270
|
if isinstance(upload_vdir, FileBasedDirectory):
|
291
|
271
|
# Make a new temporary directory to put source in
|
... |
... |
@@ -294,16 +274,111 @@ class SandboxRemote(Sandbox): |
294
|
274
|
|
295
|
275
|
upload_vdir.recalculate_hash()
|
296
|
276
|
|
297
|
|
- casremote = CASRemote(self.storage_remote_spec)
|
298
|
|
- # Now, push that key (without necessarily needing a ref) to the remote.
|
|
277
|
+ # Generate action_digest first
|
|
278
|
+ input_root_digest = upload_vdir.ref
|
|
279
|
+ command_proto = self._create_command(command, cwd, env)
|
|
280
|
+ command_digest = utils._message_digest(command_proto.SerializeToString())
|
|
281
|
+ action = remote_execution_pb2.Action(command_digest=command_digest,
|
|
282
|
+ input_root_digest=input_root_digest)
|
|
283
|
+ action_digest = utils._message_digest(action.SerializeToString())
|
|
284
|
+
|
|
285
|
+ # Next, try to create a communication channel to the BuildGrid server.
|
|
286
|
+ url = urlparse(self.exec_url)
|
|
287
|
+ if not url.port:
|
|
288
|
+ raise SandboxError("You must supply a protocol and port number in the execution-service url, "
|
|
289
|
+ "for example: http://buildservice:50051.")
|
|
290
|
+ if url.scheme == 'http':
|
|
291
|
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
|
|
292
|
+ else:
|
|
293
|
+ raise SandboxError("Remote execution currently only supports the 'http' protocol "
|
|
294
|
+ "and '{}' was supplied.".format(url.scheme))
|
|
295
|
+
|
|
296
|
+ # check action cache download and download if there
|
|
297
|
+ action_result = self._check_action_cache(action_digest)
|
|
298
|
+
|
|
299
|
+ if not action_result:
|
|
300
|
+ casremote = CASRemote(self.storage_remote_spec)
|
|
301
|
+
|
|
302
|
+ # Now, push that key (without necessarily needing a ref) to the remote.
|
|
303
|
+ try:
|
|
304
|
+ cascache.push_directory(casremote, upload_vdir)
|
|
305
|
+ except grpc.RpcError as e:
|
|
306
|
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
|
|
307
|
+
|
|
308
|
+ if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
|
|
309
|
+ raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
|
310
|
+
|
|
311
|
+ # Push command and action
|
|
312
|
+ try:
|
|
313
|
+ cascache.push_message(casremote, command_proto)
|
|
314
|
+ except grpc.RpcError as e:
|
|
315
|
+ raise SandboxError("Failed to push command to remote: {}".format(e))
|
|
316
|
+
|
|
317
|
+ try:
|
|
318
|
+ cascache.push_message(casremote, action)
|
|
319
|
+ except grpc.RpcError as e:
|
|
320
|
+ raise SandboxError("Failed to push action to remote: {}".format(e))
|
|
321
|
+
|
|
322
|
+ # Now request to execute the action
|
|
323
|
+ operation = self.run_remote_command(channel, action_digest)
|
|
324
|
+ action_result = self._extract_action_result(operation)
|
|
325
|
+
|
|
326
|
+ if action_result.exit_code != 0:
|
|
327
|
+ # A normal error during the build: the remote execution system
|
|
328
|
+ # has worked correctly but the command failed.
|
|
329
|
+ # action_result.stdout and action_result.stderr also contains
|
|
330
|
+ # build command outputs which we ignore at the moment.
|
|
331
|
+ return action_result.exit_code
|
|
332
|
+
|
|
333
|
+ # Get output of build
|
|
334
|
+ self.process_job_output(action_result.output_directories, action_result.output_files)
|
|
335
|
+
|
|
336
|
+ return 0
|
|
337
|
+
|
|
338
|
def _check_action_cache(self, action_digest):
    # Checks the action cache to see if this action has already been built.
    #
    # Args:
    #    action_digest: digest of the Action message to look up
    #
    # Returns:
    #    The result of the GetActionResult call, or None when no action
    #    cache is configured or the cache has no entry for this action.
    #
    # Raises:
    #    SandboxError: if the action-cache-service url has no port, uses a
    #                  scheme other than http, or the query fails with any
    #                  gRPC status other than NOT_FOUND.
    if not self.action_url:
        return None

    url = urlparse(self.action_url)
    if not url.port:
        raise SandboxError("You must supply a protocol and port number in the action-cache-service url, "
                           "for example: http://buildservice:50051.")
    if url.scheme != "http":
        # The trailing space keeps the two literals from running together.
        raise SandboxError("Currently only support http for the action cache "
                           "and {} was supplied".format(url.scheme))

    channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    try:
        result = stub.GetActionResult(request)
    except grpc.RpcError as e:
        # NOT_FOUND is an ordinary cache miss, not a failure.
        if e.code() == grpc.StatusCode.NOT_FOUND:
            return None
        # Chain the gRPC error so the underlying cause stays in the traceback.
        raise SandboxError("Failed to query action cache: {} ({})"
                           .format(e.code(), e.details())) from e

    self.info("Action result found in action cache")
    return result
|
303
|
367
|
|
304
|
|
- # Now transmit the command to execute
|
305
|
|
- operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
|
|
368
|
def _create_command(self, command, working_directory, environment):
    # Builds the Command proto for a remote execution request.
    #
    # Every entry of `environment` becomes one EnvironmentVariable message.
    # The sandbox's output directory is the only declared output.
    env_vars = []
    for name, value in environment.items():
        env_vars.append(
            remote_execution_pb2.Command.EnvironmentVariable(name=name, value=value))

    command_proto = remote_execution_pb2.Command(
        arguments=command,
        working_directory=working_directory,
        environment_variables=env_vars,
        output_files=[],
        output_directories=[self._output_directory],
        platform=None)
    return command_proto
|
306
|
379
|
|
|
380
|
+ @staticmethod
|
|
381
|
+ def _extract_action_result(operation):
|
307
|
382
|
if operation is None:
|
308
|
383
|
# Failure of remote execution, usually due to an error in BuildStream
|
309
|
384
|
raise SandboxError("No response returned from server")
|
... |
... |
@@ -324,18 +399,7 @@ class SandboxRemote(Sandbox): |
324
|
399
|
else:
|
325
|
400
|
raise SandboxError("Remote server failed at executing the build request.")
|
326
|
401
|
|
327
|
|
- action_result = execution_response.result
|
328
|
|
-
|
329
|
|
- if action_result.exit_code != 0:
|
330
|
|
- # A normal error during the build: the remote execution system
|
331
|
|
- # has worked correctly but the command failed.
|
332
|
|
- # action_result.stdout and action_result.stderr also contains
|
333
|
|
- # build command outputs which we ignore at the moment.
|
334
|
|
- return action_result.exit_code
|
335
|
|
-
|
336
|
|
- self.process_job_output(action_result.output_directories, action_result.output_files)
|
337
|
|
-
|
338
|
|
- return 0
|
|
402
|
+ return execution_response.result
|
339
|
403
|
|
340
|
404
|
def _create_batch(self, main_group, flags, *, collect=None):
    # Wraps this sandbox and the given command group in a remote batch object.
    batch = _SandboxRemoteBatch(self, main_group, flags, collect=collect)
    return batch
|