[Notes] [Git][BuildStream/buildstream][raoul/628-RE-flow-optimisation] 4 commits: utils.py: Add message digest function



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/628-RE-flow-optimisation at BuildStream / buildstream

Commits:

4 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -427,10 +427,7 @@ class CASCache():
    427 427
         def push_message(self, remote, message):
    
    428 428
     
    
    429 429
             message_buffer = message.SerializeToString()
    
    430
    -        message_sha = hashlib.sha256(message_buffer)
    
    431
    -        message_digest = remote_execution_pb2.Digest()
    
    432
    -        message_digest.hash = message_sha.hexdigest()
    
    433
    -        message_digest.size_bytes = len(message_buffer)
    
    430
    +        message_digest = utils._message_digest(message_buffer)
    
    434 431
     
    
    435 432
             remote.init()
    
    436 433
     
    

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -26,6 +26,8 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    +from buildstream._message import Message, MessageType
    
    30
    +from buildstream import utils
    
    29 31
     from . import Sandbox, SandboxCommandError
    
    30 32
     from .sandbox import _SandboxBatch
    
    31 33
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    ... ... @@ -39,7 +41,7 @@ from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    39 41
     from .._artifactcache.cascache import CASRemote, CASRemoteSpec
    
    40 42
     
    
    41 43
     
    
    42
    -class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
    
    44
    +class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
    
    43 45
         pass
    
    44 46
     
    
    45 47
     
    
    ... ... @@ -59,6 +61,10 @@ class SandboxRemote(Sandbox):
    59 61
     
    
    60 62
             self.storage_url = config.storage_service['url']
    
    61 63
             self.exec_url = config.exec_service['url']
    
    64
    +        if config.action_service:
    
    65
    +            self.action_url = config.action_service['url']
    
    66
    +        else:
    
    67
    +            self.action_url = None
    
    62 68
     
    
    63 69
             self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
    
    64 70
                                                      server_cert=config.storage_service['server-cert'],
    
    ... ... @@ -66,6 +72,9 @@ class SandboxRemote(Sandbox):
    66 72
                                                      client_cert=config.storage_service['client-cert'])
    
    67 73
             self.operation_name = None
    
    68 74
     
    
    75
    +    def info(self, msg):
    
    76
    +        self._get_context().message(Message(None, MessageType.INFO, msg))
    
    77
    +
    
    69 78
         @staticmethod
    
    70 79
         def specs_from_config_node(config_node, basedir):
    
    71 80
     
    
    ... ... @@ -88,12 +97,19 @@ class SandboxRemote(Sandbox):
    88 97
     
    
    89 98
             tls_keys = ['client-key', 'client-cert', 'server-cert']
    
    90 99
     
    
    91
    -        _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url'])
    
    100
    +        _yaml.node_validate(
    
    101
    +            remote_config,
    
    102
    +            ['execution-service', 'storage-service', 'url', 'action-cache-service'])
    
    92 103
             remote_exec_service_config = require_node(remote_config, 'execution-service')
    
    93 104
             remote_exec_storage_config = require_node(remote_config, 'storage-service')
    
    105
    +        remote_exec_action_config = remote_config.get('action-cache-service')
    
    94 106
     
    
    95 107
             _yaml.node_validate(remote_exec_service_config, ['url'])
    
    96 108
             _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
    
    109
    +        if remote_exec_action_config:
    
    110
    +            _yaml.node_validate(remote_exec_action_config, ['url'])
    
    111
    +        else:
    
    112
    +            remote_config['action-service'] = None
    
    97 113
     
    
    98 114
             if 'url' in remote_config:
    
    99 115
                 if 'execution-service' not in remote_config:
    
    ... ... @@ -114,52 +130,17 @@ class SandboxRemote(Sandbox):
    114 130
                                           "remote-execution configuration. Your config is missing '{}'."
    
    115 131
                                           .format(str(provenance), tls_keys, key))
    
    116 132
     
    
    117
    -        spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
    
    133
    +        spec = RemoteExecutionSpec(remote_config['execution-service'],
    
    134
    +                                   remote_config['storage-service'],
    
    135
    +                                   remote_config['action-cache-service'])
    
    118 136
             return spec
    
    119 137
     
    
    120
    -    def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    138
    +    def run_remote_command(self, channel, action_digest):
    
    121 139
             # Sends an execution request to the remote execution server.
    
    122 140
             #
    
    123 141
             # This function blocks until it gets a response from the server.
    
    124
    -        #
    
    125
    -        environment_variables = [remote_execution_pb2.Command.
    
    126
    -                                 EnvironmentVariable(name=k, value=v)
    
    127
    -                                 for (k, v) in environment.items()]
    
    128
    -
    
    129
    -        # Create and send the Command object.
    
    130
    -        remote_command = remote_execution_pb2.Command(arguments=command,
    
    131
    -                                                      working_directory=working_directory,
    
    132
    -                                                      environment_variables=environment_variables,
    
    133
    -                                                      output_files=[],
    
    134
    -                                                      output_directories=[self._output_directory],
    
    135
    -                                                      platform=None)
    
    136
    -        context = self._get_context()
    
    137
    -        cascache = context.get_cascache()
    
    138
    -        casremote = CASRemote(self.storage_remote_spec)
    
    139
    -
    
    140
    -        # Upload the Command message to the remote CAS server
    
    141
    -        command_digest = cascache.push_message(casremote, remote_command)
    
    142
    -
    
    143
    -        # Create and send the action.
    
    144
    -        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    145
    -                                             input_root_digest=input_root_digest,
    
    146
    -                                             timeout=None,
    
    147
    -                                             do_not_cache=False)
    
    148
    -
    
    149
    -        # Upload the Action message to the remote CAS server
    
    150
    -        action_digest = cascache.push_message(casremote, action)
    
    151
    -
    
    152
    -        # Next, try to create a communication channel to the BuildGrid server.
    
    153
    -        url = urlparse(self.exec_url)
    
    154
    -        if not url.port:
    
    155
    -            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    156
    -                               "for example: http://buildservice:50051.")
    
    157
    -        if url.scheme == 'http':
    
    158
    -            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    159
    -        else:
    
    160
    -            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    161
    -                               "and '{}' was supplied.".format(url.scheme))
    
    162 142
     
    
    143
    +        # Create an execution stub on the supplied channel to the BuildGrid server.
    
    163 144
             stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    164 145
             request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
    
    165 146
                                                           skip_cache_lookup=False)
    
    ... ... @@ -279,13 +260,12 @@ class SandboxRemote(Sandbox):
    279 260
             # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    
    280 261
             # from another hash will be interesting, though...
    
    281 262
     
    
    282
    -        new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
    
    263
    +        new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
    
    283 264
             self._set_virtual_directory(new_dir)
    
    284 265
     
    
    285 266
         def _run(self, command, flags, *, cwd, env):
    
    286
    -        # Upload sources
    
    267
    +        # Set up the virtual directory
    
    287 268
             upload_vdir = self.get_virtual_directory()
    
    288
    -
    
    289 269
             cascache = self._get_context().get_cascache()
    
    290 270
             if isinstance(upload_vdir, FileBasedDirectory):
    
    291 271
                 # Make a new temporary directory to put source in
    
    ... ... @@ -294,16 +274,111 @@ class SandboxRemote(Sandbox):
    294 274
     
    
    295 275
             upload_vdir.recalculate_hash()
    
    296 276
     
    
    297
    -        casremote = CASRemote(self.storage_remote_spec)
    
    298
    -        # Now, push that key (without necessarily needing a ref) to the remote.
    
    277
    +        # Generate action_digest first
    
    278
    +        input_root_digest = upload_vdir.ref
    
    279
    +        command_proto = self._create_command(command, cwd, env)
    
    280
    +        command_digest = utils._message_digest(command_proto.SerializeToString())
    
    281
    +        action = remote_execution_pb2.Action(command_digest=command_digest,
    
    282
    +                                             input_root_digest=input_root_digest)
    
    283
    +        action_digest = utils._message_digest(action.SerializeToString())
    
    284
    +
    
    285
    +        # Next, try to create a communication channel to the BuildGrid server.
    
    286
    +        url = urlparse(self.exec_url)
    
    287
    +        if not url.port:
    
    288
    +            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
    
    289
    +                               "for example: http://buildservice:50051.")
    
    290
    +        if url.scheme == 'http':
    
    291
    +            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    292
    +        else:
    
    293
    +            raise SandboxError("Remote execution currently only supports the 'http' protocol "
    
    294
    +                               "and '{}' was supplied.".format(url.scheme))
    
    295
    +
    
    296
    +        # Check the action cache and use the cached result if there is one
    
    297
    +        action_result = self._check_action_cache(action_digest)
    
    298
    +
    
    299
    +        if not action_result:
    
    300
    +            casremote = CASRemote(self.storage_remote_spec)
    
    301
    +
    
    302
    +            # Now, push that key (without necessarily needing a ref) to the remote.
    
    303
    +            try:
    
    304
    +                cascache.push_directory(casremote, upload_vdir)
    
    305
    +            except grpc.RpcError as e:
    
    306
    +                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    307
    +
    
    308
    +            if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
    
    309
    +                raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
    
    310
    +
    
    311
    +            # Push command and action
    
    312
    +            try:
    
    313
    +                cascache.push_message(casremote, command_proto)
    
    314
    +            except grpc.RpcError as e:
    
    315
    +                raise SandboxError("Failed to push command to remote: {}".format(e))
    
    316
    +
    
    317
    +            try:
    
    318
    +                cascache.push_message(casremote, action)
    
    319
    +            except grpc.RpcError as e:
    
    320
    +                raise SandboxError("Failed to push action to remote: {}".format(e))
    
    321
    +
    
    322
    +            # Now request to execute the action
    
    323
    +            operation = self.run_remote_command(channel, action_digest)
    
    324
    +            action_result = self._extract_action_result(operation)
    
    325
    +
    
    326
    +        if action_result.exit_code != 0:
    
    327
    +            # A normal error during the build: the remote execution system
    
    328
    +            # has worked correctly but the command failed.
    
    329
    +            # action_result.stdout and action_result.stderr also contains
    
    330
    +            # build command outputs which we ignore at the moment.
    
    331
    +            return action_result.exit_code
    
    332
    +
    
    333
    +        # Get output of build
    
    334
    +        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    335
    +
    
    336
    +        return 0
    
    337
    +
    
    338
    +    def _check_action_cache(self, action_digest):
    
    339
    +        # Checks the action cache to see if this artifact has already been built
    
    340
    +        #
    
    341
    +        # Should return either the action response or None if not found, raise
    
    342
    +        # SandboxError if any other gRPC error was raised
    
    343
    +        if not self.action_url:
    
    344
    +            return None
    
    345
    +        url = urlparse(self.action_url)
    
    346
    +        if not url.port:
    
    347
    +            raise SandboxError("You must supply a protocol and port number in the action-cache-service url, "
    
    348
    +                               "for example: http://buildservice:50051.")
    
    349
    +        if not url.scheme == "http":
    
    350
    +            raise SandboxError("Currently only support http for the action cache"
    
    351
    +                               "and {} was supplied".format(url.scheme))
    
    352
    +
    
    353
    +        channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
    
    354
    +        request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    
    355
    +        stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    
    299 356
             try:
    
    300
    -            cascache.push_directory(casremote, upload_vdir)
    
    357
    +            result = stub.GetActionResult(request)
    
    301 358
             except grpc.RpcError as e:
    
    302
    -            raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
    
    359
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    360
    +                raise SandboxError("Failed to query action cache: {} ({})"
    
    361
    +                                   .format(e.code(), e.details()))
    
    362
    +            else:
    
    363
    +                return None
    
    364
    +        else:
    
    365
    +            self.info("Action result found in action cache")
    
    366
    +            return result
    
    303 367
     
    
    304
    -        # Now transmit the command to execute
    
    305
    -        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
    
    368
    +    def _create_command(self, command, working_directory, environment):
    
    369
    +        # Creates a command proto
    
    370
    +        environment_variables = [remote_execution_pb2.Command.
    
    371
    +                                 EnvironmentVariable(name=k, value=v)
    
    372
    +                                 for (k, v) in environment.items()]
    
    373
    +        return remote_execution_pb2.Command(arguments=command,
    
    374
    +                                            working_directory=working_directory,
    
    375
    +                                            environment_variables=environment_variables,
    
    376
    +                                            output_files=[],
    
    377
    +                                            output_directories=[self._output_directory],
    
    378
    +                                            platform=None)
    
    306 379
     
    
    380
    +    @staticmethod
    
    381
    +    def _extract_action_result(operation):
    
    307 382
             if operation is None:
    
    308 383
                 # Failure of remote execution, usually due to an error in BuildStream
    
    309 384
                 raise SandboxError("No response returned from server")
    
    ... ... @@ -324,18 +399,7 @@ class SandboxRemote(Sandbox):
    324 399
                 else:
    
    325 400
                     raise SandboxError("Remote server failed at executing the build request.")
    
    326 401
     
    
    327
    -        action_result = execution_response.result
    
    328
    -
    
    329
    -        if action_result.exit_code != 0:
    
    330
    -            # A normal error during the build: the remote execution system
    
    331
    -            # has worked correctly but the command failed.
    
    332
    -            # action_result.stdout and action_result.stderr also contains
    
    333
    -            # build command outputs which we ignore at the moment.
    
    334
    -            return action_result.exit_code
    
    335
    -
    
    336
    -        self.process_job_output(action_result.output_directories, action_result.output_files)
    
    337
    -
    
    338
    -        return 0
    
    402
    +        return execution_response.result
    
    339 403
     
    
    340 404
         def _create_batch(self, main_group, flags, *, collect=None):
    
    341 405
             return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
    

  • buildstream/utils.py
    ... ... @@ -41,6 +41,7 @@ import psutil
    41 41
     
    
    42 42
     from . import _signals
    
    43 43
     from ._exceptions import BstError, ErrorDomain
    
    44
    +from ._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    44 45
     
    
    45 46
     # The magic number for timestamps: 2011-11-11 11:11:11
    
    46 47
     _magic_timestamp = calendar.timegm([2011, 11, 11, 11, 11, 11])
    
    ... ... @@ -1242,3 +1243,19 @@ def _deduplicate(iterable, key=None):
    1242 1243
     def _get_link_mtime(path):
    
    1243 1244
         path_stat = os.lstat(path)
    
    1244 1245
         return path_stat.st_mtime
    
    1246
    +
    
    1247
    +
    
    1248
    +# _message_digest()
    
    1249
    +#
    
    1250
    +# Args:
    
    1251
    +#    message_buffer (bytes): Serialized message to create digest of
    
    1252
    +#
    
    1253
    +# Returns:
    
    1254
    +#    (remote_execution_pb2.Digest): Content digest
    
    1255
    +#
    
    1256
    +def _message_digest(message_buffer):
    
    1257
    +    sha = hashlib.sha256(message_buffer)
    
    1258
    +    digest = remote_execution_pb2.Digest()
    
    1259
    +    digest.hash = sha.hexdigest()
    
    1260
    +    digest.size_bytes = len(message_buffer)
    
    1261
    +    return digest

  • doc/source/format_project.rst
    ... ... @@ -238,6 +238,8 @@ using the `remote-execution` option:
    238 238
           server-cert: server.crt
    
    239 239
           client-cert: client.crt
    
    240 240
           client-key: client.key
    
    241
    +    action-cache-service:
    
    242
    +      url: http://bar.action.com:50052
    
    241 243
     
    
    242 244
     The execution-service part of remote execution does not support encrypted
    
    243 245
     connections yet, so the protocol must always be http.
    
    ... ... @@ -245,6 +247,11 @@ connections yet, so the protocol must always be http.
    245 247
     storage-service specifies a remote CAS store and the parameters are the
    
    246 248
     same as those used to specify an :ref:`artifact server <artifacts>`.
    
    247 249
     
    
    250
    +The action-cache-service specifies where built actions are cached, allowing
    
    251
    +BuildStream to check whether an action has already been executed and download it
    
    252
    +if so. This is similar to the artifact cache but is specified by the REAPI, and is
    
    253
    +optional for remote execution to work.
    
    254
    +
    
    248 255
     The storage service may be the same endpoint used for artifact
    
    249 256
     caching. Remote execution cannot work without push access to the
    
    250 257
     storage endpoint, so you must specify a client certificate and key,
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]