[Notes] [Git][BuildStream/buildstream][725-job-cancellation-on-remote-builds] _sandboxremote.py: Add sigterm handler that sends CancelOperation



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -19,6 +19,7 @@
    19 19
     #        Jim MacArthur <jim macarthur codethink co uk>
    
    20 20
     
    
    21 21
     import os
    
    22
    +import signal
    
    22 23
     from urllib.parse import urlparse
    
    23 24
     
    
    24 25
     import grpc
    
    ... ... @@ -26,8 +27,13 @@ import grpc
    26 27
     from . import Sandbox
    
    27 28
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    28 29
     from ..storage._casbaseddirectory import CasBasedDirectory
    
    30
    +from .. import _signals
    
    29 31
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30 32
     from .._protos.google.rpc import code_pb2
    
    33
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    34
    +
    
    35
    +# TODO remove this later
    
    36
    +from .._message import Message, MessageType
    
    31 37
     
    
    32 38
     
    
    33 39
     class SandboxError(Exception):
    
    ... ... @@ -54,6 +60,7 @@ class SandboxRemote(Sandbox):
    54 60
                                    "Only plain HTTP is currenlty supported (no HTTPS).")
    
    55 61
     
    
    56 62
             self.server_url = '{}:{}'.format(url.hostname, url.port)
    
    63
    +        self.operation_name = None
    
    57 64
     
    
    58 65
         def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    59 66
             # Sends an execution request to the remote execution server.
    
    ... ... @@ -104,11 +111,15 @@ class SandboxRemote(Sandbox):
    104 111
                         request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
    
    105 112
                         operation_iterator = stub.WaitExecution(request)
    
    106 113
     
    
    107
    -                for operation in operation_iterator:
    
    108
    -                    if operation.done:
    
    109
    -                        return operation
    
    110
    -                    else:
    
    111
    -                        last_operation = operation
    
    114
    +                with _signals.terminator(self.cancel_operation):
    
    115
    +                    with _signals.blocked([signal.SIGTERM], ignore=False):
    
    116
    +                        operation = next(operation_iterator)
    
    117
    +                        operation_name = operation.name
    
    118
    +                    for operation in operation_iterator:
    
    119
    +                        if operation.done:
    
    120
    +                            return operation
    
    121
    +                        else:
    
    122
    +                            last_operation = operation
    
    112 123
                 except grpc.RpcError as e:
    
    113 124
                     status_code = e.code()
    
    114 125
                     if status_code == grpc.StatusCode.UNAVAILABLE:
    
    ... ... @@ -135,12 +146,25 @@ class SandboxRemote(Sandbox):
    135 146
                     return None
    
    136 147
                 elif operation.done:
    
    137 148
                     return operation
    
    138
    -
    
    139 149
                 while operation is not None and not operation.done:
    
    140 150
                     operation = __run_remote_command(stub, running_operation=operation)
    
    141 151
     
    
    142 152
             return operation
    
    143 153
     
    
    154
    +    def cancel_operation(self):
    
    155
    +        channel = grpc.insecure_channel(self.server_url)
    
    156
    +        stub = operations_pb2_grpc.OperationsStub(channel)
    
    157
    +        request = operations_pb2.CancelOperationRequest(name="/{}".format(self.operation_name))
    
    158
    +        self._get_context().message(
    
    159
    +            Message(None, MessageType.WARN,
    
    160
    +                    "CancelOperation request: {}".format(str(request))))
    
    161
    +
    
    162
    +        try:
    
    163
    +            stub.CancelOperation(request)
    
    164
    +        except grpc.RpcError as e:
    
    165
    +            if (e.code() != grpc.StatusCode.UNIMPLEMENTED):
    
    166
    +                raise SandboxError("{} ({})".format(e.details(), e.code().name))
    
    167
    +
    
    144 168
         def process_job_output(self, output_directories, output_files):
    
    145 169
             # Reads the remote execution server response to an execution request.
    
    146 170
             #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]