[Notes] [Git][BuildGrid/buildgrid][master] 10 commits: Explicitly state the OK code.



Title: GitLab

finn pushed to branch master at BuildGrid / buildgrid

Commits:

9 changed files:

Changes:

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -153,6 +153,18 @@ def status(context, operation_name, json):
    153 153
             click.echo(json_format.MessageToJson(operation))
    
    154 154
     
    
    155 155
     
    
    156
    +@cli.command('cancel', short_help="Cancel an operation.")
    
    157
    +@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
    
    158
    +@pass_context
    
    159
    +def cancel(context, operation_name):
    
    160
    +    click.echo("Cancelling an operation...")
    
    161
    +    stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    162
    +    request = operations_pb2.CancelOperationRequest(name=operation_name)
    
    163
    +
    
    164
    +    stub.CancelOperation(request)
    
    165
    +    click.echo("Operation cancelled: [{}]".format(request))
    
    166
    +
    
    167
    +
    
    156 168
     @cli.command('list', short_help="List operations.")
    
    157 169
     @click.option('--json', is_flag=True, show_default=True,
    
    158 170
                   help="Print operations list in JSON format.")
    

  • buildgrid/_exceptions.py
    ... ... @@ -52,6 +52,12 @@ class BotError(BgdError):
    52 52
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    
    53 53
     
    
    54 54
     
    
    55
    +class CancelledError(BgdError):
    
    56
    +    """The job was cancelled and any callers should be notified"""
    
    57
    +    def __init__(self, message, detail=None, reason=None):
    
    58
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    59
    +
    
    60
    +
    
    55 61
     class InvalidArgumentError(BgdError):
    
    56 62
         """A bad argument was passed, such as a name which doesn't exist."""
    
    57 63
         def __init__(self, message, detail=None, reason=None):
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -72,8 +72,10 @@ class ExecutionInstance:
    72 72
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73 73
     
    
    74 74
         def stream_operation_updates(self, message_queue, operation_name):
    
    75
    -        operation = message_queue.get()
    
    76
    -        while not operation.done:
    
    77
    -            yield operation
    
    78
    -            operation = message_queue.get()
    
    79
    -        yield operation
    75
    +        job = message_queue.get()
    
    76
    +        while not job.operation.done:
    
    77
    +            yield job.operation
    
    78
    +            job = message_queue.get()
    
    79
    +            job.check_operation_status()
    
    80
    +
    
    81
    +        yield job.operation

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -79,6 +79,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    79 79
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    80 80
                 yield operations_pb2.Operation()
    
    81 81
     
    
    82
    +        except CancelledError as e:
    
    83
    +            self.__logger.error(e)
    
    84
    +            context.set_details(str(e))
    
    85
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    86
    +            yield operations_pb2.Operation()
    
    87
    +
    
    82 88
         def WaitExecution(self, request, context):
    
    83 89
             self.__logger.debug("WaitExecution request from [%s]", context.peer())
    
    84 90
     
    
    ... ... @@ -111,6 +117,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    111 117
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    112 118
                 yield operations_pb2.Operation()
    
    113 119
     
    
    120
    +        except CancelledError as e:
    
    121
    +            self.__logger.error(e)
    
    122
    +            context.set_details(str(e))
    
    123
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    124
    +            yield operations_pb2.Operation()
    
    125
    +
    
    114 126
         def _get_instance(self, name):
    
    115 127
             try:
    
    116 128
                 return self._instances[name]
    

  • buildgrid/server/job.py
    ... ... @@ -19,9 +19,11 @@ import uuid
    19 19
     from google.protobuf import timestamp_pb2
    
    20 20
     
    
    21 21
     from buildgrid._enums import LeaseState, OperationStage
    
    22
    +from buildgrid._exceptions import CancelledError
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    23 24
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    24 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26
    +from buildgrid._protos.google.rpc import code_pb2
    
    25 27
     
    
    26 28
     
    
    27 29
     class Job:
    
    ... ... @@ -36,10 +38,14 @@ class Job:
    36 38
     
    
    37 39
             self.__execute_response = None
    
    38 40
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    41
    +
    
    39 42
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    40 43
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    41 44
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    42 45
     
    
    46
    +        self.__operation_cancelled = False
    
    47
    +        self.__lease_cancelled = False
    
    48
    +
    
    43 49
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    44 50
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    45 51
     
    
    ... ... @@ -103,11 +109,13 @@ class Job:
    103 109
         def register_client(self, queue):
    
    104 110
             """Subscribes to the job's :class:`Operation` stage change events.
    
    105 111
     
    
    112
    +        Queues this :object:`Job` instance.
    
    113
    +
    
    106 114
             Args:
    
    107 115
                 queue (queue.Queue): the event queue to register.
    
    108 116
             """
    
    109 117
             self._operation_update_queues.append(queue)
    
    110
    -        queue.put(self._operation)
    
    118
    +        queue.put(self)
    
    111 119
     
    
    112 120
         def unregister_client(self, queue):
    
    113 121
             """Unsubscribes to the job's :class:`Operation` stage change events.
    
    ... ... @@ -130,7 +138,9 @@ class Job:
    130 138
             Only one :class:`Lease` can be emitted for a given job. This method
    
    131 139
         should only be used once, any further calls are ignored.
    
    132 140
             """
    
    133
    -        if self._lease is not None:
    
    141
    +        if self.__operation_cancelled:
    
    142
    +            return None
    
    143
    +        elif self._lease is not None:
    
    134 144
                 return None
    
    135 145
     
    
    136 146
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -171,7 +181,7 @@ class Job:
    171 181
                 action_result = remote_execution_pb2.ActionResult()
    
    172 182
     
    
    173 183
                 # TODO: Make a distinction between build and bot failures!
    
    174
    -            if status.code != 0:
    
    184
    +            if status.code != code_pb2.OK:
    
    175 185
                     self._do_not_cache = True
    
    176 186
     
    
    177 187
                 if result is not None:
    
    ... ... @@ -188,6 +198,15 @@ class Job:
    188 198
                 self.__execute_response.cached_result = False
    
    189 199
                 self.__execute_response.status.CopyFrom(status)
    
    190 200
     
    
    201
    +    def cancel_lease(self):
    
    202
    +        """Triggers a job's :class:Lease cancellation.
    
    203
    +
    
    204
    +        This will not cancel the job's :class:Operation.
    
    205
    +        """
    
    206
    +        self.__lease_cancelled = True
    
    207
    +        if self._lease is not None:
    
    208
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    209
    +
    
    191 210
         def update_operation_stage(self, stage):
    
    192 211
             """Operates a stage transition for the job's :class:Operation.
    
    193 212
     
    
    ... ... @@ -212,4 +231,28 @@ class Job:
    212 231
             self._operation.metadata.Pack(self.__operation_metadata)
    
    213 232
     
    
    214 233
             for queue in self._operation_update_queues:
    
    215
    -            queue.put(self._operation)
    234
    +            queue.put(self)
    
    235
    +
    
    236
    +    def check_operation_status(self):
    
    237
    +        """Reports errors on unexpected job's :class:Operation state.
    
    238
    +
    
    239
    +        Raises:
    
    240
    +            CancelledError: if the job's :class:Operation was cancelled.
    
    241
    +        """
    
    242
    +        if self.__operation_cancelled:
    
    243
    +            raise CancelledError(self.__execute_response.status.message)
    
    244
    +
    
    245
    +    def cancel_operation(self):
    
    246
    +        """Triggers a job's :class:Operation cancellation.
    
    247
    +
    
    248
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    249
    +        """
    
    250
    +        self.__operation_cancelled = True
    
    251
    +        if self._lease is not None:
    
    252
    +            self.cancel_lease()
    
    253
    +
    
    254
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    255
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    256
    +        self.__execute_response.status.message = "Operation cancelled by client."
    
    257
    +
    
    258
    +        self.update_operation_stage(OperationStage.COMPLETED)

  • buildgrid/server/operations/instance.py
    ... ... @@ -65,6 +65,13 @@ class OperationsInstance:
    65 65
             except KeyError:
    
    66 66
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67 67
     
    
    68
    +    def cancel_operation(self, name):
    
    69
    +        try:
    
    70
    +            self._scheduler.cancel_job_operation(name)
    
    71
    +
    
    72
    +        except KeyError:
    
    73
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    74
    +
    
    68 75
         def register_message_client(self, name, queue):
    
    69 76
             try:
    
    70 77
                 self._scheduler.register_client(name, queue)
    
    ... ... @@ -80,12 +87,10 @@ class OperationsInstance:
    80 87
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    81 88
     
    
    82 89
         def stream_operation_updates(self, message_queue, operation_name):
    
    83
    -        operation = message_queue.get()
    
    84
    -        while not operation.done:
    
    85
    -            yield operation
    
    86
    -            operation = message_queue.get()
    
    87
    -        yield operation
    
    90
    +        job = message_queue.get()
    
    91
    +        while not job.operation.done:
    
    92
    +            yield job.operation
    
    93
    +            job = message_queue.get()
    
    94
    +            job.check_operation_status()
    
    88 95
     
    
    89
    -    def cancel_operation(self, name):
    
    90
    -        # TODO: Cancel leases
    
    91
    -        raise NotImplementedError("Cancelled operations not supported")
    96
    +        yield job.operation

  • buildgrid/server/operations/service.py
    ... ... @@ -120,11 +120,6 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    120 120
                 operation_name = self._parse_operation_name(name)
    
    121 121
                 instance.cancel_operation(operation_name)
    
    122 122
     
    
    123
    -        except NotImplementedError as e:
    
    124
    -            self.__logger.error(e)
    
    125
    -            context.set_details(str(e))
    
    126
    -            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    127
    -
    
    128 123
             except InvalidArgumentError as e:
    
    129 124
                 self.__logger.error(e)
    
    130 125
                 context.set_details(str(e))
    

  • buildgrid/server/scheduler.py
    ... ... @@ -97,7 +97,10 @@ class Scheduler:
    97 97
             # For now, one lease at a time:
    
    98 98
             lease = job.create_lease()
    
    99 99
     
    
    100
    -        return [lease]
    
    100
    +        if lease:
    
    101
    +            return [lease]
    
    102
    +
    
    103
    +        return None
    
    101 104
     
    
    102 105
         def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
    
    103 106
             """Requests a state transition for a job's current :class:Lease.
    
    ... ... @@ -136,3 +139,13 @@ class Scheduler:
    136 139
         def get_job_operation(self, job_name):
    
    137 140
             """Returns the operation associated to job."""
    
    138 141
             return self.jobs[job_name].operation
    
    142
    +
    
    143
    +    def cancel_job_operation(self, job_name):
    
    144
    +        """Cancels the underlying operation of a given job.
    
    145
    +
    
    146
    +        This will also cancel any job's lease that may have been issued.
    
    147
    +
    
    148
    +        Args:
    
    149
    +            job_name (str): name of the job holding the operation to cancel.
    
    150
    +        """
    
    151
    +        self.jobs[job_name].cancel_operation()

  • tests/integration/operations_service.py
    ... ... @@ -24,6 +24,7 @@ import grpc
    24 24
     from grpc._server import _Context
    
    25 25
     import pytest
    
    26 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    27 28
     from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -236,12 +237,24 @@ def test_delete_operation_fail(instance, context):
    236 237
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    237 238
     
    
    238 239
     
    
    239
    -def test_cancel_operation(instance, context):
    
    240
    +def test_cancel_operation(instance, controller, execute_request, context):
    
    241
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    242
    +                                                             execute_request.skip_cache_lookup)
    
    243
    +
    
    240 244
         request = operations_pb2.CancelOperationRequest()
    
    241
    -    request.name = "{}/{}".format(instance_name, "runner")
    
    245
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    246
    +
    
    242 247
         instance.CancelOperation(request, context)
    
    243 248
     
    
    244
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    249
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    250
    +    response = instance.ListOperations(request, context)
    
    251
    +
    
    252
    +    assert len(response.operations) is 1
    
    253
    +
    
    254
    +    for operation in response.operations:
    
    255
    +        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    256
    +        operation.metadata.Unpack(operation_metadata)
    
    257
    +        assert operation_metadata.stage == OperationStage.COMPLETED.value
    
    245 258
     
    
    246 259
     
    
    247 260
     def test_cancel_operation_blank(blank_instance, context):
    
    ... ... @@ -249,7 +262,7 @@ def test_cancel_operation_blank(blank_instance, context):
    249 262
         request.name = "runner"
    
    250 263
         blank_instance.CancelOperation(request, context)
    
    251 264
     
    
    252
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    265
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    253 266
     
    
    254 267
     
    
    255 268
     def test_cancel_operation_instance_fail(instance, context):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]