[Notes] [Git][BuildGrid/buildgrid][santi/fetch-dir-batch-read] Downloader._write_directory(): request dirs in batches



Title: GitLab

Santiago Gil pushed to branch santi/fetch-dir-batch-read at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -24,7 +24,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
    24 24
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    25 25
     from buildgrid._protos.google.rpc import code_pb2
    
    26 26
     from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    27
    -from buildgrid.utils import merkle_tree_maker
    
    27
    +from buildgrid.utils import create_digest, merkle_tree_maker
    
    28 28
     
    
    29 29
     # Maximum size for a queueable file:
    
    30 30
     FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    ... ... @@ -417,9 +417,8 @@ class Downloader:
    417 417
                     else:
    
    418 418
                         assert False
    
    419 419
     
    
    420
    -        # TODO: Try with BatchReadBlobs().
    
    421
    -
    
    422
    -        # Fallback to Read() if no GetTree():
    
    420
    +        # If no GetTree(), _write_directory() will use BatchReadBlobs()
    
    421
    +        # if available or Read() if not.
    
    423 422
             if not directory_fetched:
    
    424 423
                 directory = remote_execution_pb2.Directory()
    
    425 424
                 directory.ParseFromString(self._fetch_blob(digest))
    
    ... ... @@ -427,26 +426,54 @@ class Downloader:
    427 426
                 self._write_directory(directory, directory_path,
    
    428 427
                                       root_barrier=directory_path)
    
    429 428
     
    
    430
    -    def _write_directory(self, root_directory, root_path, directories=None, root_barrier=None):
    
    429
    +    def _write_directory(self, root_directory, root_path, directories=None,
    
    430
    +                         root_barrier=None):
    
    431 431
             """Generates a local directory structure"""
    
    432
    +
    
    433
    +        # i) Files:
    
    432 434
             for file_node in root_directory.files:
    
    433 435
                 file_path = os.path.join(root_path, file_node.name)
    
    434 436
     
    
    435
    -            self._queue_file(file_node.digest, file_path, is_executable=file_node.is_executable)
    
    437
    +            self._queue_file(file_node.digest, file_path,
    
    438
    +                             is_executable=file_node.is_executable)
    
    436 439
     
    
    440
    +        # ii) Directories:
    
    441
    +        pending_directory_digests = []
    
    442
    +        pending_directory_paths = {}
    
    437 443
             for directory_node in root_directory.directories:
    
    444
    +            directory_hash = directory_node.digest.hash
    
    445
    +
    
    438 446
                 directory_path = os.path.join(root_path, directory_node.name)
    
    447
    +            os.makedirs(directory_path, exist_ok=True)
    
    448
    +
    
    439 449
                 if directories and directory_node.digest.hash in directories:
    
    440
    -                directory = directories[directory_node.digest.hash]
    
    450
    +                # We already have the directory; just write it:
    
    451
    +                directory = directories[directory_hash]
    
    452
    +
    
    453
    +                self._write_directory(directory, directory_path,
    
    454
    +                                      directories=directories,
    
    455
    +                                      root_barrier=root_barrier)
    
    441 456
                 else:
    
    457
    +                # Gather all the directories that we need to get to
    
    458
    +                # try fetching them in a single batch request:
    
    459
    +                pending_directory_digests.append(directory_node.digest)
    
    460
    +                pending_directory_paths[directory_hash] = directory_path
    
    461
    +
    
    462
    +        if pending_directory_paths:
    
    463
    +            fetched_blobs = self._fetch_blob_batch(pending_directory_digests)
    
    464
    +
    
    465
    +            for directory_blob in fetched_blobs:
    
    442 466
                     directory = remote_execution_pb2.Directory()
    
    443
    -                directory.ParseFromString(self._fetch_blob(directory_node.digest))
    
    467
    +                directory.ParseFromString(directory_blob)
    
    444 468
     
    
    445
    -            os.makedirs(directory_path, exist_ok=True)
    
    469
    +                directory_hash = create_digest(directory_blob).hash
    
    470
    +                directory_path = pending_directory_paths[directory_hash]
    
    446 471
     
    
    447
    -            self._write_directory(directory, directory_path,
    
    448
    -                                  directories=directories, root_barrier=root_barrier)
    
    472
    +                self._write_directory(directory, directory_path,
    
    473
    +                                      directories=directories,
    
    474
    +                                      root_barrier=root_barrier)
    
    449 475
     
    
    476
    +        # iii) Symlinks:
    
    450 477
             for symlink_node in root_directory.symlinks:
    
    451 478
                 symlink_path = os.path.join(root_path, symlink_node.name)
    
    452 479
                 if not os.path.isabs(symlink_node.target):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]