... |
... |
@@ -26,6 +26,7 @@ from functools import partial |
26
|
26
|
|
27
|
27
|
import grpc
|
28
|
28
|
|
|
29
|
+from buildstream import utils
|
29
|
30
|
from . import Sandbox, SandboxCommandError
|
30
|
31
|
from .sandbox import _SandboxBatch
|
31
|
32
|
from ..storage._filebaseddirectory import FileBasedDirectory
|
... |
... |
@@ -117,52 +118,12 @@ class SandboxRemote(Sandbox): |
117
|
118
|
spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
|
118
|
119
|
return spec
|
119
|
120
|
|
120
|
|
- def run_remote_command(self, command, input_root_digest, working_directory, environment):
|
|
121
|
+ def run_remote_command(self, channel, action_digest):
|
121
|
122
|
# Sends an execution request to the remote execution server.
|
122
|
123
|
#
|
123
|
124
|
# This function blocks until it gets a response from the server.
|
124
|
|
- #
|
125
|
|
- environment_variables = [remote_execution_pb2.Command.
|
126
|
|
- EnvironmentVariable(name=k, value=v)
|
127
|
|
- for (k, v) in environment.items()]
|
128
|
|
-
|
129
|
|
- # Create and send the Command object.
|
130
|
|
- remote_command = remote_execution_pb2.Command(arguments=command,
|
131
|
|
- working_directory=working_directory,
|
132
|
|
- environment_variables=environment_variables,
|
133
|
|
- output_files=[],
|
134
|
|
- output_directories=[self._output_directory],
|
135
|
|
- platform=None)
|
136
|
|
- context = self._get_context()
|
137
|
|
- cascache = context.get_cascache()
|
138
|
|
- casremote = CASRemote(self.storage_remote_spec)
|
139
|
|
-
|
140
|
|
- # Upload the Command message to the remote CAS server
|
141
|
|
- command_digest = cascache.push_message(casremote, remote_command)
|
142
|
|
- if not command_digest or not cascache.verify_digest_on_remote(casremote, command_digest):
|
143
|
|
- raise SandboxError("Failed pushing build command to remote CAS.")
|
144
|
|
- # Create and send the action.
|
145
|
|
- action = remote_execution_pb2.Action(command_digest=command_digest,
|
146
|
|
- input_root_digest=input_root_digest,
|
147
|
|
- timeout=None,
|
148
|
|
- do_not_cache=False)
|
149
|
|
-
|
150
|
|
- # Upload the Action message to the remote CAS server
|
151
|
|
- action_digest = cascache.push_message(casremote, action)
|
152
|
|
- if not action_digest or not cascache.verify_digest_on_remote(casremote, action_digest):
|
153
|
|
- raise SandboxError("Failed pushing build action to remote CAS.")
|
154
|
|
-
|
155
|
|
- # Next, try to create a communication channel to the BuildGrid server.
|
156
|
|
- url = urlparse(self.exec_url)
|
157
|
|
- if not url.port:
|
158
|
|
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
|
159
|
|
- "for example: http://buildservice:50051.")
|
160
|
|
- if url.scheme == 'http':
|
161
|
|
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
|
162
|
|
- else:
|
163
|
|
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
|
164
|
|
- "and '{}' was supplied.".format(url.scheme))
|
165
|
125
|
|
|
126
|
+ # Try to create a communication channel to the BuildGrid server.
|
166
|
127
|
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
|
167
|
128
|
request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
|
168
|
129
|
skip_cache_lookup=False)
|
... |
... |
@@ -282,13 +243,12 @@ class SandboxRemote(Sandbox): |
282
|
243
|
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
|
283
|
244
|
# from another hash will be interesting, though...
|
284
|
245
|
|
285
|
|
- new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
|
|
246
|
+ new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
|
286
|
247
|
self._set_virtual_directory(new_dir)
|
287
|
248
|
|
288
|
249
|
def _run(self, command, flags, *, cwd, env):
|
289
|
|
- # Upload sources
|
|
250
|
+ # set up virtual dircetory
|
290
|
251
|
upload_vdir = self.get_virtual_directory()
|
291
|
|
-
|
292
|
252
|
cascache = self._get_context().get_cascache()
|
293
|
253
|
if isinstance(upload_vdir, FileBasedDirectory):
|
294
|
254
|
# Make a new temporary directory to put source in
|
... |
... |
@@ -297,20 +257,97 @@ class SandboxRemote(Sandbox): |
297
|
257
|
|
298
|
258
|
upload_vdir.recalculate_hash()
|
299
|
259
|
|
300
|
|
- casremote = CASRemote(self.storage_remote_spec)
|
301
|
|
- # Now, push that key (without necessarily needing a ref) to the remote.
|
|
260
|
+ # Generate action_digest first
|
|
261
|
+ input_root_digest = upload_vdir.ref
|
|
262
|
+ command_proto = self._create_command(command, cwd, env)
|
|
263
|
+ command_digest = utils._message_digest(command_proto.SerializeToString())
|
|
264
|
+ action = remote_execution_pb2.Action(command_digest=command_digest,
|
|
265
|
+ input_root_digest=input_root_digest)
|
|
266
|
+ action_digest = utils._message_digest(action.SerializeToString())
|
|
267
|
+
|
|
268
|
+ # Next, try to create a communication channel to the BuildGrid server.
|
|
269
|
+ url = urlparse(self.exec_url)
|
|
270
|
+ if not url.port:
|
|
271
|
+ raise SandboxError("You must supply a protocol and port number in the execution-service url, "
|
|
272
|
+ "for example: http://buildservice:50051.")
|
|
273
|
+ if url.scheme == 'http':
|
|
274
|
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
|
|
275
|
+ else:
|
|
276
|
+ raise SandboxError("Remote execution currently only supports the 'http' protocol "
|
|
277
|
+ "and '{}' was supplied.".format(url.scheme))
|
|
278
|
+
|
|
279
|
+ # check action cache download and download if there
|
|
280
|
+ action_result = self._check_action_cache(channel, action_digest)
|
|
281
|
+
|
|
282
|
+ if not action_result:
|
|
283
|
+ casremote = CASRemote(self.storage_remote_spec)
|
302
|
284
|
|
|
285
|
+ # Now, push that key (without necessarily needing a ref) to the remote.
|
|
286
|
+ try:
|
|
287
|
+ cascache.push_directory(casremote, upload_vdir)
|
|
288
|
+ except grpc.RpcError as e:
|
|
289
|
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
|
|
290
|
+
|
|
291
|
+ if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
|
|
292
|
+ raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
|
293
|
+
|
|
294
|
+ # Push command and action
|
|
295
|
+ try:
|
|
296
|
+ cascache.push_message(casremote, command_proto)
|
|
297
|
+ except grpc.RpcError as e:
|
|
298
|
+ raise SandboxError("Failed to push command to remote: {}".format(e))
|
|
299
|
+
|
|
300
|
+ try:
|
|
301
|
+ cascache.push_message(casremote, action)
|
|
302
|
+ except grpc.RpcError as e:
|
|
303
|
+ raise SandboxError("Failed to push action to remote: {}".format(e))
|
|
304
|
+
|
|
305
|
+ # Now request to execute the action
|
|
306
|
+ operation = self.run_remote_command(channel, action_digest)
|
|
307
|
+ action_result = self._extract_action_result(operation)
|
|
308
|
+
|
|
309
|
+ if action_result.exit_code != 0:
|
|
310
|
+ # A normal error during the build: the remote execution system
|
|
311
|
+ # has worked correctly but the command failed.
|
|
312
|
+ # action_result.stdout and action_result.stderr also contains
|
|
313
|
+ # build command outputs which we ignore at the moment.
|
|
314
|
+ return action_result.exit_code
|
|
315
|
+
|
|
316
|
+ # Get output of build
|
|
317
|
+ self.process_job_output(action_result.output_directories, action_result.output_files)
|
|
318
|
+
|
|
319
|
+ return 0
|
|
320
|
+
|
|
321
|
+ def _check_action_cache(self, channel, action_digest):
|
|
322
|
+ # Checks the action cache to see if this artifact has already been built
|
|
323
|
+ #
|
|
324
|
+ # Should return either the action response or None if not found, raise
|
|
325
|
+ # Sandboxerror if other grpc error was raised
|
|
326
|
+ request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
|
|
327
|
+ stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
|
303
|
328
|
try:
|
304
|
|
- cascache.push_directory(casremote, upload_vdir)
|
|
329
|
+ return stub.GetActionResult(request)
|
305
|
330
|
except grpc.RpcError as e:
|
306
|
|
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
|
307
|
|
-
|
308
|
|
- if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
|
309
|
|
- raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
|
331
|
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
332
|
+ raise SandboxError("Failed to query action cache: {} ({})"
|
|
333
|
+ .format(e.code(), e.details()))
|
|
334
|
+ else:
|
|
335
|
+ return None
|
310
|
336
|
|
311
|
|
- # Now transmit the command to execute
|
312
|
|
- operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
|
|
337
|
+ def _create_command(self, command, working_directory, environment):
|
|
338
|
+ # Creates a command proto
|
|
339
|
+ environment_variables = [remote_execution_pb2.Command.
|
|
340
|
+ EnvironmentVariable(name=k, value=v)
|
|
341
|
+ for (k, v) in environment.items()]
|
|
342
|
+ return remote_execution_pb2.Command(arguments=command,
|
|
343
|
+ working_directory=working_directory,
|
|
344
|
+ environment_variables=environment_variables,
|
|
345
|
+ output_files=[],
|
|
346
|
+ output_directories=[self._output_directory],
|
|
347
|
+ platform=None)
|
313
|
348
|
|
|
349
|
+ @staticmethod
|
|
350
|
+ def _extract_action_result(operation):
|
314
|
351
|
if operation is None:
|
315
|
352
|
# Failure of remote execution, usually due to an error in BuildStream
|
316
|
353
|
raise SandboxError("No response returned from server")
|
... |
... |
@@ -331,18 +368,7 @@ class SandboxRemote(Sandbox): |
331
|
368
|
else:
|
332
|
369
|
raise SandboxError("Remote server failed at executing the build request.")
|
333
|
370
|
|
334
|
|
- action_result = execution_response.result
|
335
|
|
-
|
336
|
|
- if action_result.exit_code != 0:
|
337
|
|
- # A normal error during the build: the remote execution system
|
338
|
|
- # has worked correctly but the command failed.
|
339
|
|
- # action_result.stdout and action_result.stderr also contains
|
340
|
|
- # build command outputs which we ignore at the moment.
|
341
|
|
- return action_result.exit_code
|
342
|
|
-
|
343
|
|
- self.process_job_output(action_result.output_directories, action_result.output_files)
|
344
|
|
-
|
345
|
|
- return 0
|
|
371
|
+ return execution_response.result
|
346
|
372
|
|
347
|
373
|
def _create_batch(self, main_group, flags, *, collect=None):
|
348
|
374
|
return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
|