finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid
Commits:
-
1e4bc505
by Finn at 2018-10-25T09:08:45Z
-
713e81dd
by Finn at 2018-10-25T12:11:09Z
-
2f41fcd3
by Finn at 2018-10-25T13:07:40Z
-
83f678f4
by Finn at 2018-10-25T15:37:37Z
-
907bcac2
by Finn at 2018-10-25T15:37:58Z
-
1ecce28e
by Finn at 2018-10-25T15:37:58Z
-
2064c35d
by Finn at 2018-10-25T15:37:58Z
7 changed files:
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/_exceptions.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
Changes:
... | ... | @@ -78,6 +78,19 @@ def status(context, operation_name): |
78 | 78 |
context.logger.info(response)
|
79 | 79 |
|
80 | 80 |
|
81 |
@cli.command('cancel', short_help="Cancel an operation.")
@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
@pass_context
def cancel(context, operation_name):
    """Cancel the operation named OPERATION_NAME on the remote server."""
    context.logger.info("Cancelling an operation...")
    stub = operations_pb2_grpc.OperationsStub(context.channel)

    request = operations_pb2.CancelOperationRequest(name=operation_name)

    # CancelOperation replies with an empty message, so there is nothing to keep.
    stub.CancelOperation(request)
    context.logger.info("Operation cancelled")
|
|
92 |
+ |
|
93 |
+ |
|
81 | 94 |
@cli.command('list', short_help="List operations.")
|
82 | 95 |
@pass_context
|
83 | 96 |
def lists(context):
|
... | ... | @@ -52,6 +52,12 @@ class BotError(BgdError): |
52 | 52 |
super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
|
53 | 53 |
|
54 | 54 |
|
55 |
class CancelledError(BgdError):
    """Signals that a job has been cancelled, so callers can be told about it.

    Reported under the server error domain.
    """

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, domain=ErrorDomain.SERVER, detail=detail, reason=reason)
|
|
59 |
+ |
|
60 |
+ |
|
55 | 61 |
class InvalidArgumentError(BgdError):
|
56 | 62 |
"""A bad argument was passed, such as a name which doesn't exist."""
|
57 | 63 |
def __init__(self, message, detail=None, reason=None):
|
... | ... | @@ -26,7 +26,7 @@ from functools import partial |
26 | 26 |
|
27 | 27 |
import grpc
|
28 | 28 |
|
29 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
29 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
|
|
30 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
31 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2
|
32 | 32 |
|
... | ... | @@ -67,6 +67,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
67 | 67 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
68 | 68 |
yield operations_pb2.Operation()
|
69 | 69 |
|
70 |
+ except CancelledError as e:
|
|
71 |
+ self.logger.error(e)
|
|
72 |
+ context.set_details(str(e))
|
|
73 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
74 |
+ yield operations_pb2.Operation()
|
|
75 |
+ |
|
70 | 76 |
def WaitExecution(self, request, context):
|
71 | 77 |
try:
|
72 | 78 |
names = request.name.split("/")
|
... | ... | @@ -93,6 +99,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
93 | 99 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
94 | 100 |
yield operations_pb2.Operation()
|
95 | 101 |
|
102 |
+ except CancelledError as e:
|
|
103 |
+ self.logger.error(e)
|
|
104 |
+ context.set_details(str(e))
|
|
105 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
106 |
+ yield operations_pb2.Operation()
|
|
107 |
+ |
|
96 | 108 |
def _get_instance(self, name):
|
97 | 109 |
try:
|
98 | 110 |
return self._instances[name]
|
... | ... | @@ -17,9 +17,11 @@ import logging |
17 | 17 |
import uuid
|
18 | 18 |
from enum import Enum
|
19 | 19 |
|
20 |
+from buildgrid._exceptions import CancelledError
|
|
20 | 21 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
21 | 22 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
22 | 23 |
from buildgrid._protos.google.longrunning import operations_pb2
|
24 |
+from buildgrid._protos.google.rpc import code_pb2
|
|
23 | 25 |
|
24 | 26 |
|
25 | 27 |
class OperationStage(Enum):
|
... | ... | @@ -60,6 +62,8 @@ class Job: |
60 | 62 |
|
61 | 63 |
self.__execute_response = None
|
62 | 64 |
self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
65 |
+ self.__operation_cancelled = False
|
|
66 |
+ self.__lease_cancelled = False
|
|
63 | 67 |
|
64 | 68 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
65 | 69 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
... | ... | @@ -151,7 +155,9 @@ class Job: |
151 | 155 |
Only one :class:`Lease` can be emitted for a given job. This method
|
152 | 156 |
should only be used once, any furhter calls are ignored.
|
153 | 157 |
"""
|
154 |
- if self._lease is not None:
|
|
158 |
+ if self.__operation_cancelled:
|
|
159 |
+ return None
|
|
160 |
+ elif self._lease is not None:
|
|
155 | 161 |
return None
|
156 | 162 |
|
157 | 163 |
self._lease = bots_pb2.Lease()
|
... | ... | @@ -184,7 +190,7 @@ class Job: |
184 | 190 |
action_result = remote_execution_pb2.ActionResult()
|
185 | 191 |
|
186 | 192 |
# TODO: Make a distinction between build and bot failures!
|
187 |
- if status.code != 0:
|
|
193 |
+ if status.code != code_pb2.OK:
|
|
188 | 194 |
self._do_not_cache = True
|
189 | 195 |
|
190 | 196 |
if result is not None:
|
... | ... | @@ -196,6 +202,16 @@ class Job: |
196 | 202 |
self.__execute_response.cached_result = False
|
197 | 203 |
self.__execute_response.status.CopyFrom(status)
|
198 | 204 |
|
205 |
def cancel_lease(self):
    """Cancel the job's :class:`Lease` without touching its :class:`Operation`.

    The lease-cancelled flag is always set. The lease state is only moved to
    ``LeaseState.CANCELLED`` when a lease has actually been issued.
    """
    self.__lease_cancelled = True
    if self._lease is None:
        return
    self.update_lease_state(LeaseState.CANCELLED)
|
|
213 |
+ |
|
214 |
+ |
|
199 | 215 |
def update_operation_stage(self, stage):
|
200 | 216 |
"""Operates a stage transition for the job's :class:Operation.
|
201 | 217 |
|
... | ... | @@ -219,3 +235,18 @@ class Job: |
219 | 235 |
|
220 | 236 |
for queue in self._operation_update_queues:
|
221 | 237 |
queue.put(self._operation)
|
238 |
+ |
|
239 |
def cancel_operation(self):
    """Cancel the job's :class:`Operation` and any :class:`Lease` issued for it.

    Flags the operation as cancelled, cancels an outstanding lease if one
    exists, records an execute response carrying a ``CANCELLED`` status, and
    finally moves the operation to the completed stage.
    """
    self.__operation_cancelled = True

    lease_issued = self._lease is not None
    if lease_issued:
        self.cancel_lease()

    cancelled_response = remote_execution_pb2.ExecuteResponse()
    cancelled_response.status.code = code_pb2.CANCELLED
    cancelled_response.status.message = "Operation cancelled by client."
    self.__execute_response = cancelled_response

    # The stage transition is what pushes the updated operation to any
    # registered update queues.
    self.update_operation_stage(OperationStage.COMPLETED)
|
... | ... | @@ -58,6 +58,13 @@ class OperationsInstance: |
58 | 58 |
except KeyError:
|
59 | 59 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
60 | 60 |
|
61 |
def cancel_operation(self, name):
    """Ask the scheduler to cancel the operation called ``name``.

    Args:
        name (str): name of the operation (job) to cancel.

    Raises:
        InvalidArgumentError: if the scheduler raises ``KeyError``, i.e. the
            name is unknown.
    """
    scheduler = self._scheduler
    try:
        scheduler.cancel_job_operation(name)
    except KeyError:
        message = "Operation name does not exist: [{}]".format(name)
        raise InvalidArgumentError(message)
|
|
67 |
+ |
|
61 | 68 |
def register_message_client(self, name, queue):
|
62 | 69 |
try:
|
63 | 70 |
self._scheduler.register_client(name, queue)
|
... | ... | @@ -78,7 +85,3 @@ class OperationsInstance: |
78 | 85 |
yield operation
|
79 | 86 |
operation = message_queue.get()
|
80 | 87 |
yield operation
|
81 |
- |
|
82 |
- def cancel_operation(self, name):
|
|
83 |
- # TODO: Cancel leases
|
|
84 |
- raise NotImplementedError("Cancelled operations not supported")
|
... | ... | @@ -25,7 +25,7 @@ import grpc |
25 | 25 |
|
26 | 26 |
from google.protobuf.empty_pb2 import Empty
|
27 | 27 |
|
28 |
-from buildgrid._exceptions import InvalidArgumentError
|
|
28 |
+from buildgrid._exceptions import CancelledError, InvalidArgumentError
|
|
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
30 | 30 |
|
31 | 31 |
|
... | ... | @@ -107,7 +107,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
107 | 107 |
|
108 | 108 |
instance.cancel_operation(operation_name)
|
109 | 109 |
|
110 |
- except NotImplementedError as e:
|
|
110 |
+ except CancelledError as e:
|
|
111 | 111 |
self.logger.error(e)
|
112 | 112 |
context.set_details(str(e))
|
113 | 113 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
... | ... | @@ -94,7 +94,10 @@ class Scheduler: |
94 | 94 |
# For now, one lease at a time:
|
95 | 95 |
lease = job.create_lease()
|
96 | 96 |
|
97 |
- return [lease]
|
|
97 |
+ if lease:
|
|
98 |
+ return [lease]
|
|
99 |
+ |
|
100 |
+ return None
|
|
98 | 101 |
|
99 | 102 |
def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
|
100 | 103 |
"""Requests a state transition for a job's current :class:Lease.
|
... | ... | @@ -128,3 +131,13 @@ class Scheduler: |
128 | 131 |
def get_job_operation(self, job_name):
|
129 | 132 |
"""Returns the operation associated to job."""
|
130 | 133 |
return self.jobs[job_name].operation
|
134 |
+ |
|
135 |
def cancel_job_operation(self, job_name):
    """Cancels the underlying operation of a given job.

    This will also cancel any job's lease that may have been issued.

    Args:
        job_name (str): name of the job holding the operation to cancel.

    Raises:
        KeyError: if no job named ``job_name`` is known to the scheduler.
    """
    # KeyError is deliberately left to propagate: the operations instance
    # converts it into an InvalidArgumentError for the client.
    self.jobs[job_name].cancel_operation()
|