[Notes] [Git][BuildStream/buildstream][jmac/remote_execution_rebase] 15 commits: _casbaseddirectory: Alter _save to save the serialised digest object, not the text digest



Jim MacArthur pushed to branch jmac/remote_execution_rebase at BuildStream / buildstream

Commits:

5 changed files:

  • buildstream/_artifactcache/cascache.py
  • buildstream/buildelement.py
  • buildstream/sandbox/_sandboxremote.py
  • buildstream/sandbox/sandbox.py
  • buildstream/storage/_casbaseddirectory.py

Changes:

  • buildstream/_artifactcache/cascache.py
    @@ -213,6 +213,30 @@ class CASCache(ArtifactCache):
             remotes_for_project = self._remotes[element._get_project()]
             return any(remote.spec.push for remote in remotes_for_project)
    
    +    def pull_key(self, key, size_bytes, project):
    +        """ Pull a single key rather than an artifact.
    +        Does not update local refs. """
    +
    +        for remote in self._remotes[project]:
    +            try:
    +                remote.init()
    +
    +                tree = remote_execution_pb2.Digest()
    +                tree.hash = key
    +                tree.size_bytes = size_bytes
    +
    +                self._fetch_directory(remote, tree)
    +
    +                # no need to pull from additional remotes
    +                return True
    +
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.NOT_FOUND:
    +                    raise
    +
    +        return False
    +
         def pull(self, element, key, *, progress=None):
             ref = self.get_artifact_fullname(element, key)
    
    @@ -254,10 +278,93 @@ class CASCache(ArtifactCache):
    
             self.set_ref(newref, tree)
    
    +    def _push_refs_to_remote(self, refs, remote, may_have_dependencies):
    +        skipped_remote = True
    +
    +        try:
    +            for ref in refs:
    +                tree = self.resolve_ref(ref)
    +
    +                # Check whether ref is already on the server in which case
    +                # there is no need to push the artifact
    +                try:
    +                    request = buildstream_pb2.GetReferenceRequest()
    +                    request.key = ref
    +                    response = remote.ref_storage.GetReference(request)
    +
    +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    +                        # ref is already on the server with the same tree
    +                        continue
    +
    +                except grpc.RpcError as e:
    +                    if e.code() != grpc.StatusCode.NOT_FOUND:
    +                        # Intentionally re-raise RpcError for outer except block.
    +                        raise
    +
    +                missing_blobs = {}
    +                required_blobs = self._required_blobs(tree)
    +
    +                # Limit size of FindMissingBlobs request
    +                for required_blobs_group in _grouper(required_blobs, 512):
    +                    request = remote_execution_pb2.FindMissingBlobsRequest()
    +
    +                    for required_digest in required_blobs_group:
    +                        d = request.blob_digests.add()
    +                        d.hash = required_digest.hash
    +                        d.size_bytes = required_digest.size_bytes
    +
    +                    response = remote.cas.FindMissingBlobs(request)
    +                    for digest in response.missing_blob_digests:
    +                        d = remote_execution_pb2.Digest()
    +                        d.hash = digest.hash
    +                        d.size_bytes = digest.size_bytes
    +                        missing_blobs[d.hash] = d
    +
    +                # Upload any blobs missing on the server
    +                skipped_remote = False
    +                for digest in missing_blobs.values():
    +                    def request_stream():
    +                        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    +                        with open(self.objpath(digest), 'rb') as f:
    +                            assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +                            offset = 0
    +                            finished = False
    +                            remaining = digest.size_bytes
    +                            while not finished:
    +                                chunk_size = min(remaining, 64 * 1024)
    +                                remaining -= chunk_size
    +
    +                                request = bytestream_pb2.WriteRequest()
    +                                request.write_offset = offset
    +                                # max. 64 kB chunks
    +                                request.data = f.read(chunk_size)
    +                                request.resource_name = resource_name
    +                                request.finish_write = remaining <= 0
    +                                yield request
    +                                offset += chunk_size
    +                                finished = request.finish_write
    +                    response = remote.bytestream.Write(request_stream())
    +
    +                request = buildstream_pb2.UpdateReferenceRequest()
    +                request.keys.append(ref)
    +                request.digest.hash = tree.hash
    +                request.digest.size_bytes = tree.size_bytes
    +                remote.ref_storage.UpdateReference(request)
    +
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    +
    +        return not skipped_remote
    +
         def push(self, element, keys):
    +        keys = list(keys)
             refs = [self.get_artifact_fullname(element, key) for key in keys]
    
             project = element._get_project()
    +        return self.push_refs(refs, project, element=element)
    +
    +    def push_refs(self, refs, project, may_have_dependencies=True, element=None):
    
             push_remotes = [r for r in self._remotes[project] if r.spec.push]
    
    @@ -265,94 +372,53 @@ class CASCache(ArtifactCache):
    
             for remote in push_remotes:
                 remote.init()
    -            skipped_remote = True
    -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
    -
    -            try:
    -                for ref in refs:
    -                    tree = self.resolve_ref(ref)
    -
    -                    # Check whether ref is already on the server in which case
    -                    # there is no need to push the artifact
    -                    try:
    -                        request = buildstream_pb2.GetReferenceRequest()
    -                        request.key = ref
    -                        response = remote.ref_storage.GetReference(request)
    -
    -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                            # ref is already on the server with the same tree
    -                            continue
    -
    -                    except grpc.RpcError as e:
    -                        if e.code() != grpc.StatusCode.NOT_FOUND:
    -                            # Intentionally re-raise RpcError for outer except block.
    -                            raise
    -
    -                    missing_blobs = {}
    -                    required_blobs = self._required_blobs(tree)
    -
    -                    # Limit size of FindMissingBlobs request
    -                    for required_blobs_group in _grouper(required_blobs, 512):
    -                        request = remote_execution_pb2.FindMissingBlobsRequest()
    -
    -                        for required_digest in required_blobs_group:
    -                            d = request.blob_digests.add()
    -                            d.hash = required_digest.hash
    -                            d.size_bytes = required_digest.size_bytes
    -
    -                        response = remote.cas.FindMissingBlobs(request)
    -                        for digest in response.missing_blob_digests:
    -                            d = remote_execution_pb2.Digest()
    -                            d.hash = digest.hash
    -                            d.size_bytes = digest.size_bytes
    -                            missing_blobs[d.hash] = d
    -
    -                    # Upload any blobs missing on the server
    -                    skipped_remote = False
    -                    for digest in missing_blobs.values():
    -                        def request_stream():
    -                            resource_name = os.path.join(digest.hash, str(digest.size_bytes))
    -                            with open(self.objpath(digest), 'rb') as f:
    -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    -                                offset = 0
    -                                finished = False
    -                                remaining = digest.size_bytes
    -                                while not finished:
    -                                    chunk_size = min(remaining, 64 * 1024)
    -                                    remaining -= chunk_size
    -
    -                                    request = bytestream_pb2.WriteRequest()
    -                                    request.write_offset = offset
    -                                    # max. 64 kB chunks
    -                                    request.data = f.read(chunk_size)
    -                                    request.resource_name = resource_name
    -                                    request.finish_write = remaining <= 0
    -                                    yield request
    -                                    offset += chunk_size
    -                                    finished = request.finish_write
    -                        response = remote.bytestream.Write(request_stream())
    -
    -                    request = buildstream_pb2.UpdateReferenceRequest()
    -                    request.keys.append(ref)
    -                    request.digest.hash = tree.hash
    -                    request.digest.size_bytes = tree.size_bytes
    -                    remote.ref_storage.UpdateReference(request)
    -
    -                    pushed = True
    -
    -            except grpc.RpcError as e:
    -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    -
    -            if skipped_remote:
    +            if self._push_refs_to_remote(refs, remote, may_have_dependencies):
    +                pushed = True
    +            elif element:
                     self.context.message(Message(
                         None,
                         MessageType.SKIPPED,
                         "Remote ({}) already has {} cached".format(
                             remote.spec.url, element._get_brief_display_key())
                     ))
    +
    +        return pushed
    +
    +    def verify_key_pushed(self, key, project):
    +        ref = key
    +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
    +
    +        pushed = False
    +
    +        for remote in push_remotes:
    +            remote.init()
    +
    +            if self._verify_ref_on_remote(ref, remote):
    +                pushed = True
    +
             return pushed
    
    +    def _verify_ref_on_remote(self, ref, remote):
    +        pushed = False
    +        tree = self.resolve_ref(ref)
    +
    +        # Check whether ref is already on the server in which case
    +        # there is no need to push the artifact
    +        try:
    +            request = buildstream_pb2.GetArtifactRequest()
    +            request.key = ref
    +            response = remote.artifact_cache.GetArtifact(request)
    +
    +            if response.artifact.hash == tree.hash and response.artifact.size_bytes == tree.size_bytes:
    +                # ref is already on the server with the same tree
    +                return True
    +
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise
    +
    +        return False
    +
         ################################################
         #                API Private Methods           #
         ################################################
    
    @@ -726,26 +792,27 @@ class CASCache(ArtifactCache):
                 #
                 q.put(str(e))
    
    -    def _required_blobs(self, tree):
    +    def _required_blobs(self, tree, may_have_dependencies=True):
             # parse directory, and recursively add blobs
             d = remote_execution_pb2.Digest()
             d.hash = tree.hash
             d.size_bytes = tree.size_bytes
             yield d
    
    -        directory = remote_execution_pb2.Directory()
    +        if may_have_dependencies:
    +            directory = remote_execution_pb2.Directory()
    
    -        with open(self.objpath(tree), 'rb') as f:
    -            directory.ParseFromString(f.read())
    +            with open(self.objpath(tree), 'rb') as f:
    +                directory.ParseFromString(f.read())
    
    -        for filenode in directory.files:
    -            d = remote_execution_pb2.Digest()
    -            d.hash = filenode.digest.hash
    -            d.size_bytes = filenode.digest.size_bytes
    -            yield d
    +            for filenode in directory.files:
    +                d = remote_execution_pb2.Digest()
    +                d.hash = filenode.digest.hash
    +                d.size_bytes = filenode.digest.size_bytes
    +                yield d
    
    -        for dirnode in directory.directories:
    -            yield from self._required_blobs(dirnode.digest)
    +            for dirnode in directory.directories:
    +                yield from self._required_blobs(dirnode.digest)
    
         def _fetch_blob(self, remote, digest, out):
             resource_name = os.path.join(digest.hash, str(digest.size_bytes))
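
    The new pull_key() / push_refs() / verify_key_pushed() methods let callers move a
    bare CAS digest between caches without going through an element artifact name. A
    minimal usage sketch (illustrative only, not part of the commit; cascache, project
    and digest are assumed to come from an existing BuildStream context):

        # Name the digest so it can be pushed, then push it as a single
        # object; may_have_dependencies=False skips the directory walk.
        ref = 'my-key/{}'.format(digest.hash)
        cascache.set_ref(ref, digest)
        ok = cascache.push_refs([ref], project, may_have_dependencies=False)

        if ok or cascache.verify_key_pushed(ref, project):
            # Any machine sharing these remotes can now fetch by digest.
            # pull_key() populates the local CAS but creates no local ref.
            cascache.pull_key(digest.hash, digest.size_bytes, project)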
    

  • buildstream/buildelement.py
    @@ -155,6 +155,9 @@ class BuildElement(Element):
                 command_dir = build_root
             sandbox.set_work_directory(command_dir)
    
    +        # Tell sandbox which directory is preserved in the finished artifact
    +        sandbox.set_output_directory(install_root)
    +
             # Setup environment
             sandbox.set_environment(self.get_environment())
     
    

  • buildstream/sandbox/_sandboxremote.py
    @@ -0,0 +1,207 @@
    +#!/usr/bin/env python3
    +#
    +#  Copyright (C) 2016 Codethink Limited
    +#
    +#  This program is free software; you can redistribute it and/or
    +#  modify it under the terms of the GNU Lesser General Public
    +#  License as published by the Free Software Foundation; either
    +#  version 2 of the License, or (at your option) any later version.
    +#
    +#  This library is distributed in the hope that it will be useful,
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    +#  Lesser General Public License for more details.
    +#
    +#  You should have received a copy of the GNU Lesser General Public
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    +#
    +#  Authors:
    +#        Andrew Leeming <andrew.leeming@codethink.co.uk>
    +#        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
    +import os
    +import sys
    +import time
    +import errno
    +import signal
    +import subprocess
    +import shutil
    +from contextlib import ExitStack
    +
    +import grpc
    +import psutil
    +
    +from .. import utils, _signals
    +from ._mount import MountMap
    +from . import Sandbox, SandboxFlags
    +from ..storage._filebaseddirectory import FileBasedDirectory
    +from ..storage._casbaseddirectory import CasBasedDirectory
    +from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    +
    +from .._artifactcache.cascache import CASCache
    +
    +class SandboxError(Exception):
    +    pass
    +
    +# SandboxRemote()
    +#
    +# This isn't really a sandbox, it's a stub which sends all the source to a remote server and retrieves the results from it.
    +#
    +class SandboxRemote(Sandbox):
    +
    +    def __init__(self, *args, **kwargs):
    +        super().__init__(*args, **kwargs)
    +        self.user_ns_available = kwargs['user_ns_available']
    +        self.die_with_parent_available = kwargs['die_with_parent_available']
    +        self.cascache = None
    +
    +    def _get_cascache(self):
    +        if self.cascache is None:
    +            self.cascache = CASCache(self._get_context())
    +            self.cascache.setup_remotes(use_config=True)
    +        return self.cascache
    +
    +    def __run_remote_command(self, cascache, command, input_root_digest, environment):
    +        environment_variables = []
    +        for(k,v) in environment.items():
    +            environment_variables.append(remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v))
    +        remote_command = remote_execution_pb2.Command(arguments=command, environment_variables = environment_variables)
    +
    +        # Serialise this into the cascache...
    +        command_digest = cascache.add_object(buffer=remote_command.SerializeToString())
    +
    +        command_ref = 'worker-command/{}'.format(command_digest.hash)
    +        cascache.set_ref(command_ref, command_digest)
    +
    +        command_push_successful = cascache.push_refs([command_ref], self._get_project(), may_have_dependencies=False)
    +        if command_push_successful or cascache.verify_key_pushed(command_ref, self._get_project()):
    +            # Next, try to create a communication channel
    +            port = 50051
    +            channel = grpc.insecure_channel('dekatron.office.codethink.co.uk:{}'.format(port))
    +            stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    +            ops_stub = operations_pb2_grpc.OperationsStub(channel)
    +
    +            # Having done that, create and send the action.
    +
    +            action = remote_execution_pb2.Action(command_digest = command_digest,
    +                                                 input_root_digest = input_root_digest,
    +                                                 output_files = [],
    +                                                 output_directories = [self._output_directory],
    +                                                 platform = None,
    +                                                 timeout = None,
    +                                                 do_not_cache = True)
    +
    +            request = remote_execution_pb2.ExecuteRequest(instance_name = 'default',
    +                                                          action = action,
    +                                                          skip_cache_lookup = True)
    +
    +            operation = stub.Execute(request) # Returns Operation
    +            job_name = operation.name
    +        else:
    +            # Source push failed
    +            return None
    +        while True:
    +            # TODO: Timeout
    +            # Refresh the operation data periodically using the name
    +            request = operations_pb2.GetOperationRequest(name=job_name)
    +            operation = ops_stub.GetOperation(request)
    +            sys.stderr.write("Operation {} is in stage <{}>\n".format(operation.name, operation.metadata))
    +            sys.stderr.write("......... {} has response <{}>\n".format(operation.name, operation.response))
    +            time.sleep(1)
    +            if operation.done:
    +                break
    +        return operation
    +
    +    """ output_directories is an array of OutputDirectory objects
    +    output_files is an array of OutputFile objects """
    +    def process_job_output(self, output_directories, output_files):
    +        # We only specify one output_directory, so it's an error
    +        # for there to be any output files or more than one directory at the moment.
    +
    +        if len(output_files)>0:
    +            raise SandboxError("Output files were returned when we didn't request any.")
    +        if len(output_directories)>1:
    +            raise SandboxError("More than one output directory was returned from the build server: {}".format(output_directories))
    +
    +        digest = output_directories[0].tree_digest
    +        if digest is None or digest.hash is None or digest.hash=="":
    +            raise SandboxError("Output directory structure had no digest attached.")
    +
    +        # Now do a pull to ensure we have the necessary parts.
    +        cascache = self._get_cascache()
    +        cascache.pull_key(digest.hash, digest.size_bytes, self._get_project())
    +        path_components = os.path.split(self._output_directory)
    +
    +        # Now what we have is a digest for the output. Once we return, the calling process will
    +        # attempt to descend into our directory and find that directory, so we need to overwrite
    +        # that.
    +
    +        if len(path_components)==0:
    +            # The artifact wants the whole directory; we could just return the returned hash in its
    +            # place, but we don't have a means to do that yet.
    +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
    +
    +        # At the moment, we will get the whole directory back in the first directory argument and we need
    +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
    +        # from another hash will be interesting, though...
    +
    +        new_dir = CasBasedDirectory(self._get_context(), ref=digest)
    +        self.set_virtual_directory(new_dir)
    +
    +    def run(self, command, flags, *, cwd=None, env=None):
    +        stdout, stderr = self._get_output()
    +        sys.stderr.write("Attempting run with remote sandbox...\n")
    +        # Upload sources
    +        upload_vdir = self.get_virtual_directory()
    +        if isinstance(upload_vdir, FileBasedDirectory):
    +            # Make a new temporary directory to put source in
    +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
    +            upload_vdir.import_files(self.get_virtual_directory().get_underlying_directory())
    +
    +        # Now, push that key (without necessarily needing a ref) to the remote.
    +        cascache = self._get_cascache()
    +
    +        ref = 'worker-source/{}'.format(upload_vdir.ref.hash)
    +        upload_vdir._save(ref)
    +        source_push_successful = cascache.push_refs([ref], self._get_project())
    +        # Fallback to the sandbox default settings for
    +        # the cwd and environment.
    +
    +        if env is None:
    +            env = self._get_environment()
    +
    +        # We want command args as a list of strings
    +        if isinstance(command, str):
    +            command = [command]
    +
    +        # Now transmit the command to execute
    +        if source_push_successful or cascache.verify_key_pushed(ref, self._get_project()):
    +            response = self.__run_remote_command(cascache, command, upload_vdir.ref, env)
    +
    +            if response is None or response.HasField("error"):
    +                # Build failed, so return a failure code
    +                return 1
    +            else:
    +
    +                # At the moment, response can either be an ExecutionResponse containing an ActionResult, or an ActionResult directly.
    +                executeResponse = remote_execution_pb2.ExecuteResponse()
    +                if response.response.Is(executeResponse.DESCRIPTOR):
    +                    # Unpack ExecuteResponse and set response to its response
    +                    response.response.Unpack(executeResponse)
    +                    response = executeResponse
    +
    +                actionResult = remote_execution_pb2.ActionResult()
    +                if response.response.Is(actionResult.DESCRIPTOR):
    +                    response.response.Unpack(actionResult)
    +                    self.process_job_output(actionResult.output_directories, actionResult.output_files)
    +                else:
    +                    sys.stderr.write("Received unknown message from server.\n")
    +                    return 1
    +        else:
    +            sys.stderr.write("Failed to verify source on remote artifact cache.\n")
    +            return 1
    +        # TODO: Pull the results
    +        sys.stderr.write("Completed remote run with sandbox.\n")
    +        return 0
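
    The polling loop in __run_remote_command() is marked "# TODO: Timeout". A hedged
    sketch of one way to bound it, using only calls already present in this file (the
    120-second budget is an invented example value, not something the commit specifies):

        import time
        from .._protos.google.longrunning import operations_pb2

        def _wait_for_operation(ops_stub, job_name, timeout_seconds=120):
            # Poll GetOperation until the job completes or the budget runs out.
            deadline = time.monotonic() + timeout_seconds
            while time.monotonic() < deadline:
                request = operations_pb2.GetOperationRequest(name=job_name)
                operation = ops_stub.GetOperation(request)
                if operation.done:
                    return operation
                time.sleep(1)
            return None  # run() already treats a None response as a failure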

  • buildstream/sandbox/sandbox.py
    @@ -99,9 +99,11 @@ class Sandbox():
             self.__stdout = kwargs['stdout']
             self.__stderr = kwargs['stderr']
    
    -        # Setup the directories. Root should be available to subclasses, hence
    -        # being single-underscore. The others are private to this class.
    +        # Setup the directories. Root and output_directory should be
    +        # available to subclasses, hence being single-underscore. The
    +        # others are private to this class.
             self._root = os.path.join(directory, 'root')
    +        self._output_directory = None
             self.__directory = directory
             self.__scratch = os.path.join(self.__directory, 'scratch')
             for directory_ in [self._root, self.__scratch]:
    
    @@ -142,11 +144,29 @@ class Sandbox():
                     self._vdir = FileBasedDirectory(self._root)
             return self._vdir
    
    +    def set_virtual_directory(self, vdir):
    +        """ Sets virtual directory. Useful after remote execution
    +        has rewritten the working directory. """
    +        self.__vdir = vdir
    +
    +    def get_virtual_toplevel_directory(self):
    +        """Fetches the sandbox's toplevel directory
    +
    +        The toplevel directory contains 'root', 'scratch' and later
    +        'artifact' where output is copied to.
    +
    +        Returns:
    +           (str): The sandbox toplevel directory
    +
    +        """
    +        # For now, just create a new Directory every time we're asked
    +        return FileBasedDirectory(self.__directory)
    +
         def set_environment(self, environment):
             """Sets the environment variables for the sandbox
    
             Args:
    -           directory (dict): The environment variables to use in the sandbox
    +           environment (dict): The environment variables to use in the sandbox
             """
             self.__env = environment
    
    @@ -158,6 +178,15 @@ class Sandbox():
             """
             self.__cwd = directory
    
    +    def set_output_directory(self, directory):
    +        """Sets the output directory - the directory which is preserved
    +        as an artifact after assembly.
    +
    +        Args:
    +           directory (str): An absolute path within the sandbox
    +        """
    +        self._output_directory = directory
    +
         def mark_directory(self, directory, *, artifact=False):
             """Marks a sandbox directory and ensures it will exist
    
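    For element plugins, the intended call pattern mirrors the buildelement.py change
    above; a minimal sketch (assuming the element resolves its install root via the
    usual install-root variable; this is illustrative, not part of the commit):

        # Inside an Element's sandbox setup, declare which directory the
        # finished artifact should be collected from.
        install_root = self.get_variable('install-root')
        sandbox.set_output_directory(install_root)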
     
    

  • buildstream/storage/_casbaseddirectory.py
    @@ -343,6 +343,20 @@ class CasBasedDirectory(Directory):
                         result.files_written.append(relative_pathname)
             return result
    
    +    def _save(self, name):
    +        """ Saves this directory into the content cache as a named ref. This function is not
    +        currently in use, but may be useful later. """
    +        self._recalculate_recursing_up()
    +        self._recalculate_recursing_down()
    +        (rel_refpath, refname) = os.path.split(name)
    +        refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath)
    +        refname = os.path.join(refdir, refname)
    +
    +        if not os.path.exists(refdir):
    +            os.makedirs(refdir)
    +        with open(refname, "wb") as f:
    +            f.write(self.ref.SerializeToString())
    +
         def import_files(self, external_pathspec, *, files=None,
                          report_written=True, update_utimes=False,
                          can_link=False):


