[Notes] [Git][BuildGrid/buildgrid][mablanch/74-operation-cancelation] 5 commits: job.py: Implement operation cancellation



Title: GitLab

Martin Blanchard pushed to branch mablanch/74-operation-cancelation at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/_exceptions.py
    ... ... @@ -52,6 +52,11 @@ class BotError(BgdError):
    52 52
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    
    53 53
     
    
    54 54
     
    
    55
    +class CancelledError(BgdError):
    
    56
    +    def __init__(self, message, detail=None, reason=None):
    
    57
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    58
    +
    
    59
    +
    
    55 60
     class InvalidArgumentError(BgdError):
    
    56 61
         """A bad argument was passed, such as a name which doesn't exist."""
    
    57 62
         def __init__(self, message, detail=None, reason=None):
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,8 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    25
    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    24
    +from buildgrid._exceptions import CancelledError, FailedPreconditionError, InvalidArgumentError
    
    25
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    26
    +from buildgrid._protos.google.rpc import code_pb2
    
    26 27
     
    
    27 28
     from ..job import Job
    
    28 29
     
    
    ... ... @@ -43,7 +44,7 @@ class ExecutionInstance:
    43 44
             this action.
    
    44 45
             """
    
    45 46
     
    
    46
    -        action = self._storage.get_message(action_digest, Action)
    
    47
    +        action = self._storage.get_message(action_digest, remote_execution_pb2.Action)
    
    47 48
     
    
    48 49
             if not action:
    
    49 50
                 raise FailedPreconditionError("Could not get action from storage.")
    
    ... ... @@ -77,4 +78,11 @@ class ExecutionInstance:
    77 78
             while not operation.done:
    
    78 79
                 yield operation
    
    79 80
                 operation = message_queue.get()
    
    81
    +
    
    82
    +        response = remote_execution_pb2.ExecuteResponse()
    
    83
    +        operation.response.Unpack(response)
    
    84
    +
    
    85
    +        if response.status.code == code_pb2.CANCELLED:
    
    86
    +            raise CancelledError(response.status.message)
    
    87
    +
    
    80 88
             yield operation

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import CancelledError, FailedPreconditionError, InvalidArgumentError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -67,6 +67,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    67 67
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    68 68
                 yield operations_pb2.Operation()
    
    69 69
     
    
    70
    +        except CancelledError as e:
    
    71
    +            self.logger.error(e)
    
    72
    +            context.set_details(str(e))
    
    73
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    74
    +            yield operations_pb2.Operation()
    
    75
    +
    
    70 76
         def WaitExecution(self, request, context):
    
    71 77
             try:
    
    72 78
                 names = request.name.split("/")
    

  • buildgrid/server/job.py
    ... ... @@ -20,6 +20,7 @@ from enum import Enum
    20 20
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    21 21
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    22 22
     from buildgrid._protos.google.longrunning import operations_pb2
    
    23
    +from buildgrid._protos.google.rpc import code_pb2
    
    23 24
     
    
    24 25
     
    
    25 26
     class OperationStage(Enum):
    
    ... ... @@ -60,6 +61,7 @@ class Job:
    60 61
     
    
    61 62
             self.__execute_response = None
    
    62 63
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    64
    +        self.__operation_canceled = False
    
    63 65
     
    
    64 66
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    65 67
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    ... ... @@ -196,6 +198,10 @@ class Job:
    196 198
                 self.__execute_response.cached_result = False
    
    197 199
                 self.__execute_response.status.CopyFrom(status)
    
    198 200
     
    
    201
    +    def cancel_lease(self):
    
    202
    +        if self._lease is not None:
    
    203
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    204
    +
    
    199 205
         def update_operation_stage(self, stage):
    
    200 206
             """Operates a stage transition for the job's :class:Operation.
    
    201 207
     
    
    ... ... @@ -219,3 +225,14 @@ class Job:
    219 225
     
    
    220 226
             for queue in self._operation_update_queues:
    
    221 227
                 queue.put(self._operation)
    
    228
    +
    
    229
    +    def cancel_operation(self):
    
    230
    +        self.__operation_canceled = True
    
    231
    +        if self._lease is not None:
    
    232
    +            self.cancel_lease()
    
    233
    +
    
    234
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    235
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    236
    +        self.__execute_response.status.message = "The operation was cancelled by the caller."
    
    237
    +
    
    238
    +        self.update_operation_stage(OperationStage.COMPLETED)

  • buildgrid/server/scheduler.py
    ... ... @@ -128,3 +128,13 @@ class Scheduler:
    128 128
         def get_job_operation(self, job_name):
    
    129 129
             """Returns the operation associated to job."""
    
    130 130
             return self.jobs[job_name].operation
    
    131
    +
    
    132
    +    def cancel_job_operation(self, job_name):
    
    133
    +        """Cancels the underlying operation of a given job.
    
    134
    +
    
    135
    +        This will also cancel any job's lease that may have been issued.
    
    136
    +
    
    137
    +        Args:
    
    138
    +            job_name (str): name of the job holding the operation to cancel.
    
    139
    +        """
    
    140
    +        self.jobs[job_name].cancel_operation()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]