[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] 2 commits: artifactcache: implemented pull_tree similar to pull




Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

4 changed files:

  • buildstream/_artifactcache.py
  • buildstream/_cas/cascache.py
  • buildstream/_cas/casremote.py
  • tests/artifactcache/pull.py

Changes:

  • buildstream/_artifactcache.py

    @@ -21,6 +21,7 @@ import multiprocessing
     import os
     import string
     from collections.abc import Mapping
    +import grpc
    
     from .types import _KeyStrength
     from ._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
    @@ -607,16 +608,37 @@ class ArtifactCache():
    
             for remote in push_remotes:
                 remote.init()
    +            pushed_remote = False
                 display_key = element._get_brief_display_key()
                 element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
    
    -            if self.cas.push(refs, remote):
    -                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    +            try:
    +                for ref in refs:
    +                    # Check whether ref is already on the server in which case
    +                    # there is no need to push the ref
    +                    root_digest = self.cas.resolve_ref(ref)
    +                    response = remote.get_reference(ref)
    +                    if (response is not None and
    +                            response.hash == root_digest.hash and
    +                            response.size_bytes == root_digest.size_bytes):
    +                        element.info("Remote ({}) already has {} cached".format(
    +                            remote.spec.url, element._get_brief_display_key()))
    +                        continue
    +
    +                    # upload blobs
    +                    self._send_directory(root_digest, remote)
    +
    +                    remote.update_reference(ref, root_digest)
    +                    pushed_remote = True
    +
    +            except grpc.RpcError as e:
    +                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    +                    raise CASError("Failed to push ref {}: {}"
    +                                   .format(refs, e), temporary=True) from e
    +
    +            if pushed_remote is True:
                     pushed = True
    -            else:
    -                element.info("Remote ({}) already has {} cached".format(
    -                    remote.spec.url, element._get_brief_display_key()
    -                ))
    +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
    
             return pushed
    
    @@ -694,15 +716,26 @@ class ArtifactCache():
         #
         # Args:
         #     project (Project): The current project
    -    #     digest (Digest): The digest of the tree
    +    #     tree_digest (Digest): The digest of the tree
         #
    -    def pull_tree(self, project, digest):
    +    def pull_tree(self, project, tree_digest):
             for remote in self._remotes[project]:
    -            digest = self.cas.pull_tree(remote, digest)
    +            try:
    +                for blob_digest in remote.yield_tree_digests(tree_digest):
    +                    if self.cas.check_blob(blob_digest):
    +                        continue
    +                    remote.request_blob(blob_digest)
    +                    for blob_file in remote.get_blobs():
    +                        self.cas.add_object(path=blob_file.name, link_directly=True)
    
    -            if digest:
    -                # no need to pull from additional remotes
    -                return digest
    +                # Get the last batch
    +                for blob_file in remote.get_blobs(request_batch=True):
    +                    self.cas.add_object(path=blob_file.name, link_directly=True)
    +
    +            except BlobNotFound:
    +                continue
    +            else:
    +                return tree_digest
    
             return None
    
    @@ -731,7 +764,7 @@ class ArtifactCache():
                 return
    
             for remote in push_remotes:
    -            self.cas.push_directory(remote, directory)
    +            self._send_directory(directory.ref, remote)
    
         # push_message():
         #
    @@ -816,6 +849,14 @@ class ArtifactCache():
             with self.context.timed_activity("Initializing remote caches", silent_nested=True):
                 self.initialize_remotes(on_failure=remote_failed)
    
    +    def _send_directory(self, root_digest, remote):
    +        required_blobs = self.cas.yield_directory_digests(root_digest)
    +        missing_blobs = remote.find_missing_blobs(required_blobs)
    +        for blob in missing_blobs.values():
    +            blob_file = self.cas.objpath(blob)
    +            remote.upload_blob(blob, blob_file, final=True)
    +        remote.send_update_batch()
    +
         # _write_cache_size()
         #
         # Writes the given size of the artifact to the cache's size file
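
    A note on the new _send_directory() helper above: upload_blob() on the remote
    queues small blobs into a batch request, and only the call to
    send_update_batch() at the end transmits whatever is still sitting in the
    final, partially filled batch. A rough standalone sketch of that
    accumulate-then-flush pattern (the Batch class, MAX_BATCH_BYTES and the
    print() stand-in for the RPC are illustrative only, not BuildStream API):

        # Illustrative sketch of the accumulate-then-flush batching pattern that
        # _send_directory()/send_update_batch() rely on; names are hypothetical.
        MAX_BATCH_BYTES = 64 * 1024   # stand-in for the server's batch size limit


        class Batch:
            def __init__(self):
                self.items = []
                self.size = 0

            def add(self, name, data):
                # Refuse the item if it would overflow the batch, mirroring
                # _CASBatchUpdate.add() returning False when there is no room left.
                if self.size + len(data) > MAX_BATCH_BYTES:
                    return False
                self.items.append((name, data))
                self.size += len(data)
                return True

            def send(self):
                # Stand-in for the actual BatchUpdateBlobs RPC.
                if self.items:
                    print("sending {} blobs ({} bytes)".format(len(self.items), self.size))
                self.items = []
                self.size = 0


        def upload_all(blobs):
            batch = Batch()
            for name, data in blobs:
                # Assume every blob fits in a batch; oversize blobs take a
                # separate bytestream upload path in the real upload_blob().
                assert len(data) <= MAX_BATCH_BYTES
                if not batch.add(name, data):
                    # Batch is full: transmit it, then start a fresh one.
                    batch.send()
                    batch = Batch()
                    batch.add(name, data)
            # The last batch is usually only partially full, so it has to be
            # flushed explicitly -- this is what remote.send_update_batch() does.
            batch.send()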
    

  • buildstream/_cas/cascache.py

    @@ -18,23 +18,16 @@
     #        Jürg Billeter <juerg billeter codethink co uk>
    
     import hashlib
    -import itertools
     import os
     import stat
     import tempfile
    -import uuid
     import contextlib
    
    -import grpc
    -
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from .._protos.buildstream.v2 import buildstream_pb2
    
     from .. import utils
     from .._exceptions import CASError
    
    -from .casremote import _CASBatchUpdate
    -
    
     # A CASCache manages a CAS repository as specified in the Remote Execution API.
     #
    @@ -183,29 +176,6 @@ class CASCache():
    
             return modified, removed, added
    
    -    # pull_tree():
    -    #
    -    # Pull a single Tree rather than a ref.
    -    # Does not update local refs.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to pull from
    -    #     digest (Digest): The digest of the tree
    -    #
    -    def pull_tree(self, remote, digest):
    -        try:
    -            remote.init()
    -
    -            digest = self._fetch_tree(remote, digest)
    -
    -            return digest
    -
    -        except grpc.RpcError as e:
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    -                raise
    -
    -        return None
    -
         # link_ref():
         #
         # Add an alias for an existing ref.
    @@ -219,73 +189,6 @@ class CASCache():
    
             self.set_ref(newref, tree)
    
    -    # push():
    -    #
    -    # Push committed refs to remote repository.
    -    #
    -    # Args:
    -    #     refs (list): The refs to push
    -    #     remote (CASRemote): The remote to push to
    -    #
    -    # Returns:
    -    #   (bool): True if any remote was updated, False if no pushes were required
    -    #
    -    # Raises:
    -    #   (CASError): if there was an error
    -    #
    -    def push(self, refs, remote):
    -        skipped_remote = True
    -        try:
    -            for ref in refs:
    -                tree = self.resolve_ref(ref)
    -
    -                # Check whether ref is already on the server in which case
    -                # there is no need to push the ref
    -                try:
    -                    request = buildstream_pb2.GetReferenceRequest()
    -                    request.key = ref
    -                    response = remote.ref_storage.GetReference(request)
    -
    -                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
    -                        # ref is already on the server with the same tree
    -                        continue
    -
    -                except grpc.RpcError as e:
    -                    if e.code() != grpc.StatusCode.NOT_FOUND:
    -                        # Intentionally re-raise RpcError for outer except block.
    -                        raise
    -
    -                self._send_directory(remote, tree)
    -
    -                request = buildstream_pb2.UpdateReferenceRequest()
    -                request.keys.append(ref)
    -                request.digest.hash = tree.hash
    -                request.digest.size_bytes = tree.size_bytes
    -                remote.ref_storage.UpdateReference(request)
    -
    -                skipped_remote = False
    -        except grpc.RpcError as e:
    -            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
    -                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
    -
    -        return not skipped_remote
    -
    -    # push_directory():
    -    #
    -    # Push the given virtual directory to a remote.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to push to
    -    #     directory (Directory): A virtual directory object to push.
    -    #
    -    # Raises:
    -    #     (CASError): if there was an error
    -    #
    -    def push_directory(self, remote, directory):
    -        remote.init()
    -
    -        self._send_directory(remote, directory.ref)
    -
         # objpath():
         #
         # Return the path of an object based on its digest.
    @@ -557,6 +460,27 @@ class CASCache():
             else:
                 return None
    
    +    def yield_directory_digests(self, directory_digest):
    +        # parse directory, and recursively add blobs
    +        d = remote_execution_pb2.Digest()
    +        d.hash = directory_digest.hash
    +        d.size_bytes = directory_digest.size_bytes
    +        yield d
    +
    +        directory = remote_execution_pb2.Directory()
    +
    +        with open(self.objpath(directory_digest), 'rb') as f:
    +            directory.ParseFromString(f.read())
    +
    +        for filenode in directory.files:
    +            d = remote_execution_pb2.Digest()
    +            d.hash = filenode.digest.hash
    +            d.size_bytes = filenode.digest.size_bytes
    +            yield d
    +
    +        for dirnode in directory.directories:
    +            yield from self.yield_directory_digests(dirnode.digest)
    +
         ################################################
         #             Local Private Methods            #
         ################################################
    @@ -745,105 +669,3 @@ class CASCache():
    
             for dirnode in directory.directories:
                 yield from self._required_blobs(dirnode.digest)
    -
    -    # _ensure_blob():
    -    #
    -    # Fetch and add blob if it's not already local.
    -    #
    -    # Args:
    -    #     remote (Remote): The remote to use.
    -    #     digest (Digest): Digest object for the blob to fetch.
    -    #
    -    # Returns:
    -    #     (str): The path of the object
    -    #
    -    def _ensure_blob(self, remote, digest):
    -        objpath = self.objpath(digest)
    -        if os.path.exists(objpath):
    -            # already in local repository
    -            return objpath
    -
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -            remote._fetch_blob(digest, f)
    -
    -            added_digest = self.add_object(path=f.name, link_directly=True)
    -            assert added_digest.hash == digest.hash
    -
    -        return objpath
    -
    -    def _fetch_tree(self, remote, digest):
    -        # download but do not store the Tree object
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            remote._fetch_blob(digest, out)
    -
    -            tree = remote_execution_pb2.Tree()
    -
    -            with open(out.name, 'rb') as f:
    -                tree.ParseFromString(f.read())
    -
    -            tree.children.extend([tree.root])
    -            for directory in tree.children:
    -                for filenode in directory.files:
    -                    self._ensure_blob(remote, filenode.digest)
    -
    -                # place directory blob only in final location when we've downloaded
    -                # all referenced blobs to avoid dangling references in the repository
    -                dirbuffer = directory.SerializeToString()
    -                dirdigest = self.add_object(buffer=dirbuffer)
    -                assert dirdigest.size_bytes == len(dirbuffer)
    -
    -        return dirdigest
    -
    -    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    -        required_blobs = self._required_blobs(digest)
    -
    -        missing_blobs = dict()
    -        # Limit size of FindMissingBlobs request
    -        for required_blobs_group in _grouper(required_blobs, 512):
    -            request = remote_execution_pb2.FindMissingBlobsRequest()
    -
    -            for required_digest in required_blobs_group:
    -                d = request.blob_digests.add()
    -                d.hash = required_digest.hash
    -                d.size_bytes = required_digest.size_bytes
    -
    -            response = remote.cas.FindMissingBlobs(request)
    -            for missing_digest in response.missing_blob_digests:
    -                d = remote_execution_pb2.Digest()
    -                d.hash = missing_digest.hash
    -                d.size_bytes = missing_digest.size_bytes
    -                missing_blobs[d.hash] = d
    -
    -        # Upload any blobs missing on the server
    -        self._send_blobs(remote, missing_blobs.values(), u_uid)
    -
    -    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
    -        batch = _CASBatchUpdate(remote)
    -
    -        for digest in digests:
    -            with open(self.objpath(digest), 'rb') as f:
    -                assert os.fstat(f.fileno()).st_size == digest.size_bytes
    -
    -                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
    -                        not remote.batch_update_supported):
    -                    # Too large for batch request, upload in independent request.
    -                    remote._send_blob(digest, f, u_uid=u_uid)
    -                else:
    -                    if not batch.add(digest, f):
    -                        # Not enough space left in batch request.
    -                        # Complete pending batch first.
    -                        batch.send()
    -                        batch = _CASBatchUpdate(remote)
    -                        batch.add(digest, f)
    -
    -        # Send final batch
    -        batch.send()
    -
    -
    -def _grouper(iterable, n):
    -    while True:
    -        try:
    -            current = next(iterable)
    -        except StopIteration:
    -            return
    -        yield itertools.chain([current], itertools.islice(iterable, n - 1))
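
    One thing the relocated digest walking makes visible: a Remote Execution API
    Directory message only names its subdirectories by digest, so the new
    yield_directory_digests() has to recurse through directory blobs that are
    already in the local store, whereas the Tree message that pull_tree() consumes
    embeds every subdirectory inline and can be enumerated from a single blob. A
    rough sketch of the two traversals, assuming BuildStream's vendored REAPI
    protos are importable and a hypothetical load_blob() helper that returns the
    raw bytes of a locally stored blob:

        # Sketch only: contrast walking a Tree message with walking Directory blobs.
        from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


        def file_digests_from_tree(tree_bytes):
            # A Tree embeds the root Directory and all of its children, so one
            # blob is enough to enumerate every file digest (this is what
            # CASRemote.yield_tree_digests() exploits).
            tree = remote_execution_pb2.Tree()
            tree.ParseFromString(tree_bytes)
            for directory in [tree.root] + list(tree.children):
                for filenode in directory.files:
                    yield filenode.digest


        def digests_from_directory(directory_bytes, load_blob):
            # A Directory only references its subdirectories by digest, so the
            # walk recurses into blobs fetched from the local store (as in
            # CASCache.yield_directory_digests()).
            directory = remote_execution_pb2.Directory()
            directory.ParseFromString(directory_bytes)
            for filenode in directory.files:
                yield filenode.digest
            for dirnode in directory.directories:
                yield dirnode.digest
                yield from digests_from_directory(load_blob(dirnode.digest), load_blob)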

  • buildstream/_cas/casremote.py

    @@ -1,5 +1,6 @@
     from collections import namedtuple
     import io
    +import itertools
     import os
     import multiprocessing
     import signal
    @@ -283,19 +284,41 @@ class CASRemote():
                 else:
                     return None
    
    +    # update_reference():
    +    #
    +    # Args:
    +    #    ref (str): Reference to update
    +    #    digest (Digest): New digest to update ref with
    +    def update_reference(self, ref, digest):
    +        request = buildstream_pb2.UpdateReferenceRequest()
    +        request.keys.append(ref)
    +        request.digest.hash = digest.hash
    +        request.digest.size_bytes = digest.size_bytes
    +        self.ref_storage.UpdateReference(request)
    +
    +    def get_tree_blob(self, tree_digest):
    +        self.init()
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(tree_digest, f)
    +
    +        tree = remote_execution_pb2.Tree()
    +        with open(f.name, 'rb') as tmp:
    +            tree.ParseFromString(tmp.read())
    +
    +        return tree
    +
         # yield_directory_digests():
         #
         # Iterate over blobs digests starting from a root digest
         #
         # Args:
    -    #     root_digest (str): The root_digest to get a tree of
    +    #     root_digest (digest): The root_digest to get a tree of
         #     progress (callable): The progress callback, if any
         #     subdir (str): The optional specific subdir to pull
         #     excluded_subdirs (list): The optional list of subdirs to not pull
         #
         # Returns:
    -    #   (iter): True if pull was successful, False if ref was not available
    -    #
    +    #     (iter digests): recursively iterates over digests contained in root directory
         def yield_directory_digests(self, root_digest, *, progress=None, subdir=None, excluded_subdirs=[]):
             self.init()
    
    @@ -303,6 +326,37 @@ class CASRemote():
             # Fetch artifact, excluded_subdirs determined in pullqueue
             yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
    
    +    # yield_tree_digests():
    +    #
    +    # Fetches a tree file from digests and then iterates over child digests
    +    #
    +    # Args:
    +    #     tree_digest (digest): tree digest
    +    #
    +    # Returns:
    +    #     (iter digests): iterates over digests in tree message
    +    def yield_tree_digests(self, tree_digest):
    +        self.init()
    +
    +        # get tree file
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(tree_digest, f)
    +        tree = remote_execution_pb2.Tree()
    +        tree.ParseFromString(f.read())
    +
    +        tree.children.extend([tree.root])
    +        for directory in tree.children:
    +            for filenode in directory.files:
    +                yield filenode.digest
    +
    +            # add the directory to downloaded tmp files to be added
    +            f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +            f2.write(directory.SerializeToString())
    +            self.__tmp_downloads.append(f2)
    +
    +        # Add the tree directory to downloads right at the end
    +        self.__tmp_downloads.append(f)
    +
         # request_blob():
         #
         # Request blob, triggering download depending via bytestream or cas
    @@ -337,6 +391,60 @@ class CASRemote():
             while self.__tmp_downloads:
                 yield self.__tmp_downloads.pop()
    
    +    # upload_blob():
    +    #
    +    # Push blobs given an iterator over blob files
    +    #
    +    def upload_blob(self, digest, blob_file, u_uid=uuid.uuid4(), final=False):
    +        with open(blob_file, 'rb') as f:
    +            assert os.fstat(f.fileno()).st_size == digest.size_bytes
    +
    +            if (digest.size_bytes >= self.max_batch_total_size_bytes or
    +                    not self.batch_update_supported):
    +                # Too large for batch request, upload in independent request.
    +                self._send_blob(digest, f, u_uid=u_uid)
    +            else:
    +                if self.__batch_update.add(digest, f) is False:
    +                    self.__batch_update.send()
    +                    self.__batch_update = _CASBatchUpdate(self)
    +                    self.__batch_update.add(digest, f)
    +
    +    def send_update_batch(self):
    +        # make sure everything is sent
    +        self.__batch_update.send()
    +        self.__batch_update = _CASBatchUpdate(self)
    +
    +    # find_missing_blobs()
    +    #
    +    # Does FindMissingBlobs request to remote
    +    #
    +    # Args:
    +    #    required_blobs ([Digest]): list of blobs required
    +    #    u_uid (str): uuid4
    +    #
    +    # Returns:
    +    #    (Dict(Digest)): missing blobs
    +    def find_missing_blobs(self, required_blobs, u_uid=uuid.uuid4()):
    +        self.init()
    +        missing_blobs = dict()
    +        # Limit size of FindMissingBlobs request
    +        for required_blobs_group in _grouper(required_blobs, 512):
    +            request = remote_execution_pb2.FindMissingBlobsRequest()
    +
    +            for required_digest in required_blobs_group:
    +                d = request.blob_digests.add()
    +                d.hash = required_digest.hash
    +                d.size_bytes = required_digest.size_bytes
    +
    +            response = self.cas.FindMissingBlobs(request)
    +            for missing_digest in response.missing_blob_digests:
    +                d = remote_execution_pb2.Digest()
    +                d.hash = missing_digest.hash
    +                d.size_bytes = missing_digest.size_bytes
    +                missing_blobs[d.hash] = d
    +
    +        return missing_blobs
    +
         ################################################
         #             Local Private Methods            #
         ################################################
    @@ -418,6 +526,15 @@ class CASRemote():
             self.__batch_read = _CASBatchRead(self)
    
    
    +def _grouper(iterable, n):
    +    while True:
    +        try:
    +            current = next(iterable)
    +        except StopIteration:
    +            return
    +        yield itertools.chain([current], itertools.islice(iterable, n - 1))
    +
    +
     # Represents a batch of blobs queued for fetching.
     #
     class _CASBatchRead():
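
    The _grouper() helper that moves into this module exists to keep each
    FindMissingBlobs request to a bounded size: find_missing_blobs() consumes the
    digest iterator in chunks of 512 and issues one request per chunk. A small
    standalone demonstration of its behaviour (plain integers stand in for Digest
    messages; note that it expects an iterator, and that each group is itself
    lazy and must be consumed before asking for the next one):

        import itertools


        def _grouper(iterable, n):
            # Yield the remaining items of "iterable" in chunks of at most n.
            while True:
                try:
                    current = next(iterable)
                except StopIteration:
                    return
                yield itertools.chain([current], itertools.islice(iterable, n - 1))


        digests = iter(range(7))
        for group in _grouper(digests, 3):
            # find_missing_blobs() consumes each group fully by copying it into
            # a FindMissingBlobsRequest before requesting the next group.
            print(list(group))   # prints [0, 1, 2], then [3, 4, 5], then [6]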
    

  • tests/artifactcache/pull.py

    @@ -110,7 +110,7 @@ def test_pull(cli, tmpdir, datafiles):
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_pull, queue, user_config_file, project_dir,
    -                                                artifact_dir, 'target.bst', element_key))
    +                                                artifact_dir, tmpdir, 'target.bst', element_key))
    
             try:
                 # Keep SIGINT blocked in the child process
    @@ -127,13 +127,14 @@ def test_pull(cli, tmpdir, datafiles):
             assert cas.contains(element, element_key)
    
    
    -def _test_pull(user_config_file, project_dir, artifact_dir,
    +def _test_pull(user_config_file, project_dir, artifact_dir, tmpdir,
                    element_name, element_key, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
    
         # Load the project manually
         project = Project(project_dir, context)
    @@ -218,7 +219,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_push_tree, queue, user_config_file, project_dir,
    -                                                artifact_dir, artifact_digest))
    +                                                artifact_dir, tmpdir, artifact_digest))
    
             try:
                 # Keep SIGINT blocked in the child process
    @@ -246,7 +247,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_pull_tree, queue, user_config_file, project_dir,
    -                                                artifact_dir, tree_digest))
    +                                                artifact_dir, tmpdir, tree_digest))
    
             try:
                 # Keep SIGINT blocked in the child process
    @@ -268,12 +269,14 @@ def test_pull_tree(cli, tmpdir, datafiles):
             assert os.path.exists(cas.objpath(directory_digest))
    
    
    -def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    +def _test_push_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    +                    artifact_digest, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir
    
         # Load the project manually
         project = Project(project_dir, context)
    @@ -304,12 +307,14 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
             queue.put("No remote configured")
    
    
    -def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    +def _test_pull_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    +                    artifact_digest, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
    
         # Load the project manually
         project = Project(project_dir, context)
    


