[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] 2 commits: _casremote.py: Add batching to pull command



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

3 changed files:

Changes:

  • buildstream/_artifactcache.py
    ... ... @@ -656,6 +656,11 @@ class ArtifactCache():
    656 656
                                 remote.request_blob(blob_digest)
    
    657 657
                                 for blob_file in remote.get_blobs():
    
    658 658
                                     self.cas.add_object(path=blob_file.name, link_directly=True)
    
    659
    +
    
    660
    +                        # request the final CAS batch
    
    661
    +                        for blob_file in remote.get_blobs(request_batch=True):
    
    662
    +                            self.cas.add_object(path=blob_file.name, link_directly=True)
    
    663
    +
    
    659 664
                             self.cas.set_ref(ref, root_digest)
    
    660 665
                         except BlobNotFound:
    
    661 666
                             element.info("Remote ({}) is missing blobs for {}".format(
    
    ... ... @@ -689,15 +694,36 @@ class ArtifactCache():
    689 694
         #
    
    690 695
         # Args:
    
    691 696
         #     project (Project): The current project
    
    692
    -    #     digest (Digest): The digest of the tree
    
    697
    +    #     tree_digest (Digest): The digest of the tree
    
    693 698
         #
    
    694
    -    def pull_tree(self, project, digest):
    
    699
    +    def pull_tree(self, project, tree_digest):
    
    695 700
             for remote in self._remotes[project]:
    
    696
    -            digest = self.cas.pull_tree(remote, digest)
    
    697
    -
    
    698
    -            if digest:
    
    699
    -                # no need to pull from additional remotes
    
    700
    -                return digest
    
    701
    +            try:
    
    702
    +                # get tree
    
    703
    +                tree = remote.get_tree_blob(tree_digest)
    
    704
    +
    
    705
    +                # request files
    
    706
    +                tree.children.extend([tree.root])
    
    707
    +                for directory in tree.children:
    
    708
    +                    for filenode in directory.files:
    
    709
    +                        if self.cas.check_blob(filenode.digest):
    
    710
    +                            continue
    
    711
    +                        remote.request_blob(blob_digest)
    
    712
    +                        for blob_file in remote.get_blobs():
    
    713
    +                            self.cas.add_object(path=blob_file.name)
    
    714
    +
    
    715
    +                # Get the last batch
    
    716
    +                for blob in remote.get_blobs(request_batch=True):
    
    717
    +                    self.cas.add_object(blob)
    
    718
    +
    
    719
    +                # add the directory to CAS
    
    720
    +                for directory in tree.children:
    
    721
    +                    self.cas.add_object(buffer=directory.SerializeToString())
    
    722
    +
    
    723
    +            except BlobNotFound:
    
    724
    +                continue
    
    725
    +            else:
    
    726
    +                return tree_digest
    
    701 727
     
    
    702 728
             return None
    
    703 729
     
    

  • buildstream/_cas/cascache.py
    ... ... @@ -183,29 +183,6 @@ class CASCache():
    183 183
     
    
    184 184
             return modified, removed, added
    
    185 185
     
    
    186
    -    # pull_tree():
    
    187
    -    #
    
    188
    -    # Pull a single Tree rather than a ref.
    
    189
    -    # Does not update local refs.
    
    190
    -    #
    
    191
    -    # Args:
    
    192
    -    #     remote (CASRemote): The remote to pull from
    
    193
    -    #     digest (Digest): The digest of the tree
    
    194
    -    #
    
    195
    -    def pull_tree(self, remote, digest):
    
    196
    -        try:
    
    197
    -            remote.init()
    
    198
    -
    
    199
    -            digest = self._fetch_tree(remote, digest)
    
    200
    -
    
    201
    -            return digest
    
    202
    -
    
    203
    -        except grpc.RpcError as e:
    
    204
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    
    205
    -                raise
    
    206
    -
    
    207
    -        return None
    
    208
    -
    
    209 186
         # link_ref():
    
    210 187
         #
    
    211 188
         # Add an alias for an existing ref.
    
    ... ... @@ -771,29 +748,6 @@ class CASCache():
    771 748
     
    
    772 749
             return objpath
    
    773 750
     
    
    774
    -    def _fetch_tree(self, remote, digest):
    
    775
    -        # download but do not store the Tree object
    
    776
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    
    777
    -            remote._fetch_blob(digest, out)
    
    778
    -
    
    779
    -            tree = remote_execution_pb2.Tree()
    
    780
    -
    
    781
    -            with open(out.name, 'rb') as f:
    
    782
    -                tree.ParseFromString(f.read())
    
    783
    -
    
    784
    -            tree.children.extend([tree.root])
    
    785
    -            for directory in tree.children:
    
    786
    -                for filenode in directory.files:
    
    787
    -                    self._ensure_blob(remote, filenode.digest)
    
    788
    -
    
    789
    -                # place directory blob only in final location when we've downloaded
    
    790
    -                # all referenced blobs to avoid dangling references in the repository
    
    791
    -                dirbuffer = directory.SerializeToString()
    
    792
    -                dirdigest = self.add_object(buffer=dirbuffer)
    
    793
    -                assert dirdigest.size_bytes == len(dirbuffer)
    
    794
    -
    
    795
    -        return dirdigest
    
    796
    -
    
    797 751
         def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
    
    798 752
             required_blobs = self._required_blobs(digest)
    
    799 753
     
    

  • buildstream/_cas/casremote.py
    ... ... @@ -95,6 +95,9 @@ class CASRemote():
    95 95
     
    
    96 96
             self.__tmp_downloads = []  # files in the tmpdir waiting to be added to local caches
    
    97 97
     
    
    98
    +        self.__batch_read = None
    
    99
    +        self.__batch_update = None
    
    100
    +
    
    98 101
         def init(self):
    
    99 102
             if not self._initialized:
    
    100 103
                 url = urlparse(self.spec.url)
    
    ... ... @@ -152,6 +155,7 @@ class CASRemote():
    152 155
                     request = remote_execution_pb2.BatchReadBlobsRequest()
    
    153 156
                     response = self.cas.BatchReadBlobs(request)
    
    154 157
                     self.batch_read_supported = True
    
    158
    +                self.__batch_read = _CASBatchRead(self)
    
    155 159
                 except grpc.RpcError as e:
    
    156 160
                     if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    157 161
                         raise
    
    ... ... @@ -162,6 +166,7 @@ class CASRemote():
    162 166
                     request = remote_execution_pb2.BatchUpdateBlobsRequest()
    
    163 167
                     response = self.cas.BatchUpdateBlobs(request)
    
    164 168
                     self.batch_update_supported = True
    
    169
    +                self.__batch_update = _CASBatchUpdate(self)
    
    165 170
                 except grpc.RpcError as e:
    
    166 171
                     if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
    
    167 172
                             e.code() != grpc.StatusCode.PERMISSION_DENIED):
    
    ... ... @@ -276,6 +281,17 @@ class CASRemote():
    276 281
                 else:
    
    277 282
                     return None
    
    278 283
     
    
    284
    +    def get_tree_blob(self, tree_digest):
    
    285
    +        self.init()
    
    286
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    287
    +        self._fetch_blob(tree_digest, f)
    
    288
    +
    
    289
    +        tree = remote_execution_pb2.Tree()
    
    290
    +        with open(f.name, 'rb') as tmp:
    
    291
    +            tree.ParseFromString(tmp.read())
    
    292
    +
    
    293
    +        return tree
    
    294
    +
    
    279 295
         # yield_directory_digests():
    
    280 296
         #
    
    281 297
         # Iterate over blobs digests starting from a root digest
    
    ... ... @@ -296,20 +312,30 @@ class CASRemote():
    296 312
             # Fetch artifact, excluded_subdirs determined in pullqueue
    
    297 313
             yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
    
    298 314
     
    
    315
    +    def yield_tree_digests(self, tree_digest):
    
    316
    +        self.init()
    
    317
    +        tree.children.extend([tree.root])
    
    318
    +        for directory in tree.children:
    
    319
    +            for filenode in directory.files:
    
    320
    +                yield filenode.digest
    
    321
    +
    
    299 322
         # request_blob():
    
    300 323
         #
    
    301
    -    # Request blob and returns path to tmpdir location
    
    324
    +    # Request blob, triggering a download via bytestream or CAS
    
    325
    +    # BatchReadBlobs depending on size.
    
    302 326
         #
    
    303 327
         # Args:
    
    304 328
         #    digest (Digest): digest of the requested blob
    
    305
    -    #    path (str): tmpdir locations of downloaded blobs
    
    306 329
         #
    
    307 330
         def request_blob(self, digest):
    
    308
    -        # TODO expand for adding to batches some other logic
    
    309
    -        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    310
    -        self._fetch_blob(digest, f)
    
    311
    -        self.__tmp_downloads.append(f)
    
    312
    -        return f.name
    
    331
    +        if (not self.batch_read_supported or
    
    332
    +                digest.size_bytes > self.max_batch_total_size_bytes):
    
    333
    +            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    334
    +            self._fetch_blob(digest, f)
    
    335
    +            self.__tmp_downloads.append(f)
    
    336
    +        elif self.__batch_read.add(digest) is False:
    
    337
    +            self._download_batch()
    
    338
    +            self.__batch_read.add(digest)
    
    313 339
     
    
    314 340
         # get_blobs():
    
    315 341
         #
    
    ... ... @@ -318,7 +344,12 @@ class CASRemote():
    318 344
         #
    
    319 345
         # Returns:
    
    320 346
         #    iterator over NamedTemporaryFile
    
    321
    -    def get_blobs(self):
    
    347
    +    def get_blobs(self, request_batch=False):
    
    348
    +        # Send read batch request and download
    
    349
    +        if (request_batch is True and
    
    350
    +                self.batch_read_supported is True):
    
    351
    +            self._download_batch()
    
    352
    +
    
    322 353
             while self.__tmp_downloads:
    
    323 354
                 yield self.__tmp_downloads.pop()
    
    324 355
     
    
    ... ... @@ -349,18 +380,18 @@ class CASRemote():
    349 380
         #     excluded_subdirs (list): The optional list of subdirs to not fetch
    
    350 381
         #
    
    351 382
         def _yield_directory_digests(self, dir_digest, *, excluded_subdirs=[]):
    
    383
    +        # get directory blob
    
    384
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    385
    +        self._fetch_blob(dir_digest, f)
    
    352 386
     
    
    353
    -        objpath = self.request_blob(dir_digest)
    
    354
    -
    
    387
    +        # need to read in directory structure to iterate over it
    
    355 388
             directory = remote_execution_pb2.Directory()
    
    356
    -
    
    357
    -        with open(objpath, 'rb') as f:
    
    358
    -            directory.ParseFromString(f.read())
    
    389
    +        with open(f.name, 'rb') as tmp:
    
    390
    +            directory.ParseFromString(tmp.read())
    
    359 391
     
    
    360 392
             yield dir_digest
    
    361 393
             for filenode in directory.files:
    
    362 394
                 yield filenode.digest
    
    363
    -
    
    364 395
             for dirnode in directory.directories:
    
    365 396
                 if dirnode.name not in excluded_subdirs:
    
    366 397
                     yield from self._yield_directory_digests(dirnode.digest)
    
    ... ... @@ -393,6 +424,15 @@ class CASRemote():
    393 424
     
    
    394 425
             assert response.committed_size == digest.size_bytes
    
    395 426
     
    
    427
    +    def _download_batch(self):
    
    428
    +        for _, data in self.__batch_read.send():
    
    429
    +            f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    
    430
    +            f.write(data)
    
    431
    +            f.flush()
    
    432
    +            self.__tmp_downloads.append(f)
    
    433
    +
    
    434
    +        self.__batch_read = _CASBatchRead(self)
    
    435
    +
    
    396 436
     
    
    397 437
     # Represents a batch of blobs queued for fetching.
    
    398 438
     #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]