[Notes] [Git][BuildGrid/buildgrid][finn/update-execution-api] Added WaitExecution method for Remote Execution



Title: GitLab

finnball pushed to branch finn/update-execution-api at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • app/commands/cmd_execute.py
    ... ... @@ -53,32 +53,23 @@ def cli(context, host, port):
    53 53
     @pass_context
    
    54 54
     def request(context, number, instance_name, wait_for_completion):
    
    55 55
         action_digest = remote_execution_pb2.Digest()
    
    56
    -    action_digest.hash = 'zhora'
    
    57 56
     
    
    58 57
         context.logger.info("Sending execution request...\n")
    
    59 58
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    60 59
     
    
    61 60
         request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
    
    62 61
                                                       action_digest = action_digest,
    
    63
    -                                                  skip_cache_lookup = True)
    
    62
    +                                                  skip_cache_lookup = False)
    
    63
    +    responses = []
    
    64 64
         for i in range(0, number):
    
    65
    -        response = stub.Execute(request)
    
    66
    -        for r in response:
    
    67
    -            context.logger.info(r)
    
    68
    -
    
    69
    -    try:
    
    70
    -        while wait_for_completion:
    
    71
    -            request = operations_pb2.ListOperationsRequest()
    
    72
    -            context.logger.debug('Querying to see if jobs are complete.')
    
    73
    -            stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    74
    -            response = stub.ListOperations(request)
    
    75
    -            if all(operation.done for operation in response.operations):
    
    76
    -                context.logger.info('Jobs complete')
    
    77
    -                break
    
    78
    -            time.sleep(1)
    
    79
    -
    
    80
    -    except KeyboardInterrupt:
    
    81
    -        pass
    
    65
    +        responses.append(stub.Execute(request))
    
    66
    +
    
    67
    +    for response in responses:
    
    68
    +        if wait_for_completion:
    
    69
    +            for stream in response:
    
    70
    +                context.logger.info(stream)
    
    71
    +        else:
    
    72
    +            context.logger.info(next(response))
    
    82 73
     
    
    83 74
     @cli.command('status', short_help='Get the status of an operation')
    
    84 75
     @click.argument('operation-name')
    
    ... ... @@ -108,3 +99,15 @@ def list_operations(context):
    108 99
     
    
    109 100
         for op in response.operations:
    
    110 101
             context.logger.info(op)
    
    102
    +
    
    103
    +@cli.command('wait', short_help='Streams an operation until it is complete')
    
    104
    +@click.argument('operation-name')
    
    105
    +@pass_context
    
    106
    +def wait_execution(context, operation_name):
    
    107
    +    stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    108
    +    request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
    
    109
    +
    
    110
    +    response = stub.WaitExecution(request)
    
    111
    +
    
    112
    +    for stream in response:
    
    113
    +        context.logger.info(stream)

  • buildgrid/server/execution/execution_instance.py
    ... ... @@ -50,7 +50,6 @@ class ExecutionInstance():
    50 50
             return job.get_operation()
    
    51 51
     
    
    52 52
         def get_operation(self, name):
    
    53
    -        self.logger.debug("Getting operation: {}".format(name))
    
    54 53
             operation = self._scheduler.jobs.get(name)
    
    55 54
             if operation is None:
    
    56 55
                 raise InvalidArgumentError("Operation name does not exist: {}".format(name))
    
    ... ... @@ -60,11 +59,9 @@ class ExecutionInstance():
    60 59
         def list_operations(self, name, list_filter, page_size, page_token):
    
    61 60
             # TODO: Pages
    
    62 61
             # Spec says number of pages and length of a page are optional
    
    63
    -        self.logger.debug("Listing operations")
    
    64 62
             return self._scheduler.get_operations()
    
    65 63
     
    
    66 64
         def delete_operation(self, name):
    
    67
    -        self.logger.debug("Deleting operation {}".format(name))
    
    68 65
             try:
    
    69 66
                 self._scheduler.jobs.pop(name)
    
    70 67
             except KeyError:
    

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -22,8 +22,10 @@ ExecutionService
    22 22
     Serves remote execution requests.
    
    23 23
     """
    
    24 24
     
    
    25
    +import copy
    
    25 26
     import grpc
    
    26 27
     import logging
    
    28
    +import time
    
    27 29
     
    
    28 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    29 31
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    ... ... @@ -40,17 +42,36 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    40 42
             # Ignore request.instance_name for now
    
    41 43
             # Have only one instance
    
    42 44
             try:
    
    43
    -            yield self._instance.execute(request.action_digest,
    
    44
    -                                         request.skip_cache_lookup)
    
    45
    +            operation = self._instance.execute(request.action_digest,
    
    46
    +                                               request.skip_cache_lookup)
    
    47
    +
    
    48
    +            yield from self._stream_operation_updates(operation.name)
    
    45 49
     
    
    46 50
             except InvalidArgumentError as e:
    
    47 51
                 self.logger.error(e)
    
    48 52
                 context.set_details(str(e))
    
    49 53
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    50
    -            yield operations_pb2.Operation()
    
    51 54
     
    
    52 55
             except NotImplementedError as e:
    
    53 56
                 self.logger.error(e)
    
    54 57
                 context.set_details(str(e))
    
    55 58
                 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    56
    -            yield operations_pb2.Operation()
    59
    +
    
    60
    +    def WaitExecution(self, request, context):
    
    61
    +        try:
    
    62
    +            yield from self._stream_operation_updates(request.name)
    
    63
    +
    
    64
    +        except InvalidArgumentError as e:
    
    65
    +            self.logger.error(e)
    
    66
    +            context.set_details(str(e))
    
    67
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68
    +
    
    69
    +    def _stream_operation_updates(self, name):
    
    70
    +        stream_previous = None
    
    71
    +        while True:
    
    72
    +            stream = self._instance.get_operation(name)
    
    73
    +            if stream != stream_previous:
    
    74
    +                yield stream
    
    75
    +                if stream.done == True: break
    
    76
    +                stream_previous = copy.deepcopy(stream)
    
    77
    +                time.sleep(1)

  • tests/integration/execution_service.py
    ... ... @@ -55,16 +55,32 @@ def test_execute(skip_cache_lookup, instance, context):
    55 55
         request = remote_execution_pb2.ExecuteRequest(instance_name = '',
    
    56 56
                                                       action_digest = action_digest,
    
    57 57
                                                       skip_cache_lookup = skip_cache_lookup)
    
    58
    -    response = instance.Execute(request, context)
    
    59
    -
    
    60
    -    for result in response:
    
    61
    -        assert isinstance(result, operations_pb2.Operation)
    
    62
    -
    
    63
    -        if skip_cache_lookup is False:
    
    64
    -            context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    65
    -        else:
    
    66
    -            metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    67
    -            result.metadata.Unpack(metadata)
    
    68
    -            assert metadata.stage == job.ExecuteStage.QUEUED.value
    
    69
    -            assert uuid.UUID(result.name, version=4)
    
    70
    -            assert result.done is False
    58
    +    response = next(instance.Execute(request, context))
    
    59
    +
    
    60
    +
    
    61
    +    assert isinstance(response, operations_pb2.Operation)
    
    62
    +
    
    63
    +    if skip_cache_lookup is False:
    
    64
    +        context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    65
    +    else:
    
    66
    +        metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    67
    +        response.metadata.Unpack(metadata)
    
    68
    +        assert metadata.stage == job.ExecuteStage.QUEUED.value
    
    69
    +        assert uuid.UUID(response.name, version=4)
    
    70
    +        assert response.done is False
    
    71
    +
    
    72
    +def test_wait_execution(instance, context):
    
    73
    +    action_digest = remote_execution_pb2.Digest()
    
    74
    +    action_digest.hash = 'zhora'
    
    75
    +
    
    76
    +    execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
    
    77
    +                                                            action_digest = action_digest,
    
    78
    +                                                            skip_cache_lookup = True)
    
    79
    +    execution_response = next(instance.Execute(execution_request, context))
    
    80
    +
    
    81
    +
    
    82
    +    request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
    
    83
    +
    
    84
    +    response = next(instance.WaitExecution(request, context))
    
    85
    +
    
    86
    +    assert response == execution_response



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]