[Notes] [Git][BuildStream/buildstream][725-job-cancellation-on-remote-builds] _sandboxremote.py: Add sigterm handler that sends CancelOperation



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -20,14 +20,20 @@
    20 20
     
    
    21 21
     import os
    
    22 22
     from urllib.parse import urlparse
    
    23
    +from functools import partial
    
    23 24
     
    
    24 25
     import grpc
    
    25 26
     
    
    26 27
     from . import Sandbox
    
    27 28
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    28 29
     from ..storage._casbaseddirectory import CasBasedDirectory
    
    30
    +from .. import _signals
    
    29 31
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30 32
     from .._protos.google.rpc import code_pb2
    
    33
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    34
    +
    
    35
    +# TODO remove this later
    
    36
    +from .._message import Message, MessageType
    
    31 37
     
    
    32 38
     
    
    33 39
     class SandboxError(Exception):
    
    ... ... @@ -104,11 +110,14 @@ class SandboxRemote(Sandbox):
    104 110
                         request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
    
    105 111
                         operation_iterator = stub.WaitExecution(request)
    
    106 112
     
    
    107
    -                for operation in operation_iterator:
    
    108
    -                    if operation.done:
    
    109
    -                        return operation
    
    110
    -                    else:
    
    111
    -                        last_operation = operation
    
    113
    +                operation = next(operation_iterator)
    
    114
    +                operation_name = operation.name
    
    115
    +                with _signals.terminator(partial(self.cancel_operation, operation_name)):
    
    116
    +                    for operation in operation_iterator:
    
    117
    +                        if operation.done:
    
    118
    +                            return operation
    
    119
    +                        else:
    
    120
    +                            last_operation = operation
    
    112 121
                 except grpc.RpcError as e:
    
    113 122
                     status_code = e.code()
    
    114 123
                     if status_code == grpc.StatusCode.UNAVAILABLE:
    
    ... ... @@ -135,12 +144,25 @@ class SandboxRemote(Sandbox):
    135 144
                     return None
    
    136 145
                 elif operation.done:
    
    137 146
                     return operation
    
    138
    -
    
    139 147
                 while operation is not None and not operation.done:
    
    140 148
                     operation = __run_remote_command(stub, running_operation=operation)
    
    141 149
     
    
    142 150
             return operation
    
    143 151
     
    
    152
    def cancel_operation(self, operation_name):
        """Ask the remote execution server to cancel a running operation.

        Sends a google.longrunning ``CancelOperation`` request for
        *operation_name* to ``self.server_url`` over an insecure gRPC
        channel. In the remote-execution loop this method is registered via
        ``_signals.terminator`` so that terminating the local job also asks
        the server to stop the remote one.

        Args:
            operation_name (str): Name of the remote operation, as taken from
                the ``Operation`` messages streamed back by the server.

        Raises:
            SandboxError: If the request fails with any gRPC status other
                than ``UNIMPLEMENTED``. Servers that do not implement
                cancellation are tolerated silently.
        """
        channel = grpc.insecure_channel(self.server_url)
        try:
            stub = operations_pb2_grpc.OperationsStub(channel)
            # NOTE(review): the leading "/" presumably supplies an empty
            # instance-name prefix expected by the server; confirm against
            # the server's operation naming scheme.
            request = operations_pb2.CancelOperationRequest(name="/{}".format(operation_name))
            # Temporary diagnostic message (its import is marked
            # "TODO remove this later").
            self._get_context().message(
                Message(None, MessageType.WARN,
                        "CancelOperation request: {}".format(str(request))))

            try:
                stub.CancelOperation(request)
            except grpc.RpcError as e:
                # A server without cancellation support is not an error.
                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                    # Chain the RpcError so the original gRPC failure stays
                    # visible in tracebacks.
                    raise SandboxError("{} ({})".format(e.details(), e.code().name)) from e
        finally:
            # Every call opens its own channel; close it so repeated
            # cancellations do not leak connections.
            channel.close()
    
    144 166
         def process_job_output(self, output_directories, output_files):
    
    145 167
             # Reads the remote execution server response to an execution request.
    
    146 168
             #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]