... |
... |
@@ -19,6 +19,7 @@ |
19
|
19
|
# Jim MacArthur <jim macarthur codethink co uk>
|
20
|
20
|
|
21
|
21
|
import os
|
|
22
|
+import signal
|
22
|
23
|
from urllib.parse import urlparse
|
23
|
24
|
|
24
|
25
|
import grpc
|
... |
... |
@@ -26,8 +27,13 @@ import grpc |
26
|
27
|
from . import Sandbox
|
27
|
28
|
from ..storage._filebaseddirectory import FileBasedDirectory
|
28
|
29
|
from ..storage._casbaseddirectory import CasBasedDirectory
|
|
30
|
+from .. import _signals
|
29
|
31
|
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
30
|
32
|
from .._protos.google.rpc import code_pb2
|
|
33
|
+from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
|
34
|
+
|
|
35
|
+# TODO remove this later
|
|
36
|
+from .._message import Message, MessageType
|
31
|
37
|
|
32
|
38
|
|
33
|
39
|
class SandboxError(Exception):
|
... |
... |
@@ -54,6 +60,7 @@ class SandboxRemote(Sandbox): |
54
|
60
|
"Only plain HTTP is currenlty supported (no HTTPS).")
|
55
|
61
|
|
56
|
62
|
self.server_url = '{}:{}'.format(url.hostname, url.port)
|
|
63
|
+ self.operation_name = None
|
57
|
64
|
|
58
|
65
|
def run_remote_command(self, command, input_root_digest, working_directory, environment):
|
59
|
66
|
# Sends an execution request to the remote execution server.
|
... |
... |
@@ -104,11 +111,15 @@ class SandboxRemote(Sandbox): |
104
|
111
|
request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
|
105
|
112
|
operation_iterator = stub.WaitExecution(request)
|
106
|
113
|
|
107
|
|
- for operation in operation_iterator:
|
108
|
|
- if operation.done:
|
109
|
|
- return operation
|
110
|
|
- else:
|
111
|
|
- last_operation = operation
|
|
114
|
+ with _signals.terminator(self.cancel_operation):
|
|
115
|
+ with _signals.blocked([signal.SIGTERM], ignore=False):
|
|
116
|
+ operation = next(operation_iterator)
|
|
117
|
+ self.operation_name = operation.name
|
|
118
|
+ for operation in operation_iterator:
|
|
119
|
+ if operation.done:
|
|
120
|
+ return operation
|
|
121
|
+ else:
|
|
122
|
+ last_operation = operation
|
112
|
123
|
except grpc.RpcError as e:
|
113
|
124
|
status_code = e.code()
|
114
|
125
|
if status_code == grpc.StatusCode.UNAVAILABLE:
|
... |
... |
@@ -135,12 +146,23 @@ class SandboxRemote(Sandbox): |
135
|
146
|
return None
|
136
|
147
|
elif operation.done:
|
137
|
148
|
return operation
|
138
|
|
-
|
139
|
149
|
while operation is not None and not operation.done:
|
140
|
150
|
operation = __run_remote_command(stub, running_operation=operation)
|
141
|
151
|
|
142
|
152
|
return operation
|
143
|
153
|
|
|
154
|
def cancel_operation(self):
    """Ask the remote execution server to cancel the tracked operation.

    Opens an insecure gRPC channel to ``self.server_url`` and sends a
    ``CancelOperation`` request for ``self.operation_name``. Does
    nothing when no operation name has been recorded yet.

    Raises:
        SandboxError: if the request fails with any gRPC status other
            than ``UNIMPLEMENTED``.
    """
    # No operation name has been recorded, so there is nothing to
    # cancel; avoid sending a request for the bogus name "/None".
    if self.operation_name is None:
        return

    channel = grpc.insecure_channel(self.server_url)
    stub = operations_pb2_grpc.OperationsStub(channel)
    request = operations_pb2.CancelOperationRequest(
        name="/{}".format(self.operation_name))

    try:
        stub.CancelOperation(request)
    except grpc.RpcError as e:
        # An UNIMPLEMENTED status is deliberately ignored; any other
        # failure is surfaced to the caller.
        if e.code() != grpc.StatusCode.UNIMPLEMENTED:
            raise SandboxError("{} ({})".format(e.details(), e.code().name)) from e
|
|
165
|
+
|
144
|
166
|
def process_job_output(self, output_directories, output_files):
|
145
|
167
|
# Reads the remote execution server response to an execution request.
|
146
|
168
|
#
|