[Notes] [Git][BuildGrid/buildgrid][finn/74-operation-cancelation] 3 commits: Message queue now queues the job.



Title: GitLab

finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -155,6 +155,25 @@ def status(context, operation_name, json):
    155 155
             click.echo(json_format.MessageToJson(operation))
    
    156 156
     
    
    157 157
     
    
    158
    +@cli.command('cancel', short_help="Cancel an operation.")
    
    159
    +@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
    
    160
    +@pass_context
    
    161
    +def cancel(context, operation_name):
    
    162
    +    context.logger.info("Cancelling an operation...")
    
    163
    +    stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    164
    +
    
    165
    +    request = operations_pb2.CancelOperationRequest(name=operation_name)
    
    166
    +
    
    167
    +    try:
    
    168
    +        stub.CancelOperation(request)
    
    169
    +    except grpc.RpcError as e:
    
    170
    +        status_code = e.code()
    
    171
    +        if status_code != grpc.StatusCode.CANCELLED:
    
    172
    +            raise
    
    173
    +
    
    174
    +    context.logger.info("Operation cancelled: [{}]".format(request))
    
    175
    +
    
    176
    +
    
    158 177
     @cli.command('list', short_help="List operations.")
    
    159 178
     @click.option('--json', is_flag=True, show_default=True,
    
    160 179
                   help="Print operations list in JSON format.")
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -71,8 +71,11 @@ class ExecutionInstance:
    71 71
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    72 72
     
    
    73 73
         def stream_operation_updates(self, message_queue, operation_name):
    
    74
    -        operation = message_queue.get()
    
    75
    -        while not operation.done:
    
    76
    -            yield operation
    
    77
    -            operation = message_queue.get()
    
    78
    -        yield operation
    74
    +        job = message_queue.get()
    
    75
    +        while not job.operation.done:
    
    76
    +            yield job.operation
    
    77
    +            job = message_queue.get()
    
    78
    +
    
    79
    +        job.check_operation_status()
    
    80
    +
    
    81
    +        yield job.operation

  • buildgrid/server/job.py
    ... ... @@ -113,7 +113,7 @@ class Job:
    113 113
                 queue (queue.Queue): the event queue to register.
    
    114 114
             """
    
    115 115
             self._operation_update_queues.append(queue)
    
    116
    -        queue.put(self._operation)
    
    116
    +        queue.put(self)
    
    117 117
     
    
    118 118
         def unregister_client(self, queue):
    
    119 119
             """Unsubscribes to the job's :class:`Operation` stage change events.
    
    ... ... @@ -229,7 +229,16 @@ class Job:
    229 229
             self._operation.metadata.Pack(self.__operation_metadata)
    
    230 230
     
    
    231 231
             for queue in self._operation_update_queues:
    
    232
    -            queue.put(self._operation)
    
    232
    +            queue.put(self)
    
    233
    +
    
    234
    +    def check_operation_status(self):
    
    235
    +        """Reports errors on unexpected job's :class:Operation state.
    
    236
    +
    
    237
    +        Raises:
    
    238
    +            CancelledError: if the job's :class:Operation was cancelled.
    
    239
    +        """
    
    240
    +        if self.__operation_cancelled:
    
    241
    +            raise CancelledError(self.__execute_response.status.message)
    
    233 242
     
    
    234 243
         def cancel_operation(self):
    
    235 244
             """Triggers a job's :class:Operation cancellation.
    

  • buildgrid/server/operations/instance.py
    ... ... @@ -86,8 +86,11 @@ class OperationsInstance:
    86 86
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    87 87
     
    
    88 88
         def stream_operation_updates(self, message_queue, operation_name):
    
    89
    -        operation = message_queue.get()
    
    90
    -        while not operation.done:
    
    91
    -            yield operation
    
    92
    -            operation = message_queue.get()
    
    93
    -        yield operation
    89
    +        job = message_queue.get()
    
    90
    +        while not job.operation.done:
    
    91
    +            yield job.operation
    
    92
    +            job = message_queue.get()
    
    93
    +
    
    94
    +        job.check_operation_status()
    
    95
    +
    
    96
    +        yield job.operation

  • tests/integration/operations_service.py
    ... ... @@ -24,6 +24,7 @@ import grpc
    24 24
     from grpc._server import _Context
    
    25 25
     import pytest
    
    26 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    27 28
     from buildgrid._exceptions import InvalidArgumentError
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context):
    236 237
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    237 238
     
    
    238 239
     
    
    239
    -def test_cancel_operation(instance, context):
    
    240
    +def test_cancel_operation(instance, controller, execute_request, context):
    
    241
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    242
    +                                                             execute_request.skip_cache_lookup)
    
    243
    +
    
    240 244
         request = operations_pb2.CancelOperationRequest()
    
    241
    -    request.name = "{}/{}".format(instance_name, "runner")
    
    245
    +    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    246
    +
    
    242 247
         instance.CancelOperation(request, context)
    
    243 248
     
    
    244
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    249
    +    context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
    
    250
    +
    
    251
    +    request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    252
    +    response = instance.ListOperations(request, context)
    
    253
    +
    
    254
    +    assert len(response.operations) is 1
    
    255
    +
    
    256
    +    for operation in response.operations:
    
    257
    +        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    258
    +        operation.metadata.Unpack(operation_metadata)
    
    259
    +        assert operation_metadata.stage == OperationStage.COMPLETED.value
    
    245 260
     
    
    246 261
     
    
    247 262
     def test_cancel_operation_blank(blank_instance, context):
    
    ... ... @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context):
    249 264
         request.name = "runner"
    
    250 265
         blank_instance.CancelOperation(request, context)
    
    251 266
     
    
    252
    -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    267
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    253 268
     
    
    254 269
     
    
    255 270
     def test_cancel_operation_instance_fail(instance, context):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]