[Notes] [Git][BuildStream/buildstream][725-job-cancellation-on-remote-builds] _sandboxremote.py: Add sigterm handler that sends CancelOperation



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -19,6 +19,7 @@
    19 19
     #        Jim MacArthur <jim macarthur codethink co uk>
    
    20 20
     
    
    21 21
     import os
    
    22
    +import signal
    
    22 23
     from urllib.parse import urlparse
    
    23 24
     
    
    24 25
     import grpc
    
    ... ... @@ -26,8 +27,10 @@ import grpc
    26 27
     from . import Sandbox
    
    27 28
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    28 29
     from ..storage._casbaseddirectory import CasBasedDirectory
    
    30
    +from .. import _signals
    
    29 31
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30 32
     from .._protos.google.rpc import code_pb2
    
    33
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    31 34
     
    
    32 35
     
    
    33 36
     class SandboxError(Exception):
    
    ... ... @@ -54,6 +57,7 @@ class SandboxRemote(Sandbox):
    54 57
                                    "Only plain HTTP is currenlty supported (no HTTPS).")
    
    55 58
     
    
    56 59
             self.server_url = '{}:{}'.format(url.hostname, url.port)
    
    60
    +        self.operation_name = None
    
    57 61
     
    
    58 62
         def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    59 63
             # Sends an execution request to the remote execution server.
    
    ... ... @@ -104,11 +108,15 @@ class SandboxRemote(Sandbox):
    104 108
                         request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
    
    105 109
                         operation_iterator = stub.WaitExecution(request)
    
    106 110
     
    
    107
    -                for operation in operation_iterator:
    
    108
    -                    if operation.done:
    
    109
    -                        return operation
    
    110
    -                    else:
    
    111
    -                        last_operation = operation
    
    111
    +                with _signals.terminator(self.cancel_operation):
    
    112
    +                    with _signals.blocked([signal.SIGTERM], ignore=False):
    
    113
    +                        operation = next(operation_iterator)
    
    114
    +                        self.operation_name = operation.name
    
    115
    +                    for operation in operation_iterator:
    
    116
    +                        if operation.done:
    
    117
    +                            return operation
    
    118
    +                        else:
    
    119
    +                            last_operation = operation
    
    112 120
                 except grpc.RpcError as e:
    
    113 121
                     status_code = e.code()
    
    114 122
                     if status_code == grpc.StatusCode.UNAVAILABLE:
    
    ... ... @@ -135,12 +143,23 @@ class SandboxRemote(Sandbox):
    135 143
                     return None
    
    136 144
                 elif operation.done:
    
    137 145
                     return operation
    
    138
    -
    
    139 146
                 while operation is not None and not operation.done:
    
    140 147
                     operation = __run_remote_command(stub, running_operation=operation)
    
    141 148
     
    
    142 149
             return operation
    
    143 150
     
    
    151
    +    def cancel_operation(self):
    
    152
    +        channel = grpc.insecure_channel(self.server_url)
    
    153
    +        stub = operations_pb2_grpc.OperationsStub(channel)
    
    154
    +        request = operations_pb2.CancelOperationRequest(
    
    155
    +            name="{}".format(self.operation_name))
    
    156
    +
    
    157
    +        try:
    
    158
    +            stub.CancelOperation(request)
    
    159
    +        except grpc.RpcError as e:
    
    160
    +            if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    161
    +                raise SandboxError("{} ({})".format(e.details(), e.code().name))
    
    162
    +
    
    144 163
         def process_job_output(self, output_directories, output_files):
    
    145 164
             # Reads the remote execution server response to an execution request.
    
    146 165
             #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]