[Notes] [Git][BuildGrid/buildgrid][finn/update-execution-api] Added WaitExecution method for Remote Execution



Title: GitLab

finnball pushed to branch finn/update-execution-api at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • app/commands/cmd_execute.py
    ... ... @@ -61,24 +61,16 @@ def request(context, number, instance_name, wait_for_completion):
    61 61
         request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
    
    62 62
                                                       action_digest = action_digest,
    
    63 63
                                                       skip_cache_lookup = True)
    
    64
    +    responses = []
    
    64 65
         for i in range(0, number):
    
    65
    -        response = stub.Execute(request)
    
    66
    -        for r in response:
    
    67
    -            context.logger.info(r)
    
    68
    -
    
    69
    -    try:
    
    70
    -        while wait_for_completion:
    
    71
    -            request = operations_pb2.ListOperationsRequest()
    
    72
    -            context.logger.debug('Querying to see if jobs are complete.')
    
    73
    -            stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    74
    -            response = stub.ListOperations(request)
    
    75
    -            if all(operation.done for operation in response.operations):
    
    76
    -                context.logger.info('Jobs complete')
    
    77
    -                break
    
    78
    -            time.sleep(1)
    
    79
    -
    
    80
    -    except KeyboardInterrupt:
    
    81
    -        pass
    
    66
    +        responses.append(stub.Execute(request))
    
    67
    +
    
    68
    +    for response in responses:
    
    69
    +        if wait_for_completion:
    
    70
    +            for stream in response:
    
    71
    +                context.logger.info(stream)
    
    72
    +        else:
    
    73
    +            context.logger.info(next(response))
    
    82 74
     
    
    83 75
     @cli.command('status', short_help='Get the status of an operation')
    
    84 76
     @click.argument('operation-name')
    
    ... ... @@ -108,3 +100,15 @@ def list_operations(context):
    108 100
     
    
    109 101
         for op in response.operations:
    
    110 102
             context.logger.info(op)
    
    103
    +
    
    104
    +@cli.command('wait', short_help='Streams an operation until it is complete')
    
    105
    +@click.argument('operation-name')
    
    106
    +@pass_context
    
    107
    +def wait_execution(context, operation_name):
    
    108
    +    stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    109
    +    request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
    
    110
    +
    
    111
    +    response = stub.WaitExecution(request)
    
    112
    +
    
    113
    +    for stream in response:
    
    114
    +        context.logger.info(stream)

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -22,6 +22,7 @@ ExecutionService
    22 22
     Serves remote execution requests.
    
    23 23
     """
    
    24 24
     
    
    25
    +import copy
    
    25 26
     import grpc
    
    26 27
     import logging
    
    27 28
     
    
    ... ... @@ -40,8 +41,16 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    40 41
             # Ignore request.instance_name for now
    
    41 42
             # Have only one instance
    
    42 43
             try:
    
    43
    -            yield self._instance.execute(request.action_digest,
    
    44
    -                                         request.skip_cache_lookup)
    
    44
    +            operation = self._instance.execute(request.action_digest,
    
    45
    +                                               request.skip_cache_lookup)
    
    46
    +
    
    47
    +            stream_previous = None
    
    48
    +            while True:
    
    49
    +                stream = self._instance.get_operation(operation.name)
    
    50
    +                if stream != stream_previous:
    
    51
    +                    yield stream
    
    52
    +                    if stream.done == True: break
    
    53
    +                    stream_previous = copy.deepcopy(stream)
    
    45 54
     
    
    46 55
             except InvalidArgumentError as e:
    
    47 56
                 self.logger.error(e)
    
    ... ... @@ -54,3 +63,19 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    54 63
                 context.set_details(str(e))
    
    55 64
                 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    56 65
                 yield operations_pb2.Operation()
    
    66
    +
    
    67
    +    def WaitExecution(self, request, context):
    
    68
    +        try:
    
    69
    +            stream_previous = None
    
    70
    +            while True:
    
    71
    +                stream = self._instance.get_operation(request.name)
    
    72
    +                if stream != stream_previous:
    
    73
    +                    yield stream
    
    74
    +                    if stream.done == True: break
    
    75
    +                    stream_previous = copy.deepcopy(stream)
    
    76
    +
    
    77
    +        except InvalidArgumentError as e:
    
    78
    +            self.logger.error(e)
    
    79
    +            context.set_details(str(e))
    
    80
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    81
    +            yield operations_pb2.Operation()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]