finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid
Commits:
-
28bf01e2
by Finn at 2018-11-02T14:23:38Z
-
146d07e0
by Finn at 2018-11-02T14:33:12Z
-
d4735838
by Finn at 2018-11-02T14:33:16Z
5 changed files:
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- tests/integration/operations_service.py
Changes:
... | ... | @@ -155,6 +155,25 @@ def status(context, operation_name, json): |
155 | 155 |
click.echo(json_format.MessageToJson(operation))
|
156 | 156 |
|
157 | 157 |
|
158 |
+@cli.command('cancel', short_help="Cancel an operation.")
|
|
159 |
+@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
|
160 |
+@pass_context
|
|
161 |
+def cancel(context, operation_name):
|
|
162 |
+ context.logger.info("Cancelling an operation...")
|
|
163 |
+ stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
164 |
+ |
|
165 |
+ request = operations_pb2.CancelOperationRequest(name=operation_name)
|
|
166 |
+ |
|
167 |
+ try:
|
|
168 |
+ stub.CancelOperation(request)
|
|
169 |
+ except grpc.RpcError as e:
|
|
170 |
+ status_code = e.code()
|
|
171 |
+ if status_code != grpc.StatusCode.CANCELLED:
|
|
172 |
+ raise
|
|
173 |
+ |
|
174 |
+ context.logger.info("Operation cancelled: [{}]".format(request))
|
|
175 |
+ |
|
176 |
+ |
|
158 | 177 |
@cli.command('list', short_help="List operations.")
|
159 | 178 |
@click.option('--json', is_flag=True, show_default=True,
|
160 | 179 |
help="Print operations list in JSON format.")
|
... | ... | @@ -71,8 +71,11 @@ class ExecutionInstance: |
71 | 71 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
72 | 72 |
|
73 | 73 |
def stream_operation_updates(self, message_queue, operation_name):
|
74 |
- operation = message_queue.get()
|
|
75 |
- while not operation.done:
|
|
76 |
- yield operation
|
|
77 |
- operation = message_queue.get()
|
|
78 |
- yield operation
|
|
74 |
+ job = message_queue.get()
|
|
75 |
+ while not job.operation.done:
|
|
76 |
+ yield job.operation
|
|
77 |
+ job = message_queue.get()
|
|
78 |
+ |
|
79 |
+ job.check_operation_status()
|
|
80 |
+ |
|
81 |
+ yield job.operation
|
... | ... | @@ -113,7 +113,7 @@ class Job: |
113 | 113 |
queue (queue.Queue): the event queue to register.
|
114 | 114 |
"""
|
115 | 115 |
self._operation_update_queues.append(queue)
|
116 |
- queue.put(self._operation)
|
|
116 |
+ queue.put(self)
|
|
117 | 117 |
|
118 | 118 |
def unregister_client(self, queue):
|
119 | 119 |
"""Unsubscribes to the job's :class:`Operation` stage change events.
|
... | ... | @@ -229,7 +229,16 @@ class Job: |
229 | 229 |
self._operation.metadata.Pack(self.__operation_metadata)
|
230 | 230 |
|
231 | 231 |
for queue in self._operation_update_queues:
|
232 |
- queue.put(self._operation)
|
|
232 |
+ queue.put(self)
|
|
233 |
+ |
|
234 |
+ def check_operation_status(self):
|
|
235 |
+ """Reports an error if the job's :class:`Operation` is in an unexpected state.
|
|
236 |
+ |
|
237 |
+ Raises:
|
|
238 |
+ CancelledError: if the job's :class:`Operation` was cancelled.
|
|
239 |
+ """
|
|
240 |
+ if self.__operation_cancelled:
|
|
241 |
+ raise CancelledError(self.__execute_response.status.message)
|
|
233 | 242 |
|
234 | 243 |
def cancel_operation(self):
|
235 | 244 |
"""Triggers a job's :class:Operation cancellation.
|
... | ... | @@ -86,8 +86,11 @@ class OperationsInstance: |
86 | 86 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
87 | 87 |
|
88 | 88 |
def stream_operation_updates(self, message_queue, operation_name):
|
89 |
- operation = message_queue.get()
|
|
90 |
- while not operation.done:
|
|
91 |
- yield operation
|
|
92 |
- operation = message_queue.get()
|
|
93 |
- yield operation
|
|
89 |
+ job = message_queue.get()
|
|
90 |
+ while not job.operation.done:
|
|
91 |
+ yield job.operation
|
|
92 |
+ job = message_queue.get()
|
|
93 |
+ |
|
94 |
+ job.check_operation_status()
|
|
95 |
+ |
|
96 |
+ yield job.operation
|
... | ... | @@ -24,6 +24,7 @@ import grpc |
24 | 24 |
from grpc._server import _Context
|
25 | 25 |
import pytest
|
26 | 26 |
|
27 |
+from buildgrid._enums import OperationStage
|
|
27 | 28 |
from buildgrid._exceptions import InvalidArgumentError
|
28 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2
|
... | ... | @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context): |
236 | 237 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
237 | 238 |
|
238 | 239 |
|
239 |
-def test_cancel_operation(instance, context):
|
|
240 |
+def test_cancel_operation(instance, controller, execute_request, context):
|
|
241 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
242 |
+ execute_request.skip_cache_lookup)
|
|
243 |
+ |
|
240 | 244 |
request = operations_pb2.CancelOperationRequest()
|
241 |
- request.name = "{}/{}".format(instance_name, "runner")
|
|
245 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
246 |
+ |
|
242 | 247 |
instance.CancelOperation(request, context)
|
243 | 248 |
|
244 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
249 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
|
|
250 |
+ |
|
251 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
252 |
+ response = instance.ListOperations(request, context)
|
|
253 |
+ |
|
254 |
+ assert len(response.operations) is 1
|
|
255 |
+ |
|
256 |
+ for operation in response.operations:
|
|
257 |
+ operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
258 |
+ operation.metadata.Unpack(operation_metadata)
|
|
259 |
+ assert operation_metadata.stage == OperationStage.COMPLETED.value
|
|
245 | 260 |
|
246 | 261 |
|
247 | 262 |
def test_cancel_operation_blank(blank_instance, context):
|
... | ... | @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context): |
249 | 264 |
request.name = "runner"
|
250 | 265 |
blank_instance.CancelOperation(request, context)
|
251 | 266 |
|
252 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
267 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
253 | 268 |
|
254 | 269 |
|
255 | 270 |
def test_cancel_operation_instance_fail(instance, context):
|