[Notes] [Git][BuildStream/buildstream][725-job-cancellation-on-remote-builds] _sandboxremote.py: Add sigterm handler that sends CancelOperation



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -20,15 +20,18 @@
    20 20
     
    
    21 21
     import os
    
    22 22
     from urllib.parse import urlparse
    
    23
    +from functools import partial
    
    23 24
     
    
    24 25
     import grpc
    
    25 26
     
    
    26 27
     from . import Sandbox
    
    27 28
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    28 29
     from ..storage._casbaseddirectory import CasBasedDirectory
    
    30
    +from .. import _signals
    
    29 31
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30 32
     from .._protos.google.rpc import code_pb2
    
    31 33
     from .._exceptions import SandboxError
    
    34
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    32 35
     
    
    33 36
     
    
    34 37
     # SandboxRemote()
    
    ... ... @@ -51,6 +54,7 @@ class SandboxRemote(Sandbox):
    51 54
                                    "Only plain HTTP is currenlty supported (no HTTPS).")
    
    52 55
     
    
    53 56
             self.server_url = '{}:{}'.format(url.hostname, url.port)
    
    57
    +        self.operation_name = None
    
    54 58
     
    
    55 59
         def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    56 60
             # Sends an execution request to the remote execution server.
    
    ... ... @@ -101,11 +105,18 @@ class SandboxRemote(Sandbox):
    101 105
                         request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
    
    102 106
                         operation_iterator = stub.WaitExecution(request)
    
    103 107
     
    
    108
    +                operation = next(operation_iterator)
    
    109
    +                self.operation_name = operation.name
    
    110
    +                if operation.done:
    
    111
    +                    return operation
    
    112
    +                else:
    
    113
    +                    last_operation = operation
    
    104 114
                     for operation in operation_iterator:
    
    105 115
                         if operation.done:
    
    106 116
                             return operation
    
    107 117
                         else:
    
    108 118
                             last_operation = operation
    
    119
    +
    
    109 120
                 except grpc.RpcError as e:
    
    110 121
                     status_code = e.code()
    
    111 122
                     if status_code == grpc.StatusCode.UNAVAILABLE:
    
    ... ... @@ -125,19 +136,35 @@ class SandboxRemote(Sandbox):
    125 136
     
    
    126 137
                 return last_operation
    
    127 138
     
    
    139
    +        # Set up signal handler to trigger cancel_operation on SIGTERM
    
    128 140
             operation = None
    
    129
    -        with self._get_context().timed_activity("Waiting for the remote build to complete"):
    
    141
    +        with self._get_context().timed_activity("Waiting for the remote build to complete"), \
    
    142
    +            _signals.terminator(partial(self.cancel_operation, channel)):
    
    130 143
                 operation = __run_remote_command(stub, execute_request=request)
    
    131 144
                 if operation is None:
    
    132 145
                     return None
    
    133 146
                 elif operation.done:
    
    134 147
                     return operation
    
    135
    -
    
    136 148
                 while operation is not None and not operation.done:
    
    137 149
                     operation = __run_remote_command(stub, running_operation=operation)
    
    138 150
     
    
    139 151
             return operation
    
    140 152
     
    
    153
    +    def cancel_operation(self, channel):
    
    154
    +        # If we don't have the name can't send request.
    
    155
    +        if self.operation_name is None:
    
    156
    +            return
    
    157
    +
    
    158
    +        stub = operations_pb2_grpc.OperationsStub(channel)
    
    159
    +        request = operations_pb2.CancelOperationRequest(
    
    160
    +            name=str(self.operation_name))
    
    161
    +
    
    162
    +        try:
    
    163
    +            stub.CancelOperation(request)
    
    164
    +        except grpc.RpcError as e:
    
    165
    +            if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    166
    +                raise SandboxError("{} ({})".format(e.details(), e.code().name))
    
    167
    +
    
    141 168
         def process_job_output(self, output_directories, output_files):
    
    142 169
             # Reads the remote execution server response to an execution request.
    
    143 170
             #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]