[Notes] [Git][BuildGrid/buildgrid][mablanch/74-operation-cancelation] 5 commits: job.py: Implement operation cancellation



Title: GitLab

Martin Blanchard pushed to branch mablanch/74-operation-cancelation at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/_exceptions.py
    ... ... @@ -52,6 +52,11 @@ class BotError(BgdError):
    52 52
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    
    53 53
     
    
    54 54
     
    
    55
    +class CancelledError(BgdError):
    
    56
    +    def __init__(self, message, detail=None, reason=None):
    
    57
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    58
    +
    
    59
    +
    
    55 60
     class InvalidArgumentError(BgdError):
    
    56 61
         """A bad argument was passed, such as a name which doesn't exist."""
    
    57 62
         def __init__(self, message, detail=None, reason=None):
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -22,7 +22,7 @@ An instance of the Remote Execution Service.
    22 22
     import logging
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    25
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    25
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    26 26
     
    
    27 27
     from ..job import Job
    
    28 28
     
    
    ... ... @@ -43,7 +43,7 @@ class ExecutionInstance:
    43 43
             this action.
    
    44 44
             """
    
    45 45
     
    
    46
    -        action = self._storage.get_message(action_digest, Action)
    
    46
    +        action = self._storage.get_message(action_digest, remote_execution_pb2.Action)
    
    47 47
     
    
    48 48
             if not action:
    
    49 49
                 raise FailedPreconditionError("Could not get action from storage.")
    
    ... ... @@ -73,8 +73,11 @@ class ExecutionInstance:
    73 73
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    74 74
     
    
    75 75
         def stream_operation_updates(self, message_queue, operation_name):
    
    76
    -        operation = message_queue.get()
    
    77
    -        while not operation.done:
    
    78
    -            yield operation
    
    79
    -            operation = message_queue.get()
    
    80
    -        yield operation
    76
    +        job = message_queue.get()
    
    77
    +        while not job.operation.done:
    
    78
    +        """Cancels the underlying operation of a given job.
    
    79
    +            job = message_queue.get()
    
    80
    +
    
    81
    +        job.check_operation_status()
    
    82
    +
    
    83
    +        yield job.operation

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import CancelledError, FailedPreconditionError, InvalidArgumentError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -67,6 +67,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    67 67
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    68 68
                 yield operations_pb2.Operation()
    
    69 69
     
    
    70
    +        except CancelledError as e:
    
    71
    +            self.logger.error(e)
    
    72
    +            context.set_details(str(e))
    
    73
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    74
    +            yield operations_pb2.Operation()
    
    75
    +
    
    70 76
         def WaitExecution(self, request, context):
    
    71 77
             try:
    
    72 78
                 names = request.name.split("/")
    

  • buildgrid/server/job.py
    ... ... @@ -17,9 +17,11 @@ import logging
    17 17
     import uuid
    
    18 18
     from enum import Enum
    
    19 19
     
    
    20
    +from buildgrid._exceptions import CancelledError
    
    20 21
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    21 22
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    22 23
     from buildgrid._protos.google.longrunning import operations_pb2
    
    24
    +from buildgrid._protos.google.rpc import code_pb2
    
    23 25
     
    
    24 26
     
    
    25 27
     class OperationStage(Enum):
    
    ... ... @@ -60,6 +62,8 @@ class Job:
    60 62
     
    
    61 63
             self.__execute_response = None
    
    62 64
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    65
    +        self.__operation_canceled = False
    
    66
    +        self.__lease_canceled = False
    
    63 67
     
    
    64 68
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    65 69
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    ... ... @@ -128,7 +132,8 @@ class Job:
    128 132
                 queue (queue.Queue): the event queue to register.
    
    129 133
             """
    
    130 134
             self._operation_update_queues.append(queue)
    
    131
    -        queue.put(self._operation)
    
    135
    +
    
    136
    +        queue.put(self)
    
    132 137
     
    
    133 138
         def unregister_client(self, queue):
    
    134 139
             """Unsubscribes to the job's :class:`Operation` stage change events.
    
    ... ... @@ -153,6 +158,8 @@ class Job:
    153 158
             """
    
    154 159
             if self._lease is not None:
    
    155 160
                 return None
    
    161
    +        elif self.__lease_canceled:
    
    162
    +            return None
    
    156 163
     
    
    157 164
             self._lease = bots_pb2.Lease()
    
    158 165
             self._lease.id = self._name
    
    ... ... @@ -196,6 +203,15 @@ class Job:
    196 203
                 self.__execute_response.cached_result = False
    
    197 204
                 self.__execute_response.status.CopyFrom(status)
    
    198 205
     
    
    206
    +    def cancel_lease(self):
    
    207
    +        """Triggers a job's :class:Lease cancellation.
    
    208
    +
    
    209
    +        This will not cancel the job's :class:Operation.
    
    210
    +        """
    
    211
    +        self.__lease_canceled = True
    
    212
    +        if self._lease is not None:
    
    213
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    214
    +
    
    199 215
         def update_operation_stage(self, stage):
    
    200 216
             """Operates a stage transition for the job's :class:Operation.
    
    201 217
     
    
    ... ... @@ -218,4 +234,28 @@ class Job:
    218 234
             self._operation.metadata.Pack(self.__operation_metadata)
    
    219 235
     
    
    220 236
             for queue in self._operation_update_queues:
    
    221
    -            queue.put(self._operation)
    237
    +            queue.put(self)
    
    238
    +
    
    239
    +    def check_operation_status(self):
    
    240
    +        """Reports errors on unexpected job's :class:Operation state.
    
    241
    +
    
    242
    +        Raises:
    
    243
    +            CancelledError: if the job's :class:Operation was cancelled.
    
    244
    +        """
    
    245
    +        if self.__operation_canceled:
    
    246
    +            raise CancelledError(self.__execute_response.status.message)
    
    247
    +
    
    248
    +    def cancel_operation(self):
    
    249
    +        """Triggers a job's :class:Operation cancellation.
    
    250
    +
    
    251
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    252
    +        """
    
    253
    +        self.__operation_canceled = True
    
    254
    +        if self._lease is not None:
    
    255
    +            self.cancel_lease()
    
    256
    +
    
    257
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    258
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    259
    +        self.__execute_response.status.message = "The operation was cancelled by the caller."
    
    260
    +
    
    261
    +        self.update_operation_stage(OperationStage.COMPLETED)

  • buildgrid/server/scheduler.py
    ... ... @@ -128,3 +128,13 @@ class Scheduler:
    128 128
         def get_job_operation(self, job_name):
    
    129 129
             """Returns the operation associated to job."""
    
    130 130
             return self.jobs[job_name].operation
    
    131
    +
    
    132
    +    def cancel_job_operation(self, job_name):
    
    133
    +        """"Cancels the underlying operation of a given job.
    
    134
    +
    
    135
    +        This will also cancel any job's lease that may have been issued.
    
    136
    +
    
    137
    +        Args:
    
    138
    +            job_name (str): name of the job holding the operation to cancel.
    
    139
    +        """
    
    140
    +        self.jobs[job_name].cancel_operation()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]