[Notes] [Git][BuildStream/buildstream][725-job-cancellation-on-remote-builds] _sandboxremote.py: Add sigterm handler that sends CancelOperation



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -20,15 +20,18 @@
    20 20
     
    
    21 21
     import os
    
    22 22
     from urllib.parse import urlparse
    
    23
    +from functools import partial
    
    23 24
     
    
    24 25
     import grpc
    
    25 26
     
    
    26 27
     from . import Sandbox
    
    27 28
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    28 29
     from ..storage._casbaseddirectory import CasBasedDirectory
    
    30
    +from .. import _signals
    
    29 31
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30 32
     from .._protos.google.rpc import code_pb2
    
    31 33
     from .._exceptions import SandboxError
    
    34
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    32 35
     
    
    33 36
     
    
    34 37
     # SandboxRemote()
    
    ... ... @@ -51,6 +54,7 @@ class SandboxRemote(Sandbox):
    51 54
                                    "Only plain HTTP is currenlty supported (no HTTPS).")
    
    52 55
     
    
    53 56
             self.server_url = '{}:{}'.format(url.hostname, url.port)
    
    57
    +        self.operation_name = None
    
    54 58
     
    
    55 59
         def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    56 60
             # Sends an execution request to the remote execution server.
    
    ... ... @@ -102,10 +106,13 @@ class SandboxRemote(Sandbox):
    102 106
                         operation_iterator = stub.WaitExecution(request)
    
    103 107
     
    
    104 108
                     for operation in operation_iterator:
    
    109
    +                    if not self.operation_name:
    
    110
    +                        self.operation_name = operation.name
    
    105 111
                         if operation.done:
    
    106 112
                             return operation
    
    107 113
                         else:
    
    108 114
                             last_operation = operation
    
    115
    +
    
    109 116
                 except grpc.RpcError as e:
    
    110 117
                     status_code = e.code()
    
    111 118
                     if status_code == grpc.StatusCode.UNAVAILABLE:
    
    ... ... @@ -125,19 +132,38 @@ class SandboxRemote(Sandbox):
    125 132
     
    
    126 133
                 return last_operation
    
    127 134
     
    
    135
    +        # Set up signal handler to trigger cancel_operation on SIGTERM
    
    128 136
             operation = None
    
    129
    -        with self._get_context().timed_activity("Waiting for the remote build to complete"):
    
    137
    +        with self._get_context().timed_activity("Waiting for the remote build to complete"), \
    
    138
    +            _signals.terminator(partial(self.cancel_operation, channel)):
    
    130 139
                 operation = __run_remote_command(stub, execute_request=request)
    
    131 140
                 if operation is None:
    
    132 141
                     return None
    
    133 142
                 elif operation.done:
    
    134 143
                     return operation
    
    135
    -
    
    136 144
                 while operation is not None and not operation.done:
    
    137 145
                     operation = __run_remote_command(stub, running_operation=operation)
    
    138 146
     
    
    139 147
             return operation
    
    140 148
     
    
    149
    +    def cancel_operation(self, channel):
    
    150
    +        # If we don't have the name can't send request.
    
    151
    +        if self.operation_name is None:
    
    152
    +            return
    
    153
    +
    
    154
    +        stub = operations_pb2_grpc.OperationsStub(channel)
    
    155
    +        request = operations_pb2.CancelOperationRequest(
    
    156
    +            name=str(self.operation_name))
    
    157
    +
    
    158
    +        try:
    
    159
    +            stub.CancelOperation(request)
    
    160
    +        except grpc.RpcError as e:
    
    161
    +            if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
    
    162
    +                    e.code() == grpc.StatusCode.INVALID_ARGUMENT):
    
    163
    +                pass
    
    164
    +            else:
    
    165
    +                raise SandboxError("{} ({})".format(e.details(), e.code().name))
    
    166
    +
    
    141 167
         def process_job_output(self, output_directories, output_files):
    
    142 168
             # Reads the remote execution server response to an execution request.
    
    143 169
             #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]