[Notes] [Git][BuildGrid/buildgrid][finn/74-operation-cancelation] 6 commits: Catch cancelled operations in service.



Title: GitLab

finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid

Commits:

7 changed files:

Changes:

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -155,6 +155,19 @@ def status(context, operation_name, json):
    155 155
             click.echo(json_format.MessageToJson(operation))
    
    156 156
     
    
    157 157
     
    
    158
    +@cli.command('cancel', short_help="Cancel an operation.")
    
    159
    +@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
    
    160
    +@pass_context
    
    161
    +def cancel(context, operation_name):
    
    162
    +    context.logger.info("Cancelling an operation...")
    
    163
    +    stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    164
    +
    
    165
    +    request = operations_pb2.CancelOperationRequest(name=operation_name)
    
    166
    +
    
    167
    +    stub.CancelOperation(request)
    
    168
    +    context.logger.info("Operation cancelled: [{}]".format(request))
    
    169
    +
    
    170
    +
    
    158 171
     @cli.command('list', short_help="List operations.")
    
    159 172
     @click.option('--json', is_flag=True, show_default=True,
    
    160 173
                   help="Print operations list in JSON format.")
    

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -76,6 +76,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    76 76
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    77 77
                 yield operations_pb2.Operation()
    
    78 78
     
    
    79
    +        except CancelledError as e:
    
    80
    +            self.logger.error(e)
    
    81
    +            context.set_details(str(e))
    
    82
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    83
    +            yield operations_pb2.Operation()
    
    84
    +
    
    79 85
         def WaitExecution(self, request, context):
    
    80 86
             try:
    
    81 87
                 names = request.name.split("/")
    
    ... ... @@ -106,6 +112,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    106 112
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    107 113
                 yield operations_pb2.Operation()
    
    108 114
     
    
    115
    +        except CancelledError as e:
    
    116
    +            self.logger.error(e)
    
    117
    +            context.set_details(str(e))
    
    118
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    119
    +            yield operations_pb2.Operation()
    
    120
    +
    
    109 121
         def _get_instance(self, name):
    
    110 122
             try:
    
    111 123
                 return self._instances[name]
    

  • buildgrid/server/job.py
    ... ... @@ -19,6 +19,7 @@ import uuid
    19 19
     from google.protobuf import timestamp_pb2
    
    20 20
     
    
    21 21
     from buildgrid._enums import LeaseState, OperationStage
    
    22
    +from buildgrid._exceptions import CancelledError
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    23 24
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    24 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -37,10 +38,14 @@ class Job:
    37 38
     
    
    38 39
             self.__execute_response = None
    
    39 40
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    41
    +
    
    40 42
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    41 43
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    42 44
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    43 45
     
    
    46
    +        self.__operation_cancelled = False
    
    47
    +        self.__lease_cancelled = False
    
    48
    +
    
    44 49
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    45 50
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    46 51
     
    
    ... ... @@ -131,7 +136,9 @@ class Job:
    131 136
             Only one :class:`Lease` can be emitted for a given job. This method
    
    132 137
             should only be used once, any furhter calls are ignored.
    
    133 138
             """
    
    134
    -        if self._lease is not None:
    
    139
    +        if self.__operation_cancelled:
    
    140
    +            return None
    
    141
    +        elif self._lease is not None:
    
    135 142
                 return None
    
    136 143
     
    
    137 144
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -189,6 +196,15 @@ class Job:
    189 196
                 self.__execute_response.cached_result = False
    
    190 197
                 self.__execute_response.status.CopyFrom(status)
    
    191 198
     
    
    199
    +    def cancel_lease(self):
    
    200
    +        """Triggers a job's :class:Lease cancellation.
    
    201
    +
    
    202
    +        This will not cancel the job's :class:Operation.
    
    203
    +        """
    
    204
    +        self.__lease_cancelled = True
    
    205
    +        if self._lease is not None:
    
    206
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    207
    +
    
    192 208
         def update_operation_stage(self, stage):
    
    193 209
             """Operates a stage transition for the job's :class:Operation.
    
    194 210
     
    
    ... ... @@ -214,3 +230,20 @@ class Job:
    214 230
     
    
    215 231
             for queue in self._operation_update_queues:
    
    216 232
                 queue.put(self._operation)
    
    233
    +
    
    234
    +    def cancel_operation(self):
    
    235
    +        """Triggers a job's :class:Operation cancellation.
    
    236
    +
    
    237
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    238
    +        """
    
    239
    +        self.__operation_cancelled = True
    
    240
    +        if self._lease is not None:
    
    241
    +            self.cancel_lease()
    
    242
    +
    
    243
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    244
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    245
    +        self.__execute_response.status.message = "Operation cancelled by client."
    
    246
    +
    
    247
    +        self.update_operation_stage(OperationStage.COMPLETED)
    
    248
    +
    
    249
    +        raise CancelledError("Operation cancelled: {}".format(self._name))

  • buildgrid/server/operations/instance.py
    ... ... @@ -64,6 +64,13 @@ class OperationsInstance:
    64 64
             except KeyError:
    
    65 65
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    66 66
     
    
    67
    +    def cancel_operation(self, name):
    
    68
    +        try:
    
    69
    +            self._scheduler.cancel_job_operation(name)
    
    70
    +
    
    71
    +        except KeyError:
    
    72
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73
    +
    
    67 74
         def register_message_client(self, name, queue):
    
    68 75
             try:
    
    69 76
                 self._scheduler.register_client(name, queue)
    
    ... ... @@ -84,7 +91,3 @@ class OperationsInstance:
    84 91
                 yield operation
    
    85 92
                 operation = message_queue.get()
    
    86 93
             yield operation
    87
    -
    
    88
    -    def cancel_operation(self, name):
    
    89
    -        # TODO: Cancel leases
    
    90
    -        raise NotImplementedError("Cancelled operations not supported")

  • buildgrid/server/operations/service.py
    ... ... @@ -25,7 +25,7 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    -from buildgrid._exceptions import InvalidArgumentError
    
    28
    +from buildgrid._exceptions import CancelledError, InvalidArgumentError
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    30 30
     
    
    31 31
     
    
    ... ... @@ -112,10 +112,10 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    112 112
                 operation_name = self._parse_operation_name(name)
    
    113 113
                 instance.cancel_operation(operation_name)
    
    114 114
     
    
    115
    -        except NotImplementedError as e:
    
    115
    +        except CancelledError as e:
    
    116 116
                 self.logger.error(e)
    
    117 117
                 context.set_details(str(e))
    
    118
    -            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    118
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    119 119
     
    
    120 120
             except InvalidArgumentError as e:
    
    121 121
                 self.logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -94,7 +94,10 @@ class Scheduler:
    94 94
             # For now, one lease at a time:
    
    95 95
             lease = job.create_lease()
    
    96 96
     
    
    97
    -        return [lease]
    
    97
    +        if lease:
    
    98
    +            return [lease]
    
    99
    +
    
    100
    +        return None
    
    98 101
     
    
    99 102
         def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
    
    100 103
             """Requests a state transition for a job's current :class:Lease.
    
    ... ... @@ -133,3 +136,13 @@ class Scheduler:
    133 136
         def get_job_operation(self, job_name):
    
    134 137
             """Returns the operation associated to job."""
    
    135 138
             return self.jobs[job_name].operation
    
    139
    +
    
    140
    +    def cancel_job_operation(self, job_name):
    
    141
    +        """"Cancels the underlying operation of a given job.
    
    142
    +
    
    143
    +        This will also cancel any job's lease that may have been issued.
    
    144
    +
    
    145
    +        Args:
    
    146
    +            job_name (str): name of the job holding the operation to cancel.
    
    147
    +        """
    
    148
    +        self.jobs[job_name].cancel_operation()

  • tests/integration/operations_service.py
    ... ... @@ -24,6 +24,7 @@ import grpc
    24 24
     from grpc._server import _Context
    
    25 25
     import pytest
    
    26 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    27 28
     from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context):
    236 237
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    237 238
     
    
    238 239
     
    
    239
    -def test_cancel_operation(instance, context):
    
    240
    +def test_cancel_operation(instance, controller, execute_request, context):
    
    241
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    242
    +                                                             execute_request.skip_cache_lookup)
    
    243
    +
    
    240 244
         request = operations_pb2.CancelOperationRequest()
    
    241
    -    request.name = "{}/{}".format(instance_name, "runner")
    
    245
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    246
    +
    
    242 247
         instance.CancelOperation(request, context)
    
    243 248
     
    
    244
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    249
    +    context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
    
    250
    +
    
    251
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    252
    +    response = instance.ListOperations(request, context)
    
    253
    +
    
    254
    +    assert len(response.operations) is 1
    
    255
    +
    
    256
    +    for operation in response.operations:
    
    257
    +        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    258
    +        operation.metadata.Unpack(operation_metadata)
    
    259
    +        assert operation_metadata.stage == OperationStage.COMPLETED.value
    
    245 260
     
    
    246 261
     
    
    247 262
     def test_cancel_operation_blank(blank_instance, context):
    
    ... ... @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context):
    249 264
         request.name = "runner"
    
    250 265
         blank_instance.CancelOperation(request, context)
    
    251 266
     
    
    252
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    267
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    253 268
     
    
    254 269
     
    
    255 270
     def test_cancel_operation_instance_fail(instance, context):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]