... |
... |
@@ -20,15 +20,18 @@ |
20
|
20
|
|
21
|
21
|
import os
|
22
|
22
|
from urllib.parse import urlparse
|
|
23
|
+from functools import partial
|
23
|
24
|
|
24
|
25
|
import grpc
|
25
|
26
|
|
26
|
27
|
from . import Sandbox
|
27
|
28
|
from ..storage._filebaseddirectory import FileBasedDirectory
|
28
|
29
|
from ..storage._casbaseddirectory import CasBasedDirectory
|
|
30
|
+from .. import _signals
|
29
|
31
|
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
30
|
32
|
from .._protos.google.rpc import code_pb2
|
31
|
33
|
from .._exceptions import SandboxError
|
|
34
|
+from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
32
|
35
|
|
33
|
36
|
|
34
|
37
|
# SandboxRemote()
|
... |
... |
@@ -51,6 +54,7 @@ class SandboxRemote(Sandbox): |
51
|
54
|
"Only plain HTTP is currenlty supported (no HTTPS).")
|
52
|
55
|
|
53
|
56
|
self.server_url = '{}:{}'.format(url.hostname, url.port)
|
|
57
|
+ self.operation_name = None
|
54
|
58
|
|
55
|
59
|
def run_remote_command(self, command, input_root_digest, working_directory, environment):
|
56
|
60
|
# Sends an execution request to the remote execution server.
|
... |
... |
@@ -102,10 +106,13 @@ class SandboxRemote(Sandbox): |
102
|
106
|
operation_iterator = stub.WaitExecution(request)
|
103
|
107
|
|
104
|
108
|
for operation in operation_iterator:
|
|
109
|
+ if not self.operation_name:
|
|
110
|
+ self.operation_name = operation.name
|
105
|
111
|
if operation.done:
|
106
|
112
|
return operation
|
107
|
113
|
else:
|
108
|
114
|
last_operation = operation
|
|
115
|
+
|
109
|
116
|
except grpc.RpcError as e:
|
110
|
117
|
status_code = e.code()
|
111
|
118
|
if status_code == grpc.StatusCode.UNAVAILABLE:
|
... |
... |
@@ -125,19 +132,38 @@ class SandboxRemote(Sandbox): |
125
|
132
|
|
126
|
133
|
return last_operation
|
127
|
134
|
|
|
135
|
+ # Set up signal handler to trigger cancel_operation on SIGTERM
|
128
|
136
|
operation = None
|
129
|
|
- with self._get_context().timed_activity("Waiting for the remote build to complete"):
|
|
137
|
+ with self._get_context().timed_activity("Waiting for the remote build to complete"), \
|
|
138
|
+ _signals.terminator(partial(self.cancel_operation, channel)):
|
130
|
139
|
operation = __run_remote_command(stub, execute_request=request)
|
131
|
140
|
if operation is None:
|
132
|
141
|
return None
|
133
|
142
|
elif operation.done:
|
134
|
143
|
return operation
|
135
|
|
-
|
136
|
144
|
while operation is not None and not operation.done:
|
137
|
145
|
operation = __run_remote_command(stub, running_operation=operation)
|
138
|
146
|
|
139
|
147
|
return operation
|
140
|
148
|
|
|
149
|
+ def cancel_operation(self, channel):
|
|
150
|
+ # Without an operation name we cannot build the cancel request, so do nothing.
|
|
151
|
+ if self.operation_name is None:
|
|
152
|
+ return
|
|
153
|
+
|
|
154
|
+ stub = operations_pb2_grpc.OperationsStub(channel)
|
|
155
|
+ request = operations_pb2.CancelOperationRequest(
|
|
156
|
+ name=str(self.operation_name))
|
|
157
|
+
|
|
158
|
+ try:
|
|
159
|
+ stub.CancelOperation(request)
|
|
160
|
+ except grpc.RpcError as e:
|
|
161
|
+ if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
|
|
162
|
+ e.code() == grpc.StatusCode.INVALID_ARGUMENT):
|
|
163
|
+ pass
|
|
164
|
+ else:
|
|
165
|
+ raise SandboxError("{} ({})".format(e.details(), e.code().name))
|
|
166
|
+
|
141
|
167
|
def process_job_output(self, output_directories, output_files):
|
142
|
168
|
# Reads the remote execution server response to an execution request.
|
143
|
169
|
#
|