@@ -13,9 +13,10 @@
 # limitations under the License.
 
 
-from contextlib import contextmanager
-import uuid
 import os
+import uuid
+from collections import namedtuple
+from contextlib import contextmanager
 
 import grpc
 
@@ -30,6 +31,8 @@ from buildgrid.utils import merkle_tree_maker
 # Maximum size for a queueable file:
 FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
 
+_FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
+
 
 class _CallCache:
     """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
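
Note on the `_FileRequest` helper introduced above: keying the queue by `digest.hash` and accumulating `output_paths` is what lets the downloader fetch a blob once and write it to several destinations. A minimal standalone sketch of that grouping (the `Digest` stand-in here is purely illustrative; the real code uses REAPI `Digest` protobuf messages):

from collections import namedtuple

_FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])

# Illustrative stand-in for the REAPI Digest protobuf message:
Digest = namedtuple('Digest', ['hash', 'size_bytes'])

requests = {}
queued = [(Digest('abc123', 42), '/tmp/out/a', False),
          (Digest('abc123', 42), '/tmp/out/b', True)]

for digest, file_path, is_executable in queued:
    output_path = (file_path, is_executable)
    if digest.hash not in requests:
        requests[digest.hash] = _FileRequest(digest=digest,
                                             output_paths=[output_path])
    else:
        requests[digest.hash].output_paths.append(output_path)

assert len(requests) == 1                         # one fetch...
assert len(requests['abc123'].output_paths) == 2  # ...two copies written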
@@ -339,24 +342,41 @@ class Downloader:
         elif self.__file_request_count >= MAX_REQUEST_COUNT:
             self.flush()
 
-        self.__file_requests[digest.hash] = (digest, file_path, is_executable)
-        self.__file_request_count += 1
-        self.__file_request_size += digest.ByteSize()
-        self.__file_response_size += digest.size_bytes
+        output_path = (file_path, is_executable)
+
+        # When queueing a file we take into account the cases where
+        # we might want to download the same digest to different paths.
+        if digest.hash not in self.__file_requests:
+            request = _FileRequest(digest=digest, output_paths=[output_path])
+            self.__file_requests[digest.hash] = request
 
-    def _fetch_file_batch(self, batch):
-        """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
-        batch_digests = [digest for digest, _, _ in batch.values()]
+            self.__file_request_count += 1
+            self.__file_request_size += digest.ByteSize()
+            self.__file_response_size += digest.size_bytes
+        else:
+            # We already have that hash queued; we'll fetch the blob
+            # once and write copies of it:
+            self.__file_requests[digest.hash].output_paths.append(output_path)
+
+    def _fetch_file_batch(self, requests):
+        """Sends queued data using ContentAddressableStorage.BatchReadBlobs().
+
+        Takes a dictionary (digest.hash, _FileRequest) as input.
+        """
+        batch_digests = [request.digest for request in requests.values()]
         batch_blobs = self._fetch_blob_batch(batch_digests)
 
-        for (_, file_path, is_executable), file_blob in zip(batch.values(), batch_blobs):
-            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+        for file_digest, file_blob in zip(batch_digests, batch_blobs):
+            output_paths = requests[file_digest.hash].output_paths
+
+            for (file_path, is_executable) in output_paths:
+                os.makedirs(os.path.dirname(file_path), exist_ok=True)
 
-            with open(file_path, 'wb') as byte_file:
-                byte_file.write(file_blob)
+                with open(file_path, 'wb') as byte_file:
+                    byte_file.write(file_blob)
 
-            if is_executable:
-                os.chmod(file_path, 0o755)  # rwxr-xr-x / 755
+                if is_executable:
+                    os.chmod(file_path, 0o755)  # rwxr-xr-x / 755
 
     def _fetch_directory(self, digest, directory_path):
         """Fetches a file using ByteStream.GetTree()"""
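
One detail worth noting in `_fetch_file_batch()` above: the `zip(batch_digests, batch_blobs)` pairing relies on `_fetch_blob_batch()` returning blobs in the same order as the digests it was given. A self-contained sketch of the fan-out write step, with made-up paths and blob contents for illustration:

import os
import tempfile

def write_copies(file_blob, output_paths):
    # Mirrors the inner loop of _fetch_file_batch(): one fetched blob,
    # written to every queued destination path.
    for file_path, is_executable in output_paths:
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        with open(file_path, 'wb') as byte_file:
            byte_file.write(file_blob)

        if is_executable:
            os.chmod(file_path, 0o755)  # rwxr-xr-x / 755

root = tempfile.mkdtemp()
write_copies(b'#!/bin/sh\necho hello\n',
             [(os.path.join(root, 'bin', 'hello'), True),
              (os.path.join(root, 'copy', 'hello'), False)])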