... |
... |
@@ -24,7 +24,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p |
24
|
24
|
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
25
|
25
|
from buildgrid._protos.google.rpc import code_pb2
|
26
|
26
|
from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
27
|
|
-from buildgrid.utils import merkle_tree_maker
|
|
27
|
+from buildgrid.utils import create_digest, merkle_tree_maker
|
28
|
28
|
|
29
|
29
|
# Maximum size for a queueable file:
|
30
|
30
|
FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
|
... |
... |
@@ -417,9 +417,8 @@ class Downloader: |
417
|
417
|
else:
|
418
|
418
|
assert False
|
419
|
419
|
|
420
|
|
- # TODO: Try with BatchReadBlobs().
|
421
|
|
-
|
422
|
|
- # Fallback to Read() if no GetTree():
|
|
420
|
+ # If no GetTree(), _write_directory() will use BatchReadBlobs()
|
|
421
|
+ # if available or Read() if not.
|
423
|
422
|
if not directory_fetched:
|
424
|
423
|
directory = remote_execution_pb2.Directory()
|
425
|
424
|
directory.ParseFromString(self._fetch_blob(digest))
|
... |
... |
@@ -427,26 +426,54 @@ class Downloader: |
427
|
426
|
self._write_directory(directory, directory_path,
|
428
|
427
|
root_barrier=directory_path)
|
429
|
428
|
|
430
|
|
- def _write_directory(self, root_directory, root_path, directories=None, root_barrier=None):
|
|
429
|
+ def _write_directory(self, root_directory, root_path, directories=None,
|
|
430
|
+ root_barrier=None):
|
431
|
431
|
"""Generates a local directory structure"""
|
|
432
|
+
|
|
433
|
+ # i) Files:
|
432
|
434
|
for file_node in root_directory.files:
|
433
|
435
|
file_path = os.path.join(root_path, file_node.name)
|
434
|
436
|
|
435
|
|
- self._queue_file(file_node.digest, file_path, is_executable=file_node.is_executable)
|
|
437
|
+ self._queue_file(file_node.digest, file_path,
|
|
438
|
+ is_executable=file_node.is_executable)
|
436
|
439
|
|
|
440
|
+ # ii) Directories:
|
|
441
|
+ pending_directory_digests = []
|
|
442
|
+ pending_directory_paths = {}
|
437
|
443
|
for directory_node in root_directory.directories:
|
|
444
|
+ directory_hash = directory_node.digest.hash
|
|
445
|
+
|
438
|
446
|
directory_path = os.path.join(root_path, directory_node.name)
|
|
447
|
+ os.makedirs(directory_path, exist_ok=True)
|
|
448
|
+
|
439
|
449
|
if directories and directory_node.digest.hash in directories:
|
440
|
|
- directory = directories[directory_node.digest.hash]
|
|
450
|
+ # We already have the directory; just write it:
|
|
451
|
+ directory = directories[directory_hash]
|
|
452
|
+
|
|
453
|
+ self._write_directory(directory, directory_path,
|
|
454
|
+ directories=directories,
|
|
455
|
+ root_barrier=root_barrier)
|
441
|
456
|
else:
|
|
457
|
+ # Gather the directories we still need, so that we can
|
|
458
|
+ # try fetching them in a single batch request:
|
|
459
|
+ pending_directory_digests.append(directory_node.digest)
|
|
460
|
+ pending_directory_paths[directory_hash] = directory_path
|
|
461
|
+
|
|
462
|
+ if pending_directory_paths:
|
|
463
|
+ fetched_blobs = self._fetch_blob_batch(pending_directory_digests)
|
|
464
|
+
|
|
465
|
+ for directory_blob in fetched_blobs:
|
442
|
466
|
directory = remote_execution_pb2.Directory()
|
443
|
|
- directory.ParseFromString(self._fetch_blob(directory_node.digest))
|
|
467
|
+ directory.ParseFromString(directory_blob)
|
444
|
468
|
|
445
|
|
- os.makedirs(directory_path, exist_ok=True)
|
|
469
|
+ directory_hash = create_digest(directory_blob).hash
|
|
470
|
+ directory_path = pending_directory_paths[directory_hash]
|
446
|
471
|
|
447
|
|
- self._write_directory(directory, directory_path,
|
448
|
|
- directories=directories, root_barrier=root_barrier)
|
|
472
|
+ self._write_directory(directory, directory_path,
|
|
473
|
+ directories=directories,
|
|
474
|
+ root_barrier=root_barrier)
|
449
|
475
|
|
|
476
|
+ # iii) Symlinks:
|
450
|
477
|
for symlink_node in root_directory.symlinks:
|
451
|
478
|
symlink_path = os.path.join(root_path, symlink_node.name)
|
452
|
479
|
if not os.path.isabs(symlink_node.target):
|