[Notes] [Git][BuildStream/buildstream][bst-1.2] 9 commits: artifactcache: fix oversight

Jürg Billeter pushed to branch bst-1.2 at BuildStream / buildstream

Commits:

5 changed files:

  • buildstream/_artifactcache/artifactcache.py
  • buildstream/_artifactcache/cascache.py
  • buildstream/_artifactcache/casserver.py
  • buildstream/_scheduler/queues/buildqueue.py
  • buildstream/_scheduler/scheduler.py

Changes:

  • buildstream/_artifactcache/artifactcache.py
    @@ -277,7 +277,7 @@ class ArtifactCache():
                               "Please increase the cache-quota in {}."
                               .format(self.context.config_origin or default_conf))
     
    -                if self.get_quota_exceeded():
    +                if self.has_quota_exceeded():
                         raise ArtifactError("Cache too full. Aborting.",
                                             detail=detail,
                                             reason="cache-too-full")

    @@ -364,14 +364,14 @@ class ArtifactCache():
             self._cache_size = cache_size
             self._write_cache_size(self._cache_size)
     
    -    # get_quota_exceeded()
    +    # has_quota_exceeded()
         #
         # Checks if the current artifact cache size exceeds the quota.
         #
         # Returns:
         #    (bool): True of the quota is exceeded
         #
    -    def get_quota_exceeded(self):
    +    def has_quota_exceeded(self):
             return self.get_cache_size() > self._cache_quota
     
         ################################################
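
    The rename above turns the quota check into a conventional boolean predicate; the call sites
    updated later in this push (buildqueue.py and scheduler.py) read accordingly. A minimal,
    self-contained sketch of the pattern (the stub class is illustrative only, not the real
    ArtifactCache; only the has_quota_exceeded() comparison mirrors the diff above):

        # Illustrative stub for the quota-check pattern.
        class StubArtifactCache:
            def __init__(self, cache_size, quota):
                self._cache_size = cache_size
                self._cache_quota = quota

            def get_cache_size(self):
                return self._cache_size

            def has_quota_exceeded(self):
                # Same comparison as ArtifactCache.has_quota_exceeded() above.
                return self.get_cache_size() > self._cache_quota

        artifacts = StubArtifactCache(cache_size=6 * 1024 ** 3, quota=5 * 1024 ** 3)
        if artifacts.has_quota_exceeded():
            print("cache too full, a cleanup job should be scheduled")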
    

  • buildstream/_artifactcache/cascache.py
    @@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
     from . import ArtifactCache
     
     
    +# The default limit for gRPC messages is 4 MiB.
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
    +
    +
     # A CASCache manages artifacts in a CAS repository as specified in the
     # Remote Execution API.
     #

    @@ -115,7 +120,7 @@ class CASCache(ArtifactCache):
         def commit(self, element, content, keys):
             refs = [self.get_artifact_fullname(element, key) for key in keys]
     
    -        tree = self._create_tree(content)
    +        tree = self._commit_directory(content)
     
             for ref in refs:
                 self.set_ref(ref, tree)

    @@ -330,12 +335,12 @@ class CASCache(ArtifactCache):
                                 finished = False
                                 remaining = digest.size_bytes
                                 while not finished:
    -                                chunk_size = min(remaining, 64 * 1024)
    +                                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                                     remaining -= chunk_size
     
                                     request = bytestream_pb2.WriteRequest()
                                     request.write_offset = offset
    -                                # max. 64 kB chunks
    +                                # max. _MAX_PAYLOAD_BYTES chunks
                                     request.data = f.read(chunk_size)
                                     request.resource_name = resname
                                     request.finish_write = remaining <= 0

    @@ -623,7 +628,21 @@ class CASCache(ArtifactCache):
         def _refpath(self, ref):
             return os.path.join(self.casdir, 'refs', 'heads', ref)
     
    -    def _create_tree(self, path, *, digest=None):
    +    # _commit_directory():
    +    #
    +    # Adds local directory to content addressable store.
    +    #
    +    # Adds files, symbolic links and recursively other directories in
    +    # a local directory to the content addressable store.
    +    #
    +    # Args:
    +    #     path (str): Path to the directory to add.
    +    #     dir_digest (Digest): An optional Digest object to use.
    +    #
    +    # Returns:
    +    #     (Digest): Digest object for the directory added.
    +    #
    +    def _commit_directory(self, path, *, dir_digest=None):
             directory = remote_execution_pb2.Directory()
     
             for name in sorted(os.listdir(path)):

    @@ -632,7 +651,7 @@ class CASCache(ArtifactCache):
                 if stat.S_ISDIR(mode):
                     dirnode = directory.directories.add()
                     dirnode.name = name
    -                self._create_tree(full_path, digest=dirnode.digest)
    +                self._commit_directory(full_path, dir_digest=dirnode.digest)
                 elif stat.S_ISREG(mode):
                     filenode = directory.files.add()
                     filenode.name = name

    @@ -645,7 +664,8 @@ class CASCache(ArtifactCache):
                 else:
                     raise ArtifactError("Unsupported file type for {}".format(full_path))
     
    -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
    +        return self.add_object(digest=dir_digest,
    +                               buffer=directory.SerializeToString())
     
         def _get_subdir(self, tree, subdir):
             head, name = os.path.split(subdir)

    @@ -788,39 +808,119 @@ class CASCache(ArtifactCache):
             out.flush()
             assert digest.size_bytes == os.fstat(out.fileno()).st_size
     
    -    def _fetch_directory(self, remote, tree):
    -        objpath = self.objpath(tree)
    +    # _ensure_blob():
    +    #
    +    # Fetch and add blob if it's not already local.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     digest (Digest): Digest object for the blob to fetch.
    +    #
    +    # Returns:
    +    #     (str): The path of the object
    +    #
    +    def _ensure_blob(self, remote, digest):
    +        objpath = self.objpath(digest)
             if os.path.exists(objpath):
    -            # already in local cache
    -            return
    +            # already in local repository
    +            return objpath
     
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            self._fetch_blob(remote, tree, out)
    +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +            self._fetch_blob(remote, digest, f)
     
    -            directory = remote_execution_pb2.Directory()
    +            added_digest = self.add_object(path=f.name)
    +            assert added_digest.hash == digest.hash
     
    -            with open(out.name, 'rb') as f:
    -                directory.ParseFromString(f.read())
    +        return objpath
     
    -            for filenode in directory.files:
    -                fileobjpath = self.objpath(tree)
    -                if os.path.exists(fileobjpath):
    -                    # already in local cache
    -                    continue
    +    def _batch_download_complete(self, batch):
    +        for digest, data in batch.send():
    +            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    +                f.write(data)
    +                f.flush()
    +
    +                added_digest = self.add_object(path=f.name)
    +                assert added_digest.hash == digest.hash
    +
    +    # Helper function for _fetch_directory().
    +    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
    +        self._batch_download_complete(batch)
     
    -                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -                    self._fetch_blob(remote, filenode.digest, f)
    +        # All previously scheduled directories are now locally available,
    +        # move them to the processing queue.
    +        fetch_queue.extend(fetch_next_queue)
    +        fetch_next_queue.clear()
    +        return _CASBatchRead(remote)
     
    -                    digest = self.add_object(path=f.name)
    -                    assert digest.hash == filenode.digest.hash
    +    # Helper function for _fetch_directory().
    +    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
    +        in_local_cache = os.path.exists(self.objpath(digest))
    +
    +        if in_local_cache:
    +            # Skip download, already in local cache.
    +            pass
    +        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
    +                not remote.batch_read_supported):
    +            # Too large for batch request, download in independent request.
    +            self._ensure_blob(remote, digest)
    +            in_local_cache = True
    +        else:
    +            if not batch.add(digest):
    +                # Not enough space left in batch request.
    +                # Complete pending batch first.
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +                batch.add(digest)
    +
    +        if recursive:
    +            if in_local_cache:
    +                # Add directory to processing queue.
    +                fetch_queue.append(digest)
    +            else:
    +                # Directory will be available after completing pending batch.
    +                # Add directory to deferred processing queue.
    +                fetch_next_queue.append(digest)
    +
    +        return batch
    +
    +    # _fetch_directory():
    +    #
    +    # Fetches remote directory and adds it to content addressable store.
    +    #
    +    # Fetches files, symbolic links and recursively other directories in
    +    # the remote directory and adds them to the content addressable
    +    # store.
    +    #
    +    # Args:
    +    #     remote (Remote): The remote to use.
    +    #     dir_digest (Digest): Digest object for the directory to fetch.
    +    #
    +    def _fetch_directory(self, remote, dir_digest):
    +        fetch_queue = [dir_digest]
    +        fetch_next_queue = []
    +        batch = _CASBatchRead(remote)
    +
    +        while len(fetch_queue) + len(fetch_next_queue) > 0:
    +            if len(fetch_queue) == 0:
    +                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    +
    +            dir_digest = fetch_queue.pop(0)
    +
    +            objpath = self._ensure_blob(remote, dir_digest)
    +
    +            directory = remote_execution_pb2.Directory()
    +            with open(objpath, 'rb') as f:
    +                directory.ParseFromString(f.read())
     
                 for dirnode in directory.directories:
    -                self._fetch_directory(remote, dirnode.digest)
    +                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    +                                                   fetch_queue, fetch_next_queue, recursive=True)
    +
    +            for filenode in directory.files:
    +                batch = self._fetch_directory_node(remote, filenode.digest, batch,
    +                                                   fetch_queue, fetch_next_queue)
     
    -            # place directory blob only in final location when we've downloaded
    -            # all referenced blobs to avoid dangling references in the repository
    -            digest = self.add_object(path=out.name)
    -            assert digest.hash == tree.hash
    +        # Fetch final batch
    +        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
     
     
     # Represents a single remote CAS cache.

    @@ -870,11 +970,78 @@ class _CASRemote():
     
                 self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
                 self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
                 self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
     
    +            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
    +            try:
    +                request = remote_execution_pb2.GetCapabilitiesRequest()
    +                response = self.capabilities.GetCapabilities(request)
    +                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
    +                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
    +                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
    +            except grpc.RpcError as e:
    +                # Simply use the defaults for servers that don't implement GetCapabilities()
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
    +            # Check whether the server supports BatchReadBlobs()
    +            self.batch_read_supported = False
    +            try:
    +                request = remote_execution_pb2.BatchReadBlobsRequest()
    +                response = self.cas.BatchReadBlobs(request)
    +                self.batch_read_supported = True
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    +                    raise
    +
                 self._initialized = True
     
     
    +# Represents a batch of blobs queued for fetching.
    +#
    +class _CASBatchRead():
    +    def __init__(self, remote):
    +        self._remote = remote
    +        self._max_total_size_bytes = remote.max_batch_total_size_bytes
    +        self._request = remote_execution_pb2.BatchReadBlobsRequest()
    +        self._size = 0
    +        self._sent = False
    +
    +    def add(self, digest):
    +        assert not self._sent
    +
    +        new_batch_size = self._size + digest.size_bytes
    +        if new_batch_size > self._max_total_size_bytes:
    +            # Not enough space left in current batch
    +            return False
    +
    +        request_digest = self._request.digests.add()
    +        request_digest.hash = digest.hash
    +        request_digest.size_bytes = digest.size_bytes
    +        self._size = new_batch_size
    +        return True
    +
    +    def send(self):
    +        assert not self._sent
    +        self._sent = True
    +
    +        if len(self._request.digests) == 0:
    +            return
    +
    +        batch_response = self._remote.cas.BatchReadBlobs(self._request)
    +
    +        for response in batch_response.responses:
    +            if response.status.code != grpc.StatusCode.OK.value[0]:
    +                raise ArtifactError("Failed to download blob {}: {}".format(
    +                    response.digest.hash, response.status.code))
    +            if response.digest.size_bytes != len(response.data):
    +                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
    +                    response.digest.hash, response.digest.size_bytes, len(response.data)))
    +
    +            yield (response.digest, response.data)
    +
    +
     def _grouper(iterable, n):
         while True:
             try:

  • buildstream/_artifactcache/casserver.py
    @@ -38,8 +38,9 @@ from .._context import Context
     from .cascache import CASCache
     
     
    -# The default limit for gRPC messages is 4 MiB
    -_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
    +# The default limit for gRPC messages is 4 MiB.
    +# Limit payload to 1 MiB to leave sufficient headroom for metadata.
    +_MAX_PAYLOAD_BYTES = 1024 * 1024
     
     
     # Trying to push an artifact that is too large

    @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     
                     remaining = client_digest.size_bytes - request.read_offset
                     while remaining > 0:
    -                    chunk_size = min(remaining, 64 * 1024)
    +                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                         remaining -= chunk_size
     
                         response = bytestream_pb2.ReadResponse()

    @@ -242,7 +243,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
     
             for digest in request.digests:
                 batch_size += digest.size_bytes
    -            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
    +            if batch_size > _MAX_PAYLOAD_BYTES:
                     context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                     return response
     

    @@ -269,7 +270,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
             cache_capabilities = response.cache_capabilities
             cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
             cache_capabilities.action_cache_update_capabilities.update_enabled = False
    -        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
    +        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
             cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
     
             response.deprecated_api_version.major = 2

  • buildstream/_scheduler/queues/buildqueue.py
    @@ -65,7 +65,7 @@ class BuildQueue(Queue):
             # If the estimated size outgrows the quota, ask the scheduler
             # to queue a job to actually check the real cache size.
             #
    -        if artifacts.get_quota_exceeded():
    +        if artifacts.has_quota_exceeded():
                 self._scheduler.check_cache_size()
     
         def done(self, job, element, result, success):

  • buildstream/_scheduler/scheduler.py
    @@ -349,7 +349,7 @@ class Scheduler():
             platform = Platform.get_platform()
             artifacts = platform.artifactcache
     
    -        if not artifacts.get_quota_exceeded():
    +        if not artifacts.has_quota_exceeded():
                 return
     
             job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    


