[Notes] [Git][BuildStream/buildstream][raoul/802-refactor-artifactcache] artifactcache: implemented pull_tree similar to pull




Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream

Commits:

4 changed files:

  • buildstream/_artifactcache.py
  • buildstream/_cas/cascache.py
  • buildstream/_cas/casremote.py
  • tests/artifactcache/pull.py

Changes:

  • buildstream/_artifactcache.py
    @@ -694,15 +694,26 @@ class ArtifactCache():
         #
         # Args:
         #     project (Project): The current project
    -    #     digest (Digest): The digest of the tree
    +    #     tree_digest (Digest): The digest of the tree
         #
    -    def pull_tree(self, project, digest):
    +    def pull_tree(self, project, tree_digest):
             for remote in self._remotes[project]:
    -            digest = self.cas.pull_tree(remote, digest)
    +            try:
    +                for blob_digest in remote.yield_tree_digests(tree_digest):
    +                    if self.cas.check_blob(blob_digest):
    +                        continue
    +                    remote.request_blob(blob_digest)
    +                    for blob_file in remote.get_blobs():
    +                        self.cas.add_object(path=blob_file.name, link_directly=True)
     
    -            if digest:
    -                # no need to pull from additional remotes
    -                return digest
    +                # Get the last batch
    +                for blob_file in remote.get_blobs(request_batch=True):
    +                    self.cas.add_object(path=blob_file.name, link_directly=True)
    +
    +            except BlobNotFound:
    +                continue
    +            else:
    +                return tree_digest
     
             return None
    

  • buildstream/_cas/cascache.py
    @@ -183,29 +183,6 @@ class CASCache():
     
             return modified, removed, added
     
    -    # pull_tree():
    -    #
    -    # Pull a single Tree rather than a ref.
    -    # Does not update local refs.
    -    #
    -    # Args:
    -    #     remote (CASRemote): The remote to pull from
    -    #     digest (Digest): The digest of the tree
    -    #
    -    def pull_tree(self, remote, digest):
    -        try:
    -            remote.init()
    -
    -            digest = self._fetch_tree(remote, digest)
    -
    -            return digest
    -
    -        except grpc.RpcError as e:
    -            if e.code() != grpc.StatusCode.NOT_FOUND:
    -                raise
    -
    -        return None
    -
         # link_ref():
         #
         # Add an alias for an existing ref.

    @@ -771,29 +748,6 @@ class CASCache():
     
             return objpath
     
    -    def _fetch_tree(self, remote, digest):
    -        # download but do not store the Tree object
    -        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
    -            remote._fetch_blob(digest, out)
    -
    -            tree = remote_execution_pb2.Tree()
    -
    -            with open(out.name, 'rb') as f:
    -                tree.ParseFromString(f.read())
    -
    -            tree.children.extend([tree.root])
    -            for directory in tree.children:
    -                for filenode in directory.files:
    -                    self._ensure_blob(remote, filenode.digest)
    -
    -                # place directory blob only in final location when we've downloaded
    -                # all referenced blobs to avoid dangling references in the repository
    -                dirbuffer = directory.SerializeToString()
    -                dirdigest = self.add_object(buffer=dirbuffer)
    -                assert dirdigest.size_bytes == len(dirbuffer)
    -
    -        return dirdigest
    -
         def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
             required_blobs = self._required_blobs(digest)
    

  • buildstream/_cas/casremote.py
    @@ -281,19 +281,29 @@ class CASRemote():
                 else:
                     return None
     
    +    def get_tree_blob(self, tree_digest):
    +        self.init()
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(tree_digest, f)
    +
    +        tree = remote_execution_pb2.Tree()
    +        with open(f.name, 'rb') as tmp:
    +            tree.ParseFromString(tmp.read())
    +
    +        return tree
    +
         # yield_directory_digests():
         #
         # Iterate over blob digests starting from a root digest
         #
         # Args:
    -    #     root_digest (str): The root_digest to get a tree of
    +    #     root_digest (digest): The root_digest to get a tree of
         #     progress (callable): The progress callback, if any
         #     subdir (str): The optional specific subdir to pull
         #     excluded_subdirs (list): The optional list of subdirs to not pull
         #
         # Returns:
    -    #   (iter): True if pull was successful, False if ref was not available
    -    #
    +    #     (iter digests): recursively iterates over digests contained in root directory
         def yield_directory_digests(self, root_digest, *, progress=None, subdir=None, excluded_subdirs=[]):
             self.init()
     

    @@ -301,6 +311,37 @@ class CASRemote():
             # Fetch artifact, excluded_subdirs determined in pullqueue
             yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
     
    +    # yield_tree_digests():
    +    #
    +    # Fetches a tree file from digests and then iterates over child digests
    +    #
    +    # Args:
    +    #     tree_digest (digest): tree digest
    +    #
    +    # Returns:
    +    #     (iter digests): iterates over digests in tree message
    +    def yield_tree_digests(self, tree_digest):
    +        self.init()
    +
    +        # get tree file
    +        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +        self._fetch_blob(tree_digest, f)
    +        tree = remote_execution_pb2.Tree()
    +        tree.ParseFromString(f.read())
    +
    +        tree.children.extend([tree.root])
    +        for directory in tree.children:
    +            for filenode in directory.files:
    +                yield filenode.digest
    +
    +            # add the directory to downloaded tmp files to be added
    +            f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
    +            f2.write(directory.SerializeToString())
    +            self.__tmp_downloads.append(f2)
    +
    +        # Add the tree directory to downloads right at the end
    +        self.__tmp_downloads.append(f)
    +
         # request_blob():
         #
         # Request blob, triggering download via bytestream or cas
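
    One detail worth calling out in yield_tree_digests() above: the NamedTemporaryFile
    objects are appended to self.__tmp_downloads rather than closed, because a
    NamedTemporaryFile is removed from disk as soon as it is closed or garbage
    collected. Keeping the references alive lets the caller (ArtifactCache.pull_tree()
    on this branch) link the files into the CAS before they disappear. A reduced
    sketch of that handoff pattern follows; TmpDownloads, stash and drain are
    illustrative names, not CASRemote API:

        import tempfile

        class TmpDownloads:
            """Illustrative stand-in for the remote's __tmp_downloads list."""

            def __init__(self, tmpdir):
                self._tmpdir = tmpdir
                self._files = []

            def stash(self, data):
                # Keep a reference to the NamedTemporaryFile: it is removed from
                # disk as soon as it is closed or garbage collected.
                f = tempfile.NamedTemporaryFile(dir=self._tmpdir)
                f.write(data)
                f.flush()
                self._files.append(f)

            def drain(self):
                # Hand the open files to the caller (e.g. to link into the CAS),
                # clearing the queue; the caller closes them when done.
                files, self._files = self._files, []
                return files

        downloads = TmpDownloads(tempfile.mkdtemp())
        downloads.stash(b"directory message bytes")
        for f in downloads.drain():
            print(f.name)   # path a caller would pass to add_object(path=..., link_directly=True)
            f.close()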
    

  • tests/artifactcache/pull.py
    @@ -110,7 +110,7 @@ def test_pull(cli, tmpdir, datafiles):
             # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_pull, queue, user_config_file, project_dir,
    -                                                artifact_dir, 'target.bst', element_key))
    +                                                artifact_dir, tmpdir, 'target.bst', element_key))
     
             try:
                 # Keep SIGINT blocked in the child process
    
    @@ -127,13 +127,14 @@ def test_pull(cli, tmpdir, datafiles):
             assert cas.contains(element, element_key)
     
     
    -def _test_pull(user_config_file, project_dir, artifact_dir,
    +def _test_pull(user_config_file, project_dir, artifact_dir, tmpdir,
                    element_name, element_key, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
     
         # Load the project manually
         project = Project(project_dir, context)
    
    @@ -246,7 +247,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
             process = multiprocessing.Process(target=_queue_wrapper,
                                               args=(_test_pull_tree, queue, user_config_file, project_dir,
    -                                                artifact_dir, tree_digest))
    +                                                artifact_dir, tmpdir, tree_digest))
     
             try:
                 # Keep SIGINT blocked in the child process
    
    @@ -265,15 +266,18 @@
                                                            size_bytes=directory_size)
     
             # Ensure the entire Tree structure has been pulled
    +        print(cas.objpath(directory_digest))
             assert os.path.exists(cas.objpath(directory_digest))
     
     
    -def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    +def _test_push_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    +                    artifact_digest, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
     
         # Load the project manually
         project = Project(project_dir, context)
    
    @@ -304,12 +308,14 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
             queue.put("No remote configured")
     
     
    -def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    +def _test_pull_tree(user_config_file, project_dir, artifact_dir, tmpdir,
    +                    artifact_digest, queue):
         # Fake minimal context
         context = Context()
         context.load(config=user_config_file)
         context.artifactdir = artifact_dir
         context.set_message_handler(message_handler)
    +    context.tmpdir = tmpdir
     
         # Load the project manually
         project = Project(project_dir, context)


