[Notes] [Git][BuildStream/buildstream][raoul/628-RE-flow-optimisation] 2 commits: utils.py: Add message digest function



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/628-RE-flow-optimisation at BuildStream / buildstream

Commits:

3 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -417,10 +417,7 @@ class CASCache():
    417 417
         def push_message(self, remote, message):
    
    418 418
     
    
    419 419
             message_buffer = message.SerializeToString()
    
    420
    -        message_sha = hashlib.sha256(message_buffer)
    
    421
    -        message_digest = remote_execution_pb2.Digest()
    
    422
    -        message_digest.hash = message_sha.hexdigest()
    
    423
    -        message_digest.size_bytes = len(message_buffer)
    
    420
    +        message_digest = utils._message_digest(message_buffer)
    
    424 421
     
    
    425 422
             remote.init()
    
    426 423
     
    

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -26,6 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from buildstream import utils
    
    29 30
     from . import Sandbox, SandboxCommandError
    
    30 31
     from .sandbox import _SandboxBatch
    
    31 32
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    ... ... @@ -117,52 +118,12 @@ class SandboxRemote(Sandbox):
    117 118
             spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
    
    118 119
             return spec
    
    119 120
     
    
    120
    -    def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    121
    +    def run_remote_command(self, channel, action_digest):
    
    121 122
             # Sends an execution request to the remote execution server.
    
    122 123
             #
    
    123 124
             # This function blocks until it gets a response from the server.
    
    124
    -        #
    
    125
    -        environment_variables = [remote_execution_pb2.Command.
    
    126
    -                                 EnvironmentVariable(name=k, value=v)
    
    127
    -                                 for (k, v) in environment.items()]
    
    128
    -
    
    129
    -        # Create and send the Command object.
    
    130
    -        remote_command = remote_execution_pb2.Command(arguments=command,
    
    131
    -                                                      working_directory=working_directory,
    
    132
    -                                                      environment_variables=environment_variables,
    
    133
    -                                                      output_files=[],
    
    134
    -                                                      output_directories=[self._output_directory],
    
    135
    -                                                      platform=None)
    
    136
    -        context = self._get_context()
    
    137
    -        cascache = context.get_cascache()
    
    138
    -        casremote = CASRemote(self.storage_remote_spec)
    
    139
    -
    
    140
    -        # Upload the Command message to the remote CAS server
    
    141
    -        command_digest = cascache.push_message(casremote, remote_command)
    
    142
    -        if not command_digest or not cascache.verify_digest_on_remote(casremote, command_digest):
    
    143
    -            raise SandboxError("Failed pushing build command to remote CAS.")
    
    144
    -        # Create and send the action.
    
    145
    -        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    146
    -                                             input_root_digest=input_root_digest,
    
    147
    -                                             timeout=None,
    
    148
    -                                             do_not_cache=False)
    
    149
    -
    
    150
    -        # Upload the Action message to the remote CAS server
    
    151
    -        action_digest = cascache.push_message(casremote, action)
    
    152
    -        if not action_digest or not cascache.verify_digest_on_remote(casremote, action_digest):
    
    153
    -            raise SandboxError("Failed pushing build action to remote CAS.")
    
    154
    -
    
    155
    -        # Next, try to create a communication channel to the BuildGrid server.
    
    156
    -        url = urlparse(self.exec_url)
    
    157
    -        if not url.port:
    
    158
    -            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    159
    -                               "for example: http://buildservice:50051.")
    
    160
    -        if url.scheme == 'http':
    
    161
    -            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    162
    -        else:
    
    163
    -            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    164
    -                               "and '{}' was supplied.".format(url.scheme))
    
    165 125
     
    
    126
    +        # Try to create a communication channel to the BuildGrid server.
    
    166 127
             stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    167 128
             request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    
    168 129
                                                           skip_cache_lookup=False)
    
    ... ... @@ -282,13 +243,12 @@ class SandboxRemote(Sandbox):
    282 243
             # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    283 244
             # from another hash will be interesting, though...
    
    284 245
     
    
    285
    -        new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
    
    246
    +        new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
    
    286 247
             self._set_virtual_directory(new_dir)
    
    287 248
     
    
    288 249
         def _run(self, command, flags, *, cwd, env):
    
    289
    -        # Upload sources
    
    250
    +        # set up virtual directory
    
    290 251
             upload_vdir = self.get_virtual_directory()
    
    291
    -
    
    292 252
             cascache = self._get_context().get_cascache()
    
    293 253
             if isinstance(upload_vdir, FileBasedDirectory):
    
    294 254
                 # Make a new temporary directory to put source in
    
    ... ... @@ -297,20 +257,97 @@ class SandboxRemote(Sandbox):
    297 257
     
    
    298 258
             upload_vdir.recalculate_hash()
    
    299 259
     
    
    300
    -        casremote = CASRemote(self.storage_remote_spec)
    
    301
    -        # Now, push that key (without necessarily needing a ref) to the remote.
    
    260
    +        # Generate action_digest first
    
    261
    +        input_root_digest = upload_vdir.ref
    
    262
    +        command_proto = self._create_command(command, cwd, env)
    
    263
    +        command_digest = utils._message_digest(command_proto.SerializeToString())
    
    264
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    265
    +                                             input_root_digest=input_root_digest)
    
    266
    +        action_digest = utils._message_digest(action.SerializeToString())
    
    267
    +
    
    268
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    269
    +        url = urlparse(self.exec_url)
    
    270
    +        if not url.port:
    
    271
    +            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    272
    +                               "for example: http://buildservice:50051.")
    
    273
    +        if url.scheme == 'http':
    
    274
    +            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    275
    +        else:
    
    276
    +            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    277
    +                               "and '{}' was supplied.".format(url.scheme))
    
    278
    +
    
    279
    +        # check the action cache and fetch the cached result if it is there
    
    280
    +        action_result = self._check_action_cache(channel, action_digest)
    
    281
    +
    
    282
    +        if not action_result:
    
    283
    +            casremote = CASRemote(self.storage_remote_spec)
    
    302 284
     
    
    285
    +            # Now, push that key (without necessarily needing a ref) to the remote.
    
    286
    +            try:
    
    287
    +                cascache.push_directory(casremote, upload_vdir)
    
    288
    +            except grpc.RpcError as e:
    
    289
    +                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    290
    +
    
    291
    +            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    292
    +                raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    293
    +
    
    294
    +            # Push command and action
    
    295
    +            try:
    
    296
    +                cascache.push_message(casremote, command_proto)
    
    297
    +            except grpc.RpcError as e:
    
    298
    +                raise SandboxError("Failed to push command to remote: {}".format(e))
    
    299
    +
    
    300
    +            try:
    
    301
    +                cascache.push_message(casremote, action)
    
    302
    +            except grpc.RpcError as e:
    
    303
    +                raise SandboxError("Failed to push action to remote: {}".format(e))
    
    304
    +
    
    305
    +            # Now request to execute the action
    
    306
    +            operation = self.run_remote_command(channel, action_digest)
    
    307
    +            action_result = self._extract_action_result(operation)
    
    308
    +
    
    309
    +        if action_result.exit_code != 0:
    
    310
    +            # A normal error during the build: the remote execution system
    
    311
    +            # has worked correctly but the command failed.
    
    312
    +            # action_result.stdout and action_result.stderr also contains
    
    313
    +            # build command outputs which we ignore at the moment.
    
    314
    +            return action_result.exit_code
    
    315
    +
    
    316
    +        # Get output of build
    
    317
    +        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    318
    +
    
    319
    +        return 0
    
    320
    +
    
    321
    +    def _check_action_cache(self, channel, action_digest):
    
    322
    +        # Checks the action cache to see if this artifact has already been built
    
    323
    +        #
    
    324
    +        # Should return either the action response or None if not found, raise
    
    325
    +        # SandboxError if any other gRPC error was raised
    
    326
    +        request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    
    327
    +        stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    
    303 328
             try:
    
    304
    -            cascache.push_directory(casremote, upload_vdir)
    
    329
    +            return stub.GetActionResult(request)
    
    305 330
             except grpc.RpcError as e:
    
    306
    -            raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    307
    -
    
    308
    -        if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    309
    -            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    331
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    332
    +                raise SandboxError("Failed to query action cache: {} ({})"
    
    333
    +                                   .format(e.code(), e.details()))
    
    334
    +            else:
    
    335
    +                return None
    
    310 336
     
    
    311
    -        # Now transmit the command to execute
    
    312
    -        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
    
    337
    +    def _create_command(self, command, working_directory, environment):
    
    338
    +        # Creates a command proto
    
    339
    +        environment_variables = [remote_execution_pb2.Command.
    
    340
    +                                 EnvironmentVariable(name=k, value=v)
    
    341
    +                                 for (k, v) in environment.items()]
    
    342
    +        return remote_execution_pb2.Command(arguments=command,
    
    343
    +                                            working_directory=working_directory,
    
    344
    +                                            environment_variables=environment_variables,
    
    345
    +                                            output_files=[],
    
    346
    +                                            output_directories=[self._output_directory],
    
    347
    +                                            platform=None)
    
    313 348
     
    
    349
    +    @staticmethod
    
    350
    +    def _extract_action_result(operation):
    
    314 351
             if operation is None:
    
    315 352
                 # Failure of remote execution, usually due to an error in BuildStream
    
    316 353
                 raise SandboxError("No response returned from server")
    
    ... ... @@ -331,18 +368,7 @@ class SandboxRemote(Sandbox):
    331 368
                 else:
    
    332 369
                     raise SandboxError("Remote server failed at executing the build request.")
    
    333 370
     
    
    334
    -        action_result = execution_response.result
    
    335
    -
    
    336
    -        if action_result.exit_code != 0:
    
    337
    -            # A normal error during the build: the remote execution system
    
    338
    -            # has worked correctly but the command failed.
    
    339
    -            # action_result.stdout and action_result.stderr also contains
    
    340
    -            # build command outputs which we ignore at the moment.
    
    341
    -            return action_result.exit_code
    
    342
    -
    
    343
    -        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    344
    -
    
    345
    -        return 0
    
    371
    +        return execution_response.result
    
    346 372
     
    
    347 373
         def _create_batch(self, main_group, flags, *, collect=None):
    
    348 374
             return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
    

  • buildstream/utils.py
    ... ... @@ -41,6 +41,7 @@ import psutil
    41 41
     
    
    42 42
     from . import _signals
    
    43 43
     from ._exceptions import BstError, ErrorDomain
    
    44
    +from ._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    44 45
     
    
    45 46
     # The magic number for timestamps: 2011-11-11 11:11:11
    
    46 47
     _magic_timestamp = calendar.timegm([2011, 11, 11, 11, 11, 11])
    
    ... ... @@ -1242,3 +1243,19 @@ def _deduplicate(iterable, key=None):
    1242 1243
     def _get_link_mtime(path):
    
    1243 1244
         path_stat = os.lstat(path)
    
    1244 1245
         return path_stat.st_mtime
    
    1246
    +
    
    1247
    +
    
    1248
    +# _message_digest()
    
    1249
    +#
    
    1250
    +# Args:
    
    1251
    +#    message_buffer (bytes): Serialized message bytes to create digest of
    
    1252
    +#
    
    1253
    +# Returns:
    
    1254
    +#    (remote_execution_pb2.Digest): Content digest
    
    1255
    +#
    
    1256
    +def _message_digest(message_buffer):
    
    1257
    +    sha = hashlib.sha256(message_buffer)
    
    1258
    +    digest = remote_execution_pb2.Digest()
    
    1259
    +    digest.hash = sha.hexdigest()
    
    1260
    +    digest.size_bytes = len(message_buffer)
    
    1261
    +    return digest



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]