[Notes] [Git][BuildStream/buildstream][raoul/628-RE-flow-optimisation] _sandboxremote.py: Add checks to action cache before attempting to push.



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/628-RE-flow-optimisation at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -37,6 +37,7 @@ from .._exceptions import SandboxError
    37 37
     from .. import _yaml
    
    38 38
     from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    39 39
     from .._artifactcache.cascache import CASRemote, CASRemoteSpec
    
    40
    +from ..utils import message_digest
    
    40 41
     
    
    41 42
     
    
    42 43
     class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
    
    ... ... @@ -117,52 +118,12 @@ class SandboxRemote(Sandbox):
    117 118
             spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
    
    118 119
             return spec
    
    119 120
     
    
    120
    -    def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    121
    +    def run_remote_command(self, channel, action_digest):
    
    121 122
             # Sends an execution request to the remote execution server.
    
    122 123
             #
    
    123 124
             # This function blocks until it gets a response from the server.
    
    124
    -        #
    
    125
    -        environment_variables = [remote_execution_pb2.Command.
    
    126
    -                                 EnvironmentVariable(name=k, value=v)
    
    127
    -                                 for (k, v) in environment.items()]
    
    128
    -
    
    129
    -        # Create and send the Command object.
    
    130
    -        remote_command = remote_execution_pb2.Command(arguments=command,
    
    131
    -                                                      working_directory=working_directory,
    
    132
    -                                                      environment_variables=environment_variables,
    
    133
    -                                                      output_files=[],
    
    134
    -                                                      output_directories=[self._output_directory],
    
    135
    -                                                      platform=None)
    
    136
    -        context = self._get_context()
    
    137
    -        cascache = context.get_cascache()
    
    138
    -        casremote = CASRemote(self.storage_remote_spec)
    
    139
    -
    
    140
    -        # Upload the Command message to the remote CAS server
    
    141
    -        command_digest = cascache.push_message(casremote, remote_command)
    
    142
    -        if not command_digest or not cascache.verify_digest_on_remote(casremote, command_digest):
    
    143
    -            raise SandboxError("Failed pushing build command to remote CAS.")
    
    144
    -        # Create and send the action.
    
    145
    -        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    146
    -                                             input_root_digest=input_root_digest,
    
    147
    -                                             timeout=None,
    
    148
    -                                             do_not_cache=False)
    
    149
    -
    
    150
    -        # Upload the Action message to the remote CAS server
    
    151
    -        action_digest = cascache.push_message(casremote, action)
    
    152
    -        if not action_digest or not cascache.verify_digest_on_remote(casremote, action_digest):
    
    153
    -            raise SandboxError("Failed pushing build action to remote CAS.")
    
    154
    -
    
    155
    -        # Next, try to create a communication channel to the BuildGrid server.
    
    156
    -        url = urlparse(self.exec_url)
    
    157
    -        if not url.port:
    
    158
    -            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    159
    -                               "for example: http://buildservice:50051.")
    
    160
    -        if url.scheme == 'http':
    
    161
    -            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    162
    -        else:
    
    163
    -            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    164
    -                               "and '{}' was supplied.".format(url.scheme))
    
    165 125
     
    
    126
    +        # Try to create a communication channel to the BuildGrid server.
    
    166 127
             stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    167 128
             request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    
    168 129
                                                           skip_cache_lookup=False)
    
    ... ... @@ -282,13 +243,12 @@ class SandboxRemote(Sandbox):
    282 243
             # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    283 244
             # from another hash will be interesting, though...
    
    284 245
     
    
    285
    -        new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
    
    246
    +        new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
    
    286 247
             self._set_virtual_directory(new_dir)
    
    287 248
     
    
    288 249
         def _run(self, command, flags, *, cwd, env):
    
    289
    -        # Upload sources
    
    250
    +        # set up virtual directory
    
    290 251
             upload_vdir = self.get_virtual_directory()
    
    291
    -
    
    292 252
             cascache = self._get_context().get_cascache()
    
    293 253
             if isinstance(upload_vdir, FileBasedDirectory):
    
    294 254
                 # Make a new temporary directory to put source in
    
    ... ... @@ -297,20 +257,96 @@ class SandboxRemote(Sandbox):
    297 257
     
    
    298 258
             upload_vdir.recalculate_hash()
    
    299 259
     
    
    300
    -        casremote = CASRemote(self.storage_remote_spec)
    
    301
    -        # Now, push that key (without necessarily needing a ref) to the remote.
    
    260
    +        # Generate action_digest first
    
    261
    +        input_root_digest = upload_vdir.ref
    
    262
    +        command_proto = self._create_command(command, cwd, env)
    
    263
    +        command_digest = message_digest(command_proto.SerializeToString())
    
    264
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    265
    +                                             input_root_digest=input_root_digest)
    
    266
    +        action_digest = message_digest(action.SerializeToString())
    
    267
    +
    
    268
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    269
    +        url = urlparse(self.exec_url)
    
    270
    +        if not url.port:
    
    271
    +            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    272
    +                               "for example: http://buildservice:50051.")
    
    273
    +        if url.scheme == 'http':
    
    274
    +            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    275
    +        else:
    
    276
    +            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    277
    +                               "and '{}' was supplied.".format(url.scheme))
    
    278
    +
    
    279
    +        # check the action cache and download the result if it is there
    
    280
    +        action_result = self._check_action_cache(channel, action_digest)
    
    281
    +
    
    282
    +        if not action_result:
    
    283
    +            casremote = CASRemote(self.storage_remote_spec)
    
    302 284
     
    
    285
    +            # Now, push that key (without necessarily needing a ref) to the remote.
    
    286
    +            try:
    
    287
    +                cascache.push_directory(casremote, upload_vdir)
    
    288
    +            except grpc.RpcError as e:
    
    289
    +                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    290
    +
    
    291
    +            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    292
    +                raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    293
    +
    
    294
    +            # Push command and action
    
    295
    +            try:
    
    296
    +                cascache.push_message(casremote, command_proto)
    
    297
    +            except grpc.RpcError as e:
    
    298
    +                raise SandboxError("Failed to push command to remote: {}".format(e))
    
    299
    +
    
    300
    +            try:
    
    301
    +                cascache.push_message(casremote, action)
    
    302
    +            except grpc.RpcError as e:
    
    303
    +                raise SandboxError("Failed to push action to remote: {}".format(e))
    
    304
    +
    
    305
    +            # Now request to execute the action
    
    306
    +            operation = self.run_remote_command(channel, action_digest)
    
    307
    +            action_result = self._extract_action_result(operation)
    
    308
    +
    
    309
    +        if action_result.exit_code != 0:
    
    310
    +            # A normal error during the build: the remote execution system
    
    311
    +            # has worked correctly but the command failed.
    
    312
    +            # action_result.stdout and action_result.stderr also contains
    
    313
    +            # build command outputs which we ignore at the moment.
    
    314
    +            return action_result.exit_code
    
    315
    +
    
    316
    +        # Get output of build
    
    317
    +        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    318
    +
    
    319
    +        return 0
    
    320
    +
    
    321
    +    def _check_action_cache(self, channel, action_digest):
    
    322
    +        # Checks the action cache to see if this artifact has already been built
    
    323
    +        #
    
    324
    +        # Should return either the action response or None if not found, raise
    
    325
    +        # SandboxError if any other gRPC error was raised
    
    326
    +        request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    
    327
    +        stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    
    303 328
             try:
    
    304
    -            cascache.push_directory(casremote, upload_vdir)
    
    329
    +            return stub.GetActionResult(request)
    
    305 330
             except grpc.RpcError as e:
    
    306
    -            raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    307
    -
    
    308
    -        if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    309
    -            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    331
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    332
    +                raise SandboxError("{} ({})".format(e.code(), e.details()))
    
    333
    +            else:
    
    334
    +                return None
    
    310 335
     
    
    311
    -        # Now transmit the command to execute
    
    312
    -        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
    
    336
    +    def _create_command(self, command, working_directory, environment):
    
    337
    +        # Creates a command proto
    
    338
    +        environment_variables = [remote_execution_pb2.Command.
    
    339
    +                                 EnvironmentVariable(name=k, value=v)
    
    340
    +                                 for (k, v) in environment.items()]
    
    341
    +        return remote_execution_pb2.Command(arguments=command,
    
    342
    +                                            working_directory=working_directory,
    
    343
    +                                            environment_variables=environment_variables,
    
    344
    +                                            output_files=[],
    
    345
    +                                            output_directories=[self._output_directory],
    
    346
    +                                            platform=None)
    
    313 347
     
    
    348
    +    @staticmethod
    
    349
    +    def _extract_action_result(operation):
    
    314 350
             if operation is None:
    
    315 351
                 # Failure of remote execution, usually due to an error in BuildStream
    
    316 352
                 raise SandboxError("No response returned from server")
    
    ... ... @@ -331,18 +367,7 @@ class SandboxRemote(Sandbox):
    331 367
                 else:
    
    332 368
                     raise SandboxError("Remote server failed at executing the build request.")
    
    333 369
     
    
    334
    -        action_result = execution_response.result
    
    335
    -
    
    336
    -        if action_result.exit_code != 0:
    
    337
    -            # A normal error during the build: the remote execution system
    
    338
    -            # has worked correctly but the command failed.
    
    339
    -            # action_result.stdout and action_result.stderr also contains
    
    340
    -            # build command outputs which we ignore at the moment.
    
    341
    -            return action_result.exit_code
    
    342
    -
    
    343
    -        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    344
    -
    
    345
    -        return 0
    
    370
    +        return execution_response.result
    
    346 371
     
    
    347 372
         def _create_batch(self, main_group, flags, *, collect=None):
    
    348 373
             return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]