[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] 4 commits: Add tmpdir which is passed to CASRemote




Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

8 changed files:

Changes:

  • buildstream/_artifactcache.py

    @@ -28,7 +28,7 @@ from ._message import Message, MessageType
     from . import utils
     from . import _yaml
     
    -from ._cas.casremote import CASRemote, CASRemoteSpec
    +from ._cas.casremote import BlobNotFound, CASRemote, CASRemoteSpec
     
     
     CACHE_SIZE_FILE = "cache_size"

    @@ -374,7 +374,7 @@ class ArtifactCache():
             q = multiprocessing.Queue()
             for remote_spec in remote_specs:
     
    -            error = CASRemote.check_remote(remote_spec, q)
    +            error = CASRemote.check_remote(remote_spec, self.context.tmpdir, q)
     
                 if error and on_failure:
                     on_failure(remote_spec.url, error)

    @@ -385,7 +385,7 @@ class ArtifactCache():
                     if remote_spec.push:
                         self._has_push_remotes = True
     
    -                remotes[remote_spec.url] = CASRemote(remote_spec)
    +                remotes[remote_spec.url] = CASRemote(remote_spec, self.context.tmpdir)
     
             for project in self.context.get_projects():
                 remote_specs = self.global_remote_specs

    @@ -634,7 +634,7 @@ class ArtifactCache():
         # Returns:
         #   (bool): True if pull was successful, False if artifact was not available
         #
    -    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
    +    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=[]):
             ref = self.get_artifact_fullname(element, key)
     
             project = element._get_project()

    @@ -644,13 +644,36 @@ class ArtifactCache():
                     display_key = element._get_brief_display_key()
                     element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
     
    -                if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
    -                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
    +                root_digest = remote.get_reference(ref)
    +
    +                if root_digest:
    +                    try:
    +                        for blob_digest in remote.yield_directory_digests(
    +                                root_digest, progress=progress, subdir=subdir,
    +                                excluded_subdirs=excluded_subdirs):
    +                            if self.cas.check_blob(blob_digest):
    +                                continue
    +                            remote.request_blob(blob_digest)
    +                            for blob_file in remote.get_blobs():
    +                                self.cas.add_object(path=blob_file.name, link_directly=True)
    +
    +                        # request the final CAS batch
    +                        for blob_file in remote.get_blobs(request_batch=True):
    +                            self.cas.add_object(path=blob_file.name, link_directly=True)
    +
    +                        self.cas.set_ref(ref, root_digest)
    +                    except BlobNotFound:
    +                        element.info("Remote ({}) is missing blobs for {}".format(
    +                            remote.spec.url, element._get_brief_display_key()))
    +                        continue
    +
                         if subdir:
                             # Attempt to extract subdir into artifact extract dir if it already exists
                             # without containing the subdir. If the respective artifact extract dir does not
                             # exist a complete extraction will complete.
                             self.extract(element, key, subdir)
    +
    +                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
                         # no need to pull from additional remotes
                         return True
                     else:

    @@ -671,15 +694,26 @@ class ArtifactCache():
         #
         # Args:
         #     project (Project): The current project
    -    #     digest (Digest): The digest of the tree
    +    #     tree_digest (Digest): The digest of the tree
         #
    -    def pull_tree(self, project, digest):
    +    def pull_tree(self, project, tree_digest):
             for remote in self._remotes[project]:
    -            digest = self.cas.pull_tree(remote, digest)
    -
    -            if digest:
    -                # no need to pull from additional remotes
    -                return digest
    +            try:
    +                for blob_digest in remote.yield_tree_digests(tree_digest):
    +                    if self.cas.check_blob(blob_digest):
    +                        continue
    +                    remote.request_blob(blob_digest)
    +                    for blob_file in remote.get_blobs():
    +                        self.cas.add_object(path=blob_file.name, link_directly=True)
    +
    +                # Get the last batch
    +                for blob_file in remote.get_blobs(request_batch=True):
    +                    self.cas.add_object(path=blob_file.name, link_directly=True)
    +
    +            except BlobNotFound:
    +                continue
    +            else:
    +                return tree_digest
     
             return None
     
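
    Stripped of element status reporting and error handling, the pull flow that
    replaces cas.pull() above reduces to the pattern below. This is a sketch
    against the methods added on this branch (get_reference,
    yield_directory_digests, check_blob, request_blob, get_blobs);
    pull_artifact itself is a hypothetical helper, not code from the commits:

        # Sketch: how ArtifactCache.pull() now drives a CASRemote.
        def pull_artifact(cas, remote, ref):
            # Resolve the ref to a root directory digest; None means the
            # remote has never seen this ref.
            root_digest = remote.get_reference(ref)
            if root_digest is None:
                return False

            # Walk every digest reachable from the root. Blobs already in the
            # local CAS are skipped; the rest are queued on the remote, which
            # batches small blobs and streams large ones individually.
            for blob_digest in remote.yield_directory_digests(root_digest):
                if cas.check_blob(blob_digest):
                    continue
                remote.request_blob(blob_digest)
                # Drain whatever has finished downloading into the local CAS.
                for blob_file in remote.get_blobs():
                    cas.add_object(path=blob_file.name, link_directly=True)

            # Flush the final, possibly partial, batch and drain it too.
            for blob_file in remote.get_blobs(request_batch=True):
                cas.add_object(path=blob_file.name, link_directly=True)

            # Only set the ref once all referenced blobs are local, so the
            # repository never holds a ref pointing at missing blobs.
            cas.set_ref(ref, root_digest)
            return True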
     
    

  • buildstream/_cas/cascache.py

    @@ -33,7 +33,7 @@ from .._protos.buildstream.v2 import buildstream_pb2
     from .. import utils
     from .._exceptions import CASError
     
    -from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
    +from .casremote import _CASBatchUpdate
     
     
     # A CASCache manages a CAS repository as specified in the Remote Execution API.

    @@ -183,73 +183,6 @@ class CASCache():
     
             return modified, removed, added
     
    -    # pull():
    -    #
    -    # Pull a ref from a remote repository.
    -    #
    -    # Args:
    -    #     ref (str): The ref to pull
    -    #     remote (CASRemote): The remote repository to pull from
    -    #     progress (callable): The progress callback, if any
    -    #     subdir (str): The optional specific subdir to pull
    -    #     excluded_subdirs (list): The optional list of subdirs to not pull
    -    #
    -    # Returns:
    -    #   (bool): True if pull was successful, False if ref was not available
    -    #
    -    def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
    -        try:
    -            remote.init()
    -
    -            request = buildstream_pb2.GetReferenceRequest()
    -            request.key = ref
    -            response = remote.ref_storage.GetReference(request)
    -
    -            tree = remote_execution_pb2.Digest()
    -            tree.hash = response.digest.hash
    -            tree.size_bytes = response.digest.size_bytes
    -
    -            # Check if the element artifact is present, if so just fetch the subdir.
    -            if subdir and os.path.exists(self.objpath(tree)):
    -                self._fetch_subdir(remote, tree, subdir)
    -            else:
    -                # Fetch artifact, excluded_subdirs determined in pullqueue
    -                self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
    -
    -            self.set_ref(ref, tree)
    -
    -            return True
    -        except grpc.RpcError as e:
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    -                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
    -            else:
    -                return False
    -        except BlobNotFound as e:
    -            return False
    -
    -    # pull_tree():
    -    #
    -    # Pull a single Tree rather than a ref.
    -    # Does not update local refs.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to pull from
    -    #     digest (Digest): The digest of the tree
    -    #
    -    def pull_tree(self, remote, digest):
    -        try:
    -            remote.init()
    -
    -            digest = self._fetch_tree(remote, digest)
    -
    -            return digest
    -
    -        except grpc.RpcError as e:
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    -                raise
    -
    -        return None
    -
         # link_ref():
         #
         # Add an alias for an existing ref.

    @@ -591,6 +524,16 @@ class CASCache():
             reachable = set()
             self._reachable_refs_dir(reachable, tree, update_mtime=True)
     
    +    # Check to see if a blob is in the local CAS
    +    # return None if not
    +    def check_blob(self, digest):
    +        objpath = self.objpath(digest)
    +        if os.path.exists(objpath):
    +            # already in local repository
    +            return objpath
    +        else:
    +            return None
    +
         ################################################
         #             Local Private Methods            #
         ################################################

    @@ -805,126 +748,6 @@ class CASCache():
     
             return objpath
     
    -    def _batch_download_complete(self, batch):
    -        for digest, data in batch.send():
    -            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
    -                f.write(data)
    -                f.flush()
    -
    -                added_digest = self.add_object(path=f.name, link_directly=True)
    -                assert added_digest.hash == digest.hash
    -
    -    # Helper function for _fetch_directory().
    -    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
    -        self._batch_download_complete(batch)
    -
    -        # All previously scheduled directories are now locally available,
    -        # move them to the processing queue.
    -        fetch_queue.extend(fetch_next_queue)
    -        fetch_next_queue.clear()
    -        return _CASBatchRead(remote)
    -
    -    # Helper function for _fetch_directory().
    -    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
    -        in_local_cache = os.path.exists(self.objpath(digest))
    -
    -        if in_local_cache:
    -            # Skip download, already in local cache.
    -            pass
    -        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
    -              not remote.batch_read_supported):
    -            # Too large for batch request, download in independent request.
    -            self._ensure_blob(remote, digest)
    -            in_local_cache = True
    -        else:
    -            if not batch.add(digest):
    -                # Not enough space left in batch request.
    -                # Complete pending batch first.
    -                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    -                batch.add(digest)
    -
    -        if recursive:
    -            if in_local_cache:
    -                # Add directory to processing queue.
    -                fetch_queue.append(digest)
    -            else:
    -                # Directory will be available after completing pending batch.
    -                # Add directory to deferred processing queue.
    -                fetch_next_queue.append(digest)
    -
    -        return batch
    -
    -    # _fetch_directory():
    -    #
    -    # Fetches remote directory and adds it to content addressable store.
    -    #
    -    # Fetches files, symbolic links and recursively other directories in
    -    # the remote directory and adds them to the content addressable
    -    # store.
    -    #
    -    # Args:
    -    #     remote (Remote): The remote to use.
    -    #     dir_digest (Digest): Digest object for the directory to fetch.
    -    #     excluded_subdirs (list): The optional list of subdirs to not fetch
    -    #
    -    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
    -        fetch_queue = [dir_digest]
    -        fetch_next_queue = []
    -        batch = _CASBatchRead(remote)
    -        if not excluded_subdirs:
    -            excluded_subdirs = []
    -
    -        while len(fetch_queue) + len(fetch_next_queue) > 0:
    -            if not fetch_queue:
    -                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    -
    -            dir_digest = fetch_queue.pop(0)
    -
    -            objpath = self._ensure_blob(remote, dir_digest)
    -
    -            directory = remote_execution_pb2.Directory()
    -            with open(objpath, 'rb') as f:
    -                directory.ParseFromString(f.read())
    -
    -            for dirnode in directory.directories:
    -                if dirnode.name not in excluded_subdirs:
    -                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
    -                                                       fetch_queue, fetch_next_queue, recursive=True)
    -
    -            for filenode in directory.files:
    -                batch = self._fetch_directory_node(remote, filenode.digest, batch,
    -                                                   fetch_queue, fetch_next_queue)
    -
    -        # Fetch final batch
    -        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    -
    -    def _fetch_subdir(self, remote, tree, subdir):
    -        subdirdigest = self._get_subdir(tree, subdir)
    -        self._fetch_directory(remote, subdirdigest)
    -
    -    def _fetch_tree(self, remote, digest):
    -        # download but do not store the Tree object
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            remote._fetch_blob(digest, out)
    -
    -            tree = remote_execution_pb2.Tree()
    -
    -            with open(out.name, 'rb') as f:
    -                tree.ParseFromString(f.read())
    -
    -            tree.children.extend([tree.root])
    -            for directory in tree.children:
    -                for filenode in directory.files:
    -                    self._ensure_blob(remote, filenode.digest)
    -
    -                # place directory blob only in final location when we've downloaded
    -                # all referenced blobs to avoid dangling references in the repository
    -                dirbuffer = directory.SerializeToString()
    -                dirdigest = self.add_object(buffer=dirbuffer)
    -                assert dirdigest.size_bytes == len(dirbuffer)
    -
    -        return dirdigest
    -
         def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
             required_blobs = self._required_blobs(digest)
     
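
    The net effect in this file is a separation of concerns: CASCache keeps
    only local-repository logic, the network-facing fetch machinery (pull(),
    pull_tree(), _fetch_directory() and the batch helpers) moves behind the
    CASRemote interface, and the new check_blob() is the single hook the pull
    loop uses to ask whether a digest is already present locally.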
     
    

  • buildstream/_cas/casremote.py

    @@ -3,6 +3,7 @@ import io
     import os
     import multiprocessing
     import signal
    +import tempfile
     from urllib.parse import urlparse
     import uuid
     

    @@ -77,7 +78,7 @@ class BlobNotFound(CASError):
     # Represents a single remote CAS cache.
     #
     class CASRemote():
    -    def __init__(self, spec):
    +    def __init__(self, spec, tmpdir):
             self.spec = spec
             self._initialized = False
             self.channel = None

    @@ -89,6 +90,16 @@ class CASRemote():
             self.capabilities = None
             self.max_batch_total_size_bytes = None
     
    +        # Need str because python 3.5 and lower doesn't deal with path like
    +        # objects here.
    +        self.tmpdir = str(tmpdir)
    +        os.makedirs(self.tmpdir, exist_ok=True)
    +
    +        self.__tmp_downloads = []  # files in the tmpdir waiting to be added to local caches
    +
    +        self.__batch_read = None
    +        self.__batch_update = None
    +
         def init(self):
             if not self._initialized:
                 url = urlparse(self.spec.url)

    @@ -146,6 +157,7 @@ class CASRemote():
                     request = remote_execution_pb2.BatchReadBlobsRequest()
                     response = self.cas.BatchReadBlobs(request)
                     self.batch_read_supported = True
    +                self.__batch_read = _CASBatchRead(self)
                 except grpc.RpcError as e:
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                         raise

    @@ -156,6 +168,7 @@ class CASRemote():
                     request = remote_execution_pb2.BatchUpdateBlobsRequest()
                     response = self.cas.BatchUpdateBlobs(request)
                     self.batch_update_supported = True
    +                self.__batch_update = _CASBatchUpdate(self)
                 except grpc.RpcError as e:
                     if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
                             e.code() != grpc.StatusCode.PERMISSION_DENIED):

    @@ -170,11 +183,11 @@ class CASRemote():
         # in the main BuildStream process
         # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
         @classmethod
    -    def check_remote(cls, remote_spec, q):
    +    def check_remote(cls, remote_spec, tmpdir, q):
     
             def __check_remote():
                 try:
    -                remote = cls(remote_spec)
    +                remote = cls(remote_spec, tmpdir)
                     remote.init()
     
                     request = buildstream_pb2.StatusRequest()

    @@ -252,6 +265,119 @@ class CASRemote():
     
             return message_digest
     
    +    # get_reference():
    +    #
    +    # Args:
    +    #    ref (str): The ref to request
    +    #
    +    def get_reference(self, ref):
    +        try:
    +            self.init()
    +
    +            request = buildstream_pb2.GetReferenceRequest()
    +            request.key = ref
    +            return self.ref_storage.GetReference(request).digest
    +        except grpc.RpcError as e:
    +            if e.code() != grpc.StatusCode.NOT_FOUND:
    +                raise CASError("Failed to find ref {}: {}".format(ref, e)) from e
    +            else:
    +                return None
    +
    +    def get_tree_blob(self, tree_digest):
    +        self.init()
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(tree_digest, f)
    +
    +        tree = remote_execution_pb2.Tree()
    +        with open(f.name, 'rb') as tmp:
    +            tree.ParseFromString(tmp.read())
    +
    +        return tree
    +
    +    # yield_directory_digests():
    +    #
    +    # Iterate over blob digests starting from a root digest
    +    #
    +    # Args:
    +    #     root_digest (digest): The root_digest to get a tree of
    +    #     progress (callable): The progress callback, if any
    +    #     subdir (str): The optional specific subdir to pull
    +    #     excluded_subdirs (list): The optional list of subdirs to not pull
    +    #
    +    # Returns:
    +    #     (iter digests): recursively iterates over digests contained in root directory
    +    def yield_directory_digests(self, root_digest, *, progress=None, subdir=None, excluded_subdirs=[]):
    +        self.init()
    +
    +        # TODO add subdir and progress stuff?
    +        # Fetch artifact, excluded_subdirs determined in pullqueue
    +        yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
    +
    +    # yield_tree_digests():
    +    #
    +    # Fetches a tree file from its digest and then iterates over child digests
    +    #
    +    # Args:
    +    #     tree_digest (digest): tree digest
    +    #
    +    # Returns:
    +    #     (iter digests): iterates over digests in tree message
    +    def yield_tree_digests(self, tree_digest):
    +        self.init()
    +
    +        # get tree file
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(tree_digest, f)
    +        tree = remote_execution_pb2.Tree()
    +        tree.ParseFromString(f.read())
    +
    +        tree.children.extend([tree.root])
    +        for directory in tree.children:
    +            for filenode in directory.files:
    +                yield filenode.digest
    +
    +            # add the directory to downloaded tmp files to be added
    +            f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +            f2.write(directory.SerializeToString())
    +            self.__tmp_downloads.append(f2)
    +
    +        # Add the tree directory to downloads right at the end
    +        self.__tmp_downloads.append(f)
    +
    +    # request_blob():
    +    #
    +    # Request blob, triggering download via bytestream or cas
    +    # BatchReadBlobs depending on size.
    +    #
    +    # Args:
    +    #    digest (Digest): digest of the requested blob
    +    #
    +    def request_blob(self, digest):
    +        if (not self.batch_read_supported or
    +                digest.size_bytes > self.max_batch_total_size_bytes):
    +            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +            self._fetch_blob(digest, f)
    +            self.__tmp_downloads.append(f)
    +        elif self.__batch_read.add(digest) is False:
    +            self._download_batch()
    +            self.__batch_read.add(digest)
    +
    +    # get_blobs():
    +    #
    +    # Yield over downloaded blobs in the tmp file locations, causing the files
    +    # to be deleted once they go out of scope.
    +    #
    +    # Returns:
    +    #    iterator over NamedTemporaryFile
    +    def get_blobs(self, request_batch=False):
    +        # Send read batch request and download
    +        if (request_batch is True and
    +                self.batch_read_supported is True):
    +            self._download_batch()
    +
    +        while self.__tmp_downloads:
    +            yield self.__tmp_downloads.pop()
    +
         ################################################
         #             Local Private Methods            #
         ################################################

    @@ -266,6 +392,35 @@ class CASRemote():
     
             assert digest.size_bytes == os.fstat(stream.fileno()).st_size
     
    +    # _yield_directory_digests():
    +    #
    +    # Fetches remote directory and adds it to content addressable store.
    +    #
    +    # Fetches files, symbolic links and recursively other directories in
    +    # the remote directory and adds them to the content addressable
    +    # store.
    +    #
    +    # Args:
    +    #     dir_digest (Digest): Digest object for the directory to fetch.
    +    #     excluded_subdirs (list): The optional list of subdirs to not fetch
    +    #
    +    def _yield_directory_digests(self, dir_digest, *, excluded_subdirs=[]):
    +        # get directory blob
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(dir_digest, f)
    +
    +        # need to read in directory structure to iterate over it
    +        directory = remote_execution_pb2.Directory()
    +        with open(f.name, 'rb') as tmp:
    +            directory.ParseFromString(tmp.read())
    +
    +        yield dir_digest
    +        for filenode in directory.files:
    +            yield filenode.digest
    +        for dirnode in directory.directories:
    +            if dirnode.name not in excluded_subdirs:
    +                yield from self._yield_directory_digests(dirnode.digest)
    +
         def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
             resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
                                       digest.hash, str(digest.size_bytes)])

    @@ -294,6 +449,15 @@ class CASRemote():
     
             assert response.committed_size == digest.size_bytes
     
    +    def _download_batch(self):
    +        for _, data in self.__batch_read.send():
    +            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +            f.write(data)
    +            f.flush()
    +            self.__tmp_downloads.append(f)
    +
    +        self.__batch_read = _CASBatchRead(self)
    +
     
     # Represents a batch of blobs queued for fetching.
     #
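
    The download buffer is a list of NamedTemporaryFile objects, which is what
    makes the get_blobs() contract work: the backing file lives exactly as
    long as the Python object, so once a caller has linked a blob into its
    local CAS and drops the reference, tmpdir cleans itself up. A standalone
    sketch of that property (standard library only, independent of
    BuildStream):

        import os
        import tempfile

        tmpdir = tempfile.mkdtemp()

        f = tempfile.NamedTemporaryFile(dir=tmpdir)
        f.write(b"blob contents")
        f.flush()

        path = f.name
        assert os.path.exists(path)   # exists while a reference is held

        f.close()                     # close (or garbage collection) deletes it
        assert not os.path.exists(path)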
    

  • buildstream/_context.py

    @@ -182,10 +182,11 @@ class Context():
             _yaml.node_validate(defaults, [
                 'sourcedir', 'builddir', 'artifactdir', 'logdir',
                 'scheduler', 'artifacts', 'logging', 'projects',
    -            'cache', 'prompt', 'workspacedir',
    +            'cache', 'prompt', 'workspacedir', 'tmpdir'
             ])
     
    -        for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir', 'workspacedir']:
    +        for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir',
    +                          'tmpdir', 'workspacedir']:
                 # Allow the ~ tilde expansion and any environment variables in
                 # path specification in the config files.
                 #
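
    The loop body is outside this hunk, but the expansion its comment
    describes is the standard library pattern below; a sketch of the idea,
    not the verbatim Context code:

        import os

        def expand_path(path):
            # Expand ${XDG_CACHE_HOME}-style variables, then a leading ~
            return os.path.expanduser(os.path.expandvars(path))

        expand_path("${XDG_CACHE_HOME}/buildstream/tmp")
        # -> '/home/user/.cache/buildstream/tmp' when XDG_CACHE_HOME is set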
    

  • buildstream/data/userconfig.yaml

    @@ -19,6 +19,9 @@ builddir: ${XDG_CACHE_HOME}/buildstream/build
     # Location to store local binary artifacts
     artifactdir: ${XDG_CACHE_HOME}/buildstream/artifacts
     
    +# tmp directory, used by casremote
    +tmpdir: ${XDG_CACHE_HOME}/buildstream/tmp
    +
     # Location to store build logs
     logdir: ${XDG_CACHE_HOME}/buildstream/logs
     
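
    With the usual XDG fallback of ~/.cache when XDG_CACHE_HOME is unset, the
    new default lands at ~/.cache/buildstream/tmp, next to the existing
    artifact and build directories; like the other directory options in this
    file, it can be overridden with a tmpdir entry in the user's
    buildstream.conf.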
     
    

  • buildstream/sandbox/_sandboxremote.py

    @@ -238,7 +238,7 @@ class SandboxRemote(Sandbox):
     
             context = self._get_context()
             cascache = context.get_cascache()
    -        casremote = CASRemote(self.storage_remote_spec)
    +        casremote = CASRemote(self.storage_remote_spec, context.tmpdir)
     
             # Now do a pull to ensure we have the necessary parts.
             dir_digest = cascache.pull_tree(casremote, tree_digest)

    @@ -297,7 +297,7 @@ class SandboxRemote(Sandbox):
             action_result = self._check_action_cache(action_digest)
     
             if not action_result:
    -            casremote = CASRemote(self.storage_remote_spec)
    +            casremote = CASRemote(self.storage_remote_spec, self._get_context().tmpdir)
     
                 # Now, push that key (without necessarily needing a ref) to the remote.
                 try:
    

  • tests/artifactcache/pull.py

    @@ -110,7 +110,7 @@ def test_pull(cli, tmpdir, datafiles):
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_pull, queue, user_config_file, project_dir,
    -                                                artifact_dir, 'target.bst', element_key))
    +                                                artifact_dir, tmpdir, 'target.bst', element_key))
     
             try:
                 # Keep SIGINT blocked in the child process

    @@ -127,13 +127,14 @@ def test_pull(cli, tmpdir, datafiles):
             assert cas.contains(element, element_key)
     
     
    -def _test_pull(user_config_file, project_dir, artifact_dir,
    +def _test_pull(user_config_file, project_dir, artifact_dir, tmpdir,
                    element_name, element_key, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
     
         # Load the project manually
         project = Project(project_dir, context)

    @@ -218,7 +219,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_push_tree, queue, user_config_file, project_dir,
    -                                                artifact_dir, artifact_digest))
    +                                                artifact_dir, tmpdir, artifact_digest))
     
             try:
                 # Keep SIGINT blocked in the child process

    @@ -246,7 +247,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_pull_tree, queue, user_config_file, project_dir,
    -                                                artifact_dir, tree_digest))
    +                                                artifact_dir, tmpdir, tree_digest))
     
             try:
                 # Keep SIGINT blocked in the child process

    @@ -268,12 +269,14 @@ def test_pull_tree(cli, tmpdir, datafiles):
             assert os.path.exists(cas.objpath(directory_digest))
     
     
    -def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    +def _test_push_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    +                    artifact_digest, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
     
         # Load the project manually
         project = Project(project_dir, context)

    @@ -304,12 +307,14 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
             queue.put("No remote configured")
     
     
    -def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    +def _test_pull_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    +                    artifact_digest, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
     
         # Load the project manually
         project = Project(project_dir, context)
    

  • tests/testutils/runcli.py

    @@ -495,7 +495,8 @@ def cli_integration(tmpdir, integration_cache):
         # to avoid downloading the huge base-sdk repeatedly
         fixture.configure({
             'sourcedir': os.path.join(integration_cache, 'sources'),
    -        'artifactdir': os.path.join(integration_cache, 'artifacts')
    +        'artifactdir': os.path.join(integration_cache, 'artifacts'),
    +        'tmpdir': os.path.join(integration_cache, 'tmp')
         })
     
         return fixture

    @@ -539,6 +540,8 @@ def configured(directory, config=None):
             config['builddir'] = os.path.join(directory, 'build')
         if not config.get('artifactdir', False):
             config['artifactdir'] = os.path.join(directory, 'artifacts')
    +    if not config.get('tmpdir', False):
    +        config['tmpdir'] = os.path.join(directory, 'tmp')
         if not config.get('logdir', False):
             config['logdir'] = os.path.join(directory, 'logs')
     
    544 547
     
    


