[Notes] [Git][BuildStream/buildstream][mac_fixes] 21 commits: element.py: Fix cache check in non-strict mode



Title: GitLab

Phillip Smyth pushed to branch mac_fixes at BuildStream / buildstream

Commits:

17 changed files:

Changes:

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -44,6 +44,11 @@ from .._exceptions import ArtifactError
    44 44
     from . import ArtifactCache
    
    45 45
     
    
    46 46
     
    
    47
    +# The default limit for gRPC messages is 4 MiB.
    
    48
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    49
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    
    50
    +
    
    51
    +
    
    47 52
     # A CASCache manages artifacts in a CAS repository as specified in the
    
    48 53
     # Remote Execution API.
    
    49 54
     #
    
    ... ... @@ -854,6 +859,80 @@ class CASCache(ArtifactCache):
    854 859
     
    
    855 860
             assert digest.size_bytes == os.fstat(stream.fileno()).st_size
    
    856 861
     
    
    862
    +    # _ensure_blob():
    
    863
    +    #
    
    864
    +    # Fetch and add blob if it's not already local.
    
    865
    +    #
    
    866
    +    # Args:
    
    867
    +    #     remote (Remote): The remote to use.
    
    868
    +    #     digest (Digest): Digest object for the blob to fetch.
    
    869
    +    #
    
    870
    +    # Returns:
    
    871
    +    #     (str): The path of the object
    
    872
    +    #
    
    873
    +    def _ensure_blob(self, remote, digest):
    
    874
    +        objpath = self.objpath(digest)
    
    875
    +        if os.path.exists(objpath):
    
    876
    +            # already in local repository
    
    877
    +            return objpath
    
    878
    +
    
    879
    +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    
    880
    +            self._fetch_blob(remote, digest, f)
    
    881
    +
    
    882
    +            added_digest = self.add_object(path=f.name)
    
    883
    +            assert added_digest.hash == digest.hash
    
    884
    +
    
    885
    +        return objpath
    
    886
    +
    
    887
    +    def _batch_download_complete(self, batch):
    
    888
    +        for digest, data in batch.send():
    
    889
    +            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    
    890
    +                f.write(data)
    
    891
    +                f.flush()
    
    892
    +
    
    893
    +                added_digest = self.add_object(path=f.name)
    
    894
    +                assert added_digest.hash == digest.hash
    
    895
    +
    
    896
    +    # Helper function for _fetch_directory().
    
    897
    +    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
    
    898
    +        self._batch_download_complete(batch)
    
    899
    +
    
    900
    +        # All previously scheduled directories are now locally available,
    
    901
    +        # move them to the processing queue.
    
    902
    +        fetch_queue.extend(fetch_next_queue)
    
    903
    +        fetch_next_queue.clear()
    
    904
    +        return _CASBatchRead(remote)
    
    905
    +
    
    906
    +    # Helper function for _fetch_directory().
    
    907
    +    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
    
    908
    +        in_local_cache = os.path.exists(self.objpath(digest))
    
    909
    +
    
    910
    +        if in_local_cache:
    
    911
    +            # Skip download, already in local cache.
    
    912
    +            pass
    
    913
    +        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
    
    914
    +                not remote.batch_read_supported):
    
    915
    +            # Too large for batch request, download in independent request.
    
    916
    +            self._ensure_blob(remote, digest)
    
    917
    +            in_local_cache = True
    
    918
    +        else:
    
    919
    +            if not batch.add(digest):
    
    920
    +                # Not enough space left in batch request.
    
    921
    +                # Complete pending batch first.
    
    922
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    923
    +                batch.add(digest)
    
    924
    +
    
    925
    +        if recursive:
    
    926
    +            if in_local_cache:
    
    927
    +                # Add directory to processing queue.
    
    928
    +                fetch_queue.append(digest)
    
    929
    +            else:
    
    930
    +                # Directory will be available after completing pending batch.
    
    931
    +                # Add directory to deferred processing queue.
    
    932
    +                fetch_next_queue.append(digest)
    
    933
    +
    
    934
    +        return batch
    
    935
    +
    
    857 936
         # _fetch_directory():
    
    858 937
         #
    
    859 938
         # Fetches remote directory and adds it to content addressable store.
    
    ... ... @@ -867,39 +946,32 @@ class CASCache(ArtifactCache):
    867 946
         #     dir_digest (Digest): Digest object for the directory to fetch.
    
    868 947
         #
    
    869 948
         def _fetch_directory(self, remote, dir_digest):
    
    870
    -        objpath = self.objpath(dir_digest)
    
    871
    -        if os.path.exists(objpath):
    
    872
    -            # already in local cache
    
    873
    -            return
    
    874
    -
    
    875
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    
    876
    -            self._fetch_blob(remote, dir_digest, out)
    
    877
    -
    
    878
    -            directory = remote_execution_pb2.Directory()
    
    949
    +        fetch_queue = [dir_digest]
    
    950
    +        fetch_next_queue = []
    
    951
    +        batch = _CASBatchRead(remote)
    
    879 952
     
    
    880
    -            with open(out.name, 'rb') as f:
    
    881
    -                directory.ParseFromString(f.read())
    
    953
    +        while len(fetch_queue) + len(fetch_next_queue) > 0:
    
    954
    +            if len(fetch_queue) == 0:
    
    955
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    882 956
     
    
    883
    -            for filenode in directory.files:
    
    884
    -                fileobjpath = self.objpath(filenode.digest)
    
    885
    -                if os.path.exists(fileobjpath):
    
    886
    -                    # already in local cache
    
    887
    -                    continue
    
    957
    +            dir_digest = fetch_queue.pop(0)
    
    888 958
     
    
    889
    -                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    
    890
    -                    self._fetch_blob(remote, filenode.digest, f)
    
    959
    +            objpath = self._ensure_blob(remote, dir_digest)
    
    891 960
     
    
    892
    -                    digest = self.add_object(path=f.name)
    
    893
    -                    assert digest.hash == filenode.digest.hash
    
    961
    +            directory = remote_execution_pb2.Directory()
    
    962
    +            with open(objpath, 'rb') as f:
    
    963
    +                directory.ParseFromString(f.read())
    
    894 964
     
    
    895 965
                 for dirnode in directory.directories:
    
    896
    -                self._fetch_directory(remote, dirnode.digest)
    
    966
    +                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    
    967
    +                                                   fetch_queue, fetch_next_queue, recursive=True)
    
    968
    +
    
    969
    +            for filenode in directory.files:
    
    970
    +                batch = self._fetch_directory_node(remote, filenode.digest, batch,
    
    971
    +                                                   fetch_queue, fetch_next_queue)
    
    897 972
     
    
    898
    -            # Place directory blob only in final location when we've
    
    899
    -            # downloaded all referenced blobs to avoid dangling
    
    900
    -            # references in the repository.
    
    901
    -            digest = self.add_object(path=out.name)
    
    902
    -            assert digest.hash == dir_digest.hash
    
    973
    +        # Fetch final batch
    
    974
    +        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    
    903 975
     
    
    904 976
         def _fetch_tree(self, remote, digest):
    
    905 977
             # download but do not store the Tree object
    
    ... ... @@ -914,16 +986,7 @@ class CASCache(ArtifactCache):
    914 986
                 tree.children.extend([tree.root])
    
    915 987
                 for directory in tree.children:
    
    916 988
                     for filenode in directory.files:
    
    917
    -                    fileobjpath = self.objpath(filenode.digest)
    
    918
    -                    if os.path.exists(fileobjpath):
    
    919
    -                        # already in local cache
    
    920
    -                        continue
    
    921
    -
    
    922
    -                    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    
    923
    -                        self._fetch_blob(remote, filenode.digest, f)
    
    924
    -
    
    925
    -                        added_digest = self.add_object(path=f.name)
    
    926
    -                        assert added_digest.hash == filenode.digest.hash
    
    989
    +                    self._ensure_blob(remote, filenode.digest)
    
    927 990
     
    
    928 991
                     # place directory blob only in final location when we've downloaded
    
    929 992
                     # all referenced blobs to avoid dangling references in the repository
    
    ... ... @@ -942,12 +1005,12 @@ class CASCache(ArtifactCache):
    942 1005
                 finished = False
    
    943 1006
                 remaining = digest.size_bytes
    
    944 1007
                 while not finished:
    
    945
    -                chunk_size = min(remaining, 64 * 1024)
    
    1008
    +                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    946 1009
                     remaining -= chunk_size
    
    947 1010
     
    
    948 1011
                     request = bytestream_pb2.WriteRequest()
    
    949 1012
                     request.write_offset = offset
    
    950
    -                # max. 64 kB chunks
    
    1013
    +                # max. _MAX_PAYLOAD_BYTES chunks
    
    951 1014
                     request.data = instream.read(chunk_size)
    
    952 1015
                     request.resource_name = resname
    
    953 1016
                     request.finish_write = remaining <= 0
    
    ... ... @@ -1035,11 +1098,78 @@ class _CASRemote():
    1035 1098
     
    
    1036 1099
                 self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    
    1037 1100
                 self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    
    1101
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
    
    1038 1102
                 self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
    
    1039 1103
     
    
    1104
    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    1105
    +            try:
    
    1106
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    
    1107
    +                response = self.capabilities.GetCapabilities(request)
    
    1108
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    
    1109
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    
    1110
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    
    1111
    +            except grpc.RpcError as e:
    
    1112
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    
    1113
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    1114
    +                    raise
    
    1115
    +
    
    1116
    +            # Check whether the server supports BatchReadBlobs()
    
    1117
    +            self.batch_read_supported = False
    
    1118
    +            try:
    
    1119
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    
    1120
    +                response = self.cas.BatchReadBlobs(request)
    
    1121
    +                self.batch_read_supported = True
    
    1122
    +            except grpc.RpcError as e:
    
    1123
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    1124
    +                    raise
    
    1125
    +
    
    1040 1126
                 self._initialized = True
    
    1041 1127
     
    
    1042 1128
     
    
    1129
    +# Represents a batch of blobs queued for fetching.
    
    1130
    +#
    
    1131
    +class _CASBatchRead():
    
    1132
    +    def __init__(self, remote):
    
    1133
    +        self._remote = remote
    
    1134
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    
    1135
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    
    1136
    +        self._size = 0
    
    1137
    +        self._sent = False
    
    1138
    +
    
    1139
    +    def add(self, digest):
    
    1140
    +        assert not self._sent
    
    1141
    +
    
    1142
    +        new_batch_size = self._size + digest.size_bytes
    
    1143
    +        if new_batch_size > self._max_total_size_bytes:
    
    1144
    +            # Not enough space left in current batch
    
    1145
    +            return False
    
    1146
    +
    
    1147
    +        request_digest = self._request.digests.add()
    
    1148
    +        request_digest.hash = digest.hash
    
    1149
    +        request_digest.size_bytes = digest.size_bytes
    
    1150
    +        self._size = new_batch_size
    
    1151
    +        return True
    
    1152
    +
    
    1153
    +    def send(self):
    
    1154
    +        assert not self._sent
    
    1155
    +        self._sent = True
    
    1156
    +
    
    1157
    +        if len(self._request.digests) == 0:
    
    1158
    +            return
    
    1159
    +
    
    1160
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    
    1161
    +
    
    1162
    +        for response in batch_response.responses:
    
    1163
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    
    1164
    +                raise ArtifactError("Failed to download blob {}: {}".format(
    
    1165
    +                    response.digest.hash, response.status.code))
    
    1166
    +            if response.digest.size_bytes != len(response.data):
    
    1167
    +                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    
    1168
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    
    1169
    +
    
    1170
    +            yield (response.digest, response.data)
    
    1171
    +
    
    1172
    +
    
    1043 1173
     def _grouper(iterable, n):
    
    1044 1174
         while True:
    
    1045 1175
             try:
    

  • buildstream/_artifactcache/casserver.py
    ... ... @@ -38,8 +38,9 @@ from .._context import Context
    38 38
     from .cascache import CASCache
    
    39 39
     
    
    40 40
     
    
    41
    -# The default limit for gRPC messages is 4 MiB
    
    42
    -_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
    
    41
    +# The default limit for gRPC messages is 4 MiB.
    
    42
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    
    43
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    
    43 44
     
    
    44 45
     
    
    45 46
     # Trying to push an artifact that is too large
    
    ... ... @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    158 159
     
    
    159 160
                     remaining = client_digest.size_bytes - request.read_offset
    
    160 161
                     while remaining > 0:
    
    161
    -                    chunk_size = min(remaining, 64 * 1024)
    
    162
    +                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
    
    162 163
                         remaining -= chunk_size
    
    163 164
     
    
    164 165
                         response = bytestream_pb2.ReadResponse()
    
    ... ... @@ -242,7 +243,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
    242 243
     
    
    243 244
             for digest in request.digests:
    
    244 245
                 batch_size += digest.size_bytes
    
    245
    -            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
    
    246
    +            if batch_size > _MAX_PAYLOAD_BYTES:
    
    246 247
                     context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    247 248
                     return response
    
    248 249
     
    
    ... ... @@ -269,7 +270,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    269 270
             cache_capabilities = response.cache_capabilities
    
    270 271
             cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
    
    271 272
             cache_capabilities.action_cache_update_capabilities.update_enabled = False
    
    272
    -        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
    
    273
    +        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    
    273 274
             cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
    
    274 275
     
    
    275 276
             response.deprecated_api_version.major = 2
    

  • buildstream/_frontend/app.py
    ... ... @@ -115,14 +115,6 @@ class App():
    115 115
             else:
    
    116 116
                 self.colors = False
    
    117 117
     
    
    118
    -        # Increase the soft limit for open file descriptors to the maximum.
    
    119
    -        # SafeHardlinks FUSE needs to hold file descriptors for all processes in the sandbox.
    
    120
    -        # Avoid hitting the limit too quickly.
    
    121
    -        limits = resource.getrlimit(resource.RLIMIT_NOFILE)
    
    122
    -        if limits[0] != limits[1]:
    
    123
    -            # Set soft limit to hard limit
    
    124
    -            resource.setrlimit(resource.RLIMIT_NOFILE, (limits[1], limits[1]))
    
    125
    -
    
    126 118
         # create()
    
    127 119
         #
    
    128 120
         # Should be used instead of the regular constructor.
    

  • buildstream/_platform/darwin.py
    1
    +#
    
    2
    +#  Copyright (C) 2017 Codethink Limited
    
    3
    +#  Copyright (C) 2018 Bloomberg Finance LP
    
    4
    +#
    
    5
    +#  This program is free software; you can redistribute it and/or
    
    6
    +#  modify it under the terms of the GNU Lesser General Public
    
    7
    +#  License as published by the Free Software Foundation; either
    
    8
    +#  version 2 of the License, or (at your option) any later version.
    
    9
    +#
    
    10
    +#  This library is distributed in the hope that it will be useful,
    
    11
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    12
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    
    13
    +#  Lesser General Public License for more details.
    
    14
    +#
    
    15
    +#  You should have received a copy of the GNU Lesser General Public
    
    16
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    17
    +
    
    18
    +import os
    
    19
    +import resource
    
    20
    +
    
    21
    +from .._exceptions import PlatformError
    
    22
    +from ..sandbox import SandboxDummy
    
    23
    +
    
    24
    +from . import Platform
    
    25
    +
    
    26
    +
    
    27
    +class Darwin(Platform):
    
    28
    +
    
    29
    +    # This value comes from OPEN_MAX in syslimits.h
    
    30
    +    OPEN_MAX = 10240
    
    31
    +
    
    32
    +    def __init__(self, context):
    
    33
    +
    
    34
    +        super().__init__(context)
    
    35
    +
    
    36
    +    @property
    
    37
    +    def artifactcache(self):
    
    38
    +        return self._artifact_cache
    
    39
    +
    
    40
    +    def create_sandbox(self, *args, **kwargs):
    
    41
    +        return SandboxDummy(*args, **kwargs)
    
    42
    +
    
    43
    +    def get_cpu_count(self, cap=None):
    
    44
    +        if cap < os.cpu_count():
    
    45
    +            return cap
    
    46
    +        else:
    
    47
    +            return os.cpu_count()
    
    48
    +
    
    49
    +    def set_resource_limits(self, soft_limit=OPEN_MAX, hard_limit=None):
    
    50
    +        super().set_resource_limits(soft_limit)

  • buildstream/_platform/linux.py
    ... ... @@ -17,13 +17,14 @@
    17 17
     #  Authors:
    
    18 18
     #        Tristan Maat <tristan maat codethink co uk>
    
    19 19
     
    
    20
    +import os
    
    20 21
     import subprocess
    
    21 22
     
    
    22 23
     from .. import _site
    
    23 24
     from .. import utils
    
    24 25
     from .._artifactcache.cascache import CASCache
    
    25 26
     from .._message import Message, MessageType
    
    26
    -from ..sandbox import SandboxBwrap
    
    27
    +from ..sandbox import SandboxDummy
    
    27 28
     
    
    28 29
     from . import Platform
    
    29 30
     
    
    ... ... @@ -32,27 +33,44 @@ class Linux(Platform):
    32 33
     
    
    33 34
         def __init__(self, context):
    
    34 35
     
    
    35
    -        super().__init__(context)
    
    36
    -
    
    37 36
             self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8)
    
    38
    -        self._user_ns_available = self._check_user_ns_available(context)
    
    39
    -        self._artifact_cache = CASCache(context, enable_push=self._user_ns_available)
    
    37
    +
    
    38
    +        if self._local_sandbox_available():
    
    39
    +            self._user_ns_available = self._check_user_ns_available(context)
    
    40
    +        else:
    
    41
    +            self._user_ns_available = False
    
    42
    +
    
    43
    +        # _user_ns_available needs to be set before chaining up to the super class
    
    44
    +        # This is because it will call create_artifact_cache()
    
    45
    +        super().__init__(context)
    
    40 46
     
    
    41 47
         @property
    
    42 48
         def artifactcache(self):
    
    43 49
             return self._artifact_cache
    
    44 50
     
    
    45 51
         def create_sandbox(self, *args, **kwargs):
    
    46
    -        # Inform the bubblewrap sandbox as to whether it can use user namespaces or not
    
    47
    -        kwargs['user_ns_available'] = self._user_ns_available
    
    48
    -        kwargs['die_with_parent_available'] = self._die_with_parent_available
    
    49
    -        return SandboxBwrap(*args, **kwargs)
    
    52
    +        if not self._local_sandbox_available():
    
    53
    +            return SandboxDummy(*args, **kwargs)
    
    54
    +        else:
    
    55
    +            from ..sandbox._sandboxbwrap import SandboxBwrap
    
    56
    +            # Inform the bubblewrap sandbox as to whether it can use user namespaces or not
    
    57
    +            kwargs['user_ns_available'] = self._user_ns_available
    
    58
    +            kwargs['die_with_parent_available'] = self._die_with_parent_available
    
    59
    +            return SandboxBwrap(*args, **kwargs)
    
    60
    +
    
    61
    +    def create_artifact_cache(self, context, *, enable_push):
    
    62
    +        return super().create_artifact_cache(context=context, enable_push=self._user_ns_available)
    
    50 63
     
    
    51 64
         ################################################
    
    52 65
         #              Private Methods                 #
    
    53 66
         ################################################
    
    54
    -    def _check_user_ns_available(self, context):
    
    67
    +    def _local_sandbox_available(self):
    
    68
    +        try:
    
    69
    +            return os.path.exists(utils.get_host_tool('bwrap')) and os.path.exists('/dev/fuse')
    
    70
    +        except utils.ProgramNotFoundError:
    
    71
    +            return False
    
    55 72
     
    
    73
    +    def _check_user_ns_available(self, context):
    
    56 74
             # Here, let's check if bwrap is able to create user namespaces,
    
    57 75
             # issue a warning if it's not available, and save the state
    
    58 76
             # locally so that we can inform the sandbox to not try it
    

  • buildstream/_platform/platform.py
    ... ... @@ -19,8 +19,10 @@
    19 19
     
    
    20 20
     import os
    
    21 21
     import sys
    
    22
    +import resource
    
    22 23
     
    
    23 24
     from .._exceptions import PlatformError, ImplError
    
    25
    +from .._artifactcache.cascache import CASCache
    
    24 26
     
    
    25 27
     
    
    26 28
     class Platform():
    
    ... ... @@ -37,22 +39,28 @@ class Platform():
    37 39
         #
    
    38 40
         def __init__(self, context):
    
    39 41
             self.context = context
    
    42
    +        self.set_resource_limits()
    
    43
    +        self._artifact_cache = self.create_artifact_cache(context, enable_push=True)
    
    40 44
     
    
    41 45
         @classmethod
    
    42 46
         def create_instance(cls, *args, **kwargs):
    
    43
    -        if sys.platform.startswith('linux'):
    
    44
    -            backend = 'linux'
    
    45
    -        else:
    
    46
    -            backend = 'unix'
    
    47 47
     
    
    48 48
             # Meant for testing purposes and therefore hidden in the
    
    49 49
             # deepest corners of the source code. Try not to abuse this,
    
    50 50
             # please?
    
    51 51
             if os.getenv('BST_FORCE_BACKEND'):
    
    52 52
                 backend = os.getenv('BST_FORCE_BACKEND')
    
    53
    +        elif sys.platform.startswith('linux'):
    
    54
    +            backend = 'linux'
    
    55
    +        elif sys.platform.startswith('darwin'):
    
    56
    +            backend = 'darwin'
    
    57
    +        else:
    
    58
    +            backend = 'unix'
    
    53 59
     
    
    54 60
             if backend == 'linux':
    
    55 61
                 from .linux import Linux as PlatformImpl
    
    62
    +        elif backend == 'darwin':
    
    63
    +            from .darwin import Darwin as PlatformImpl
    
    56 64
             elif backend == 'unix':
    
    57 65
                 from .unix import Unix as PlatformImpl
    
    58 66
             else:
    
    ... ... @@ -66,6 +74,9 @@ class Platform():
    66 74
                 raise PlatformError("Platform needs to be initialized first")
    
    67 75
             return cls._instance
    
    68 76
     
    
    77
    +    def get_cpu_count(self, cap=None):
    
    78
    +        return min(len(os.sched_getaffinity(0)), cap)
    
    79
    +
    
    69 80
         ##################################################################
    
    70 81
         #                       Platform properties                      #
    
    71 82
         ##################################################################
    
    ... ... @@ -92,3 +103,18 @@ class Platform():
    92 103
         def create_sandbox(self, *args, **kwargs):
    
    93 104
             raise ImplError("Platform {platform} does not implement create_sandbox()"
    
    94 105
                             .format(platform=type(self).__name__))
    
    106
    +
    
    107
    +    def set_resource_limits(self, soft_limit=None, hard_limit=None):
    
    108
    +        # Resource limits are set here rather than in _frontend/app.py because they are platform-dependent.
    
    109
    +        # SafeHardlinks FUSE needs to hold file descriptors for all processes in the sandbox.
    
    110
    +        # Avoid hitting the limit too quickly.
    
    111
    +        limits = resource.getrlimit(resource.RLIMIT_NOFILE)
    
    112
    +        if limits[0] != limits[1]:
    
    113
    +            if soft_limit is None:
    
    114
    +                soft_limit = limits[1]
    
    115
    +            if hard_limit is None:
    
    116
    +                hard_limit = limits[1]
    
    117
    +            resource.setrlimit(resource.RLIMIT_NOFILE, (soft_limit, hard_limit))
    
    118
    +
    
    119
    +    def create_artifact_cache(self, context, *, enable_push=True):
    
    120
    +        return CASCache(context=context, enable_push=enable_push)

  • buildstream/_platform/unix.py
    ... ... @@ -21,7 +21,6 @@ import os
    21 21
     
    
    22 22
     from .._artifactcache.cascache import CASCache
    
    23 23
     from .._exceptions import PlatformError
    
    24
    -from ..sandbox import SandboxChroot
    
    25 24
     
    
    26 25
     from . import Platform
    
    27 26
     
    
    ... ... @@ -31,7 +30,6 @@ class Unix(Platform):
    31 30
         def __init__(self, context):
    
    32 31
     
    
    33 32
             super().__init__(context)
    
    34
    -        self._artifact_cache = CASCache(context)
    
    35 33
     
    
    36 34
             # Not necessarily 100% reliable, but we want to fail early.
    
    37 35
             if os.geteuid() != 0:
    
    ... ... @@ -42,4 +40,5 @@ class Unix(Platform):
    42 40
             return self._artifact_cache
    
    43 41
     
    
    44 42
         def create_sandbox(self, *args, **kwargs):
    
    43
    +        from ..sandbox._sandboxchroot import SandboxChroot
    
    45 44
             return SandboxChroot(*args, **kwargs)

  • buildstream/_project.py
    ... ... @@ -38,6 +38,7 @@ from ._loader import Loader
    38 38
     from .element import Element
    
    39 39
     from ._message import Message, MessageType
    
    40 40
     from ._includes import Includes
    
    41
    +from ._platform import Platform
    
    41 42
     
    
    42 43
     
    
    43 44
     # Project Configuration file
    
    ... ... @@ -617,7 +618,8 @@ class Project():
    617 618
             # Based on some testing (mainly on AWS), maximum effective
    
    618 619
             # max-jobs value seems to be around 8-10 if we have enough cores
    
    619 620
             # users should set values based on workload and build infrastructure
    
    620
    -        output.base_variables['max-jobs'] = str(min(len(os.sched_getaffinity(0)), 8))
    
    621
    +        platform = Platform.get_platform()
    
    622
    +        output.base_variables['max-jobs'] = str(platform.get_cpu_count(8))
    
    621 623
     
    
    622 624
             # Export options into variables, if that was requested
    
    623 625
             output.options.export_variables(output.base_variables)
    

  • buildstream/_stream.py
    ... ... @@ -641,6 +641,9 @@ class Stream():
    641 641
                 }
    
    642 642
                 workspaces.append(workspace_detail)
    
    643 643
     
    
    644
    +        if not workspaces:
    
    645
    +            workspaces = "No workspaces found"
    
    646
    +
    
    644 647
             _yaml.dump({
    
    645 648
                 'workspaces': workspaces
    
    646 649
             })
    

  • buildstream/element.py
    ... ... @@ -1532,8 +1532,6 @@ class Element(Plugin):
    1532 1532
                 with _signals.terminator(cleanup_rootdir), \
    
    1533 1533
                     self.__sandbox(rootdir, output_file, output_file, self.__sandbox_config) as sandbox:  # nopep8
    
    1534 1534
     
    
    1535
    -                sandbox_vroot = sandbox.get_virtual_directory()
    
    1536
    -
    
    1537 1535
                     # By default, the dynamic public data is the same as the static public data.
    
    1538 1536
                     # The plugin's assemble() method may modify this, though.
    
    1539 1537
                     self.__dynamic_public = _yaml.node_copy(self.__public)
    
    ... ... @@ -1581,7 +1579,6 @@ class Element(Plugin):
    1581 1579
                     finally:
    
    1582 1580
                         if collect is not None:
    
    1583 1581
                             try:
    
    1584
    -                            # Sandbox will probably have replaced its virtual directory, so get it again
    
    1585 1582
                                 sandbox_vroot = sandbox.get_virtual_directory()
    
    1586 1583
                                 collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
    
    1587 1584
                             except VirtualDirectoryError:
    
    ... ... @@ -1606,6 +1603,7 @@ class Element(Plugin):
    1606 1603
                             collectvdir.export_files(filesdir, can_link=True)
    
    1607 1604
     
    
    1608 1605
                         try:
    
    1606
    +                        sandbox_vroot = sandbox.get_virtual_directory()
    
    1609 1607
                             sandbox_build_dir = sandbox_vroot.descend(
    
    1610 1608
                                 self.get_variable('build-root').lstrip(os.sep).split(os.sep))
    
    1611 1609
                             # Hard link files from build-root dir to buildtreedir directory
    
    ... ... @@ -2084,7 +2082,7 @@ class Element(Plugin):
    2084 2082
         #
    
    2085 2083
         # Raises an error if the artifact is not cached.
    
    2086 2084
         #
    
    2087
    -    def __assert_cached(self, keystrength=_KeyStrength.STRONG):
    
    2085
    +    def __assert_cached(self, keystrength=None):
    
    2088 2086
             assert self.__is_cached(keystrength=keystrength), "{}: Missing artifact {}".format(
    
    2089 2087
                 self, self._get_brief_display_key())
    
    2090 2088
     
    

  • buildstream/sandbox/__init__.py
    ... ... @@ -18,6 +18,5 @@
    18 18
     #        Tristan Maat <tristan maat codethink co uk>
    
    19 19
     
    
    20 20
     from .sandbox import Sandbox, SandboxFlags
    
    21
    -from ._sandboxchroot import SandboxChroot
    
    22
    -from ._sandboxbwrap import SandboxBwrap
    
    23 21
     from ._sandboxremote import SandboxRemote
    
    22
    +from ._sandboxdummy import SandboxDummy

  • buildstream/sandbox/_sandboxdummy.py
    1
    +#
    
    2
    +#  Copyright (C) 2017 Codethink Limited
    
    3
    +#
    
    4
    +#  This program is free software; you can redistribute it and/or
    
    5
    +#  modify it under the terms of the GNU Lesser General Public
    
    6
    +#  License as published by the Free Software Foundation; either
    
    7
    +#  version 2 of the License, or (at your option) any later version.
    
    8
    +#
    
    9
    +#  This library is distributed in the hope that it will be useful,
    
    10
    +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    
    11
    +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
    
    12
    +#  Lesser General Public License for more details.
    
    13
    +#
    
    14
    +#  You should have received a copy of the GNU Lesser General Public
    
    15
    +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
    
    16
    +#
    
    17
    +#  Authors:
    
    18
    +
    
    19
    +from .._exceptions import SandboxError
    
    20
    +from . import Sandbox
    
    21
    +
    
    22
    +
    
    23
    +class SandboxDummy(Sandbox):
    
    24
    +    def __init__(self, *args, **kwargs):
    
    25
    +        super().__init__(*args, **kwargs)
    
    26
    +
    
    27
    +    def run(self, command, flags, *, cwd=None, env=None):
    
    28
    +
    
    29
    +        # Fallback to the sandbox default settings for
    
    30
    +        # the cwd and env.
    
    31
    +        #
    
    32
    +        cwd = self._get_work_directory(cwd=cwd)
    
    33
    +        env = self._get_environment(cwd=cwd, env=env)
    
    34
    +
    
    35
    +        if not self._has_command(command[0], env):
    
    36
    +            raise SandboxError("Staged artifacts do not provide command "
    
    37
    +                               "'{}'".format(command[0]),
    
    38
    +                               reason='missing-command')
    
    39
    +
    
    40
    +        raise SandboxError("This platform does not support local builds")

  • buildstream/sandbox/sandbox.py
    ... ... @@ -110,6 +110,10 @@ class Sandbox():
    110 110
                 os.makedirs(directory_, exist_ok=True)
    
    111 111
             self._vdir = None
    
    112 112
     
    
    113
    +        # This is set if anyone requests access to the underlying
    
    114
    +        # directory via get_directory.
    
    115
    +        self._never_cache_vdirs = False
    
    116
    +
    
    113 117
         def get_directory(self):
    
    114 118
             """Fetches the sandbox root directory
    
    115 119
     
    
    ... ... @@ -122,24 +126,28 @@ class Sandbox():
    122 126
     
    
    123 127
             """
    
    124 128
             if self.__allow_real_directory:
    
    129
    +            self._never_cache_vdirs = True
    
    125 130
                 return self._root
    
    126 131
             else:
    
    127 132
                 raise BstError("You can't use get_directory")
    
    128 133
     
    
    129 134
         def get_virtual_directory(self):
    
    130
    -        """Fetches the sandbox root directory
    
    135
    +        """Fetches the sandbox root directory as a virtual Directory.
    
    131 136
     
    
    132 137
             The root directory is where artifacts for the base
    
    133
    -        runtime environment should be staged. Only works if
    
    134
    -        BST_VIRTUAL_DIRECTORY is not set.
    
    138
    +        runtime environment should be staged.
    
    139
    +
    
    140
    +        Use caution if you use get_directory and
    
    141
    +        get_virtual_directory.  If you alter the contents of the
    
    142
    +        directory returned by get_directory, all objects returned by
    
    143
    +        get_virtual_directory or derived from them are invalid and you
    
    144
    +        must call get_virtual_directory again to get a new copy.
    
    135 145
     
    
    136 146
             Returns:
    
    137
    -           (str): The sandbox root directory
    
    147
    +           (Directory): The sandbox root directory
    
    138 148
     
    
    139 149
             """
    
    140
    -        if not self._vdir:
    
    141
    -            # BST_CAS_DIRECTORIES is a deliberately hidden environment variable which
    
    142
    -            # can be used to switch on CAS-based directories for testing.
    
    150
    +        if self._vdir is None or self._never_cache_vdirs:
    
    143 151
                 if 'BST_CAS_DIRECTORIES' in os.environ:
    
    144 152
                     self._vdir = CasBasedDirectory(self.__context, ref=None)
    
    145 153
                 else:
    

  • buildstream/utils.py
    ... ... @@ -35,6 +35,7 @@ import tempfile
    35 35
     import itertools
    
    36 36
     import functools
    
    37 37
     from contextlib import contextmanager
    
    38
    +from stat import S_ISDIR
    
    38 39
     
    
    39 40
     import psutil
    
    40 41
     
    
    ... ... @@ -328,27 +329,25 @@ def safe_remove(path):
    328 329
         Raises:
    
    329 330
            UtilError: In the case of unexpected system call failures
    
    330 331
         """
    
    331
    -    if os.path.lexists(path):
    
    332
    -
    
    333
    -        # Try to remove anything that is in the way, but issue
    
    334
    -        # a warning instead if it removes a non empty directory
    
    335
    -        try:
    
    332
    +    try:
    
    333
    +        if S_ISDIR(os.lstat(path).st_mode):
    
    334
    +            os.rmdir(path)
    
    335
    +        else:
    
    336 336
                 os.unlink(path)
    
    337
    -        except OSError as e:
    
    338
    -            if e.errno != errno.EISDIR:
    
    339
    -                raise UtilError("Failed to remove '{}': {}"
    
    340
    -                                .format(path, e))
    
    341
    -
    
    342
    -            try:
    
    343
    -                os.rmdir(path)
    
    344
    -            except OSError as e:
    
    345
    -                if e.errno == errno.ENOTEMPTY:
    
    346
    -                    return False
    
    347
    -                else:
    
    348
    -                    raise UtilError("Failed to remove '{}': {}"
    
    349
    -                                    .format(path, e))
    
    350 337
     
    
    351
    -    return True
    
    338
    +        # File removed/unlinked successfully
    
    339
    +        return True
    
    340
    +
    
    341
    +    except OSError as e:
    
    342
    +        if e.errno == errno.ENOTEMPTY:
    
    343
    +            # Path is non-empty directory
    
    344
    +            return False
    
    345
    +        elif e.errno == errno.ENOENT:
    
    346
    +            # Path does not exist
    
    347
    +            return True
    
    348
    +
    
    349
    +    raise UtilError("Failed to remove '{}': {}"
    
    350
    +                    .format(path, e))
    
    352 351
     
    
    353 352
     
    
    354 353
     def copy_files(src, dest, *, files=None, ignore_missing=False, report_written=False):
    

  • tests/frontend/cross_junction_workspace.py
    ... ... @@ -93,9 +93,10 @@ def test_close_cross_junction(cli, tmpdir):
    93 93
         result.assert_success()
    
    94 94
     
    
    95 95
         loaded = _yaml.load_data(result.output)
    
    96
    -    assert isinstance(loaded.get('workspaces'), list)
    
    97
    -    workspaces = loaded['workspaces']
    
    98
    -    assert len(workspaces) == 0
    
    96
    +    if not loaded['workspaces'] == "No workspaces found":
    
    97
    +        assert isinstance(loaded.get('workspaces'), list)
    
    98
    +        workspaces = loaded['workspaces']
    
    99
    +        assert len(workspaces) == 0
    
    99 100
     
    
    100 101
     
    
    101 102
     def test_close_all_cross_junction(cli, tmpdir):
    
    ... ... @@ -112,6 +113,7 @@ def test_close_all_cross_junction(cli, tmpdir):
    112 113
         result.assert_success()
    
    113 114
     
    
    114 115
         loaded = _yaml.load_data(result.output)
    
    115
    -    assert isinstance(loaded.get('workspaces'), list)
    
    116
    -    workspaces = loaded['workspaces']
    
    117
    -    assert len(workspaces) == 0
    
    116
    +    if not loaded['workspaces'] == "No workspaces found":
    
    117
    +        assert isinstance(loaded.get('workspaces'), list)
    
    118
    +        workspaces = loaded['workspaces']
    
    119
    +        assert len(workspaces) == 0

  • tests/frontend/project/elements/rebuild-target.bst
    1
    +kind: compose
    
    2
    +
    
    3
    +build-depends:
    
    4
    +- target.bst

  • tests/frontend/rebuild.py
    1
    +import os
    
    2
    +import pytest
    
    3
    +from tests.testutils import cli
    
    4
    +
    
    5
    +# Project directory
    
    6
    +DATA_DIR = os.path.join(
    
    7
    +    os.path.dirname(os.path.realpath(__file__)),
    
    8
    +    "project",
    
    9
    +)
    
    10
    +
    
    11
    +
    
    12
    +def strict_args(args, strict):
    
    13
    +    if strict != "strict":
    
    14
    +        return ['--no-strict'] + args
    
    15
    +    return args
    
    16
    +
    
    17
    +
    
    18
    +@pytest.mark.datafiles(DATA_DIR)
    
    19
    +@pytest.mark.parametrize("strict", ["strict", "non-strict"])
    
    20
    +def test_rebuild(datafiles, cli, strict):
    
    21
    +    project = os.path.join(datafiles.dirname, datafiles.basename)
    
    22
    +    checkout = os.path.join(cli.directory, 'checkout')
    
    23
    +
    
    24
    +    # First build intermediate target.bst
    
    25
    +    result = cli.run(project=project, args=strict_args(['build', 'target.bst'], strict))
    
    26
    +    result.assert_success()
    
    27
    +
    
    28
    +    # Modify base import
    
    29
    +    with open(os.path.join(project, 'files', 'dev-files', 'usr', 'include', 'new.h'), "w") as f:
    
    30
    +        f.write("#define NEW")
    
    31
    +
    
    32
    +    # Rebuild base import and build top-level rebuild-target.bst
    
    33
    +    # In non-strict mode, this does not rebuild intermediate target.bst,
    
    34
    +    # which means that a weakly cached target.bst will be staged as dependency.
    
    35
    +    result = cli.run(project=project, args=strict_args(['build', 'rebuild-target.bst'], strict))
    
    36
    +    result.assert_success()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]