[Notes] [Git][BuildStream/buildstream][mablanch/630-remote-execution-reconn] _sandboxremote.py: Try to reopen operation stream on failure



Title: GitLab

Martin Blanchard pushed to branch mablanch/630-remote-execution-reconn at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -83,8 +83,7 @@ class SandboxRemote(Sandbox):
    83 83
             # Upload the Command message to the remote CAS server
    
    84 84
             command_digest = cascache.push_message(self._get_project(), remote_command)
    
    85 85
             if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
    
    86
    -            # Command push failed
    
    87
    -            return None
    
    86
    +            raise SandboxError("Failed pushing build command to remote CAS.")
    
    88 87
     
    
    89 88
             # Create and send the action.
    
    90 89
             action = remote_execution_pb2.Action(command_digest=command_digest,
    
    ... ... @@ -95,27 +94,49 @@ class SandboxRemote(Sandbox):
    95 94
             # Upload the Action message to the remote CAS server
    
    96 95
             action_digest = cascache.push_message(self._get_project(), action)
    
    97 96
             if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
    
    98
    -            # Action push failed
    
    99
    -            return None
    
    97
    +            raise SandboxError("Failed pushing build action to remote CAS.")
    
    100 98
     
    
    101 99
             # Next, try to create a communication channel to the BuildGrid server.
    
    102 100
             channel = grpc.insecure_channel(self.server_url)
    
    103 101
             stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    104 102
             request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    
    105 103
                                                           skip_cache_lookup=False)
    
    106
    -        try:
    
    107
    -            operation_iterator = stub.Execute(request)
    
    108
    -        except grpc.RpcError:
    
    109
    -            return None
    
    104
    +
    
    105
    +        def __run_remote_command(stub, execute_request=None, running_operation=None):
    
    106
    +            try:
    
    107
    +                last_operation = None
    
    108
    +                if execute_request is not None:
    
    109
    +                    operation_iterator = stub.Execute(execute_request)
    
    110
    +                else:
    
    111
    +                    request = remote_execution_pb2.WaitExecutionRequest(name=operation.name)
    
    112
    +                    operation_iterator = stub.WaitExecution(request)
    
    113
    +
    
    114
    +                for operation in operation_iterator:
    
    115
    +                    if operation.done:
    
    116
    +                        return operation
    
    117
    +                    else:
    
    118
    +                        last_operation = operation
    
    119
    +            except grpc.RpcError as e:
    
    120
    +                status_code = e.code()
    
    121
    +                if status_code == grpc.StatusCode.UNAVAILABLE:
    
    122
    +                    raise SandboxError("Failed contacting remote execution server at {}."
    
    123
    +                                       .format(self.server_url))
    
    124
    +                elif running_operation and status_code == grpc.StatusCode.UNIMPLEMENTED:
    
    125
    +                    raise SandboxError("Failed trying to recover from connection loss: "
    
    126
    +                                       "server does not support operation status polling recovery.")
    
    127
    +                else:
    
    128
    +                    raise SandboxError("{} ({}).".format(e.details(), status_code.name))
    
    129
    +
    
    130
    +            return last_operation
    
    110 131
     
    
    111 132
             operation = None
    
    112 133
             with self._get_context().timed_activity("Waiting for the remote build to complete"):
    
    113
    -            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
    
    114
    -            # which will check the server is actually contactable. However, calling it when the
    
    115
    -            # server is available seems to cause .code() to hang forever.
    
    116
    -            for operation in operation_iterator:
    
    117
    -                if operation.done:
    
    118
    -                    break
    
    134
    +            operation = __run_remote_command(stub, execute_request=request)
    
    135
    +            if operation and operation.done:
    
    136
    +                return operation
    
    137
    +
    
    138
    +            while not operation.done:
    
    139
    +                operation = __run_remote_command(stub, running_operation=operation)
    
    119 140
     
    
    120 141
             return operation
    
    121 142
     
    
    ... ... @@ -201,7 +222,6 @@ class SandboxRemote(Sandbox):
    201 222
     
    
    202 223
             if operation is None:
    
    203 224
                 # Failure of remote execution, usually due to an error in BuildStream
    
    204
    -            # NB This error could be raised in __run_remote_command
    
    205 225
                 raise SandboxError("No response returned from server")
    
    206 226
     
    
    207 227
             assert not operation.HasField('error') and operation.HasField('response')
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]