... |
... |
@@ -20,14 +20,20 @@ |
20
|
20
|
|
21
|
21
|
import os
|
22
|
22
|
from urllib.parse import urlparse
|
|
23
|
+from functools import partial
|
23
|
24
|
|
24
|
25
|
import grpc
|
25
|
26
|
|
26
|
27
|
from . import Sandbox
|
27
|
28
|
from ..storage._filebaseddirectory import FileBasedDirectory
|
28
|
29
|
from ..storage._casbaseddirectory import CasBasedDirectory
|
|
30
|
+from .. import _signals
|
29
|
31
|
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
30
|
32
|
from .._protos.google.rpc import code_pb2
|
|
33
|
+from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
|
34
|
+
|
|
35
|
+# TODO remove this later
|
|
36
|
+from .._message import Message, MessageType
|
31
|
37
|
|
32
|
38
|
|
33
|
39
|
class SandboxError(Exception):
|
... |
... |
@@ -104,11 +110,14 @@ class SandboxRemote(Sandbox): |
104
|
110
|
request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
|
105
|
111
|
operation_iterator = stub.WaitExecution(request)
|
106
|
112
|
|
107
|
|
- for operation in operation_iterator:
|
108
|
|
- if operation.done:
|
109
|
|
- return operation
|
110
|
|
- else:
|
111
|
|
- last_operation = operation
|
|
113
|
+ operation = next(operation_iterator)
|
|
114
|
+ operation_name = operation.name
|
|
115
|
+ with _signals.terminator(partial(self.cancel_operation, operation_name)):
|
|
116
|
+ for operation in operation_iterator:
|
|
117
|
+ if operation.done:
|
|
118
|
+ return operation
|
|
119
|
+ else:
|
|
120
|
+ last_operation = operation
|
112
|
121
|
except grpc.RpcError as e:
|
113
|
122
|
status_code = e.code()
|
114
|
123
|
if status_code == grpc.StatusCode.UNAVAILABLE:
|
... |
... |
@@ -135,12 +144,25 @@ class SandboxRemote(Sandbox): |
135
|
144
|
return None
|
136
|
145
|
elif operation.done:
|
137
|
146
|
return operation
|
138
|
|
-
|
139
|
147
|
while operation is not None and not operation.done:
|
140
|
148
|
operation = __run_remote_command(stub, running_operation=operation)
|
141
|
149
|
|
142
|
150
|
return operation
|
143
|
151
|
|
|
152
|
def cancel_operation(self, operation_name):
    """Ask the remote execution server to cancel a running operation.

    A ``CancelOperation`` request is sent through the long-running
    operations service at ``self.server_url``.  Servers which reply with
    ``UNIMPLEMENTED`` are tolerated silently, so cancellation is
    best-effort against servers that do not support it.

    Args:
        operation_name (str): Name of the operation to cancel, as reported
            by the server in the operation's ``name`` field.

    Raises:
        SandboxError: If the server rejects the request with any gRPC
            status other than ``UNIMPLEMENTED``.
    """
    # Use the channel as a context manager so that it is closed once the
    # request has completed; previously every cancellation left a channel
    # open for the garbage collector to deal with.
    with grpc.insecure_channel(self.server_url) as channel:
        stub = operations_pb2_grpc.OperationsStub(channel)

        # NOTE(review): the leading "/" presumably matches the resource
        # name format expected by the server -- confirm against the server.
        request = operations_pb2.CancelOperationRequest(name="/{}".format(operation_name))

        # Surface the outgoing request to the user as a warning message.
        self._get_context().message(
            Message(None, MessageType.WARN,
                    "CancelOperation request: {}".format(str(request))))

        try:
            stub.CancelOperation(request)
        except grpc.RpcError as e:
            # A server without cancellation support is not an error here.
            if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                raise SandboxError("{} ({})".format(e.details(), e.code().name)) from e
|
|
165
|
+
|
144
|
166
|
def process_job_output(self, output_directories, output_files):
|
145
|
167
|
# Reads the remote execution server response to an execution request.
|
146
|
168
|
#
|