@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+from collections import namedtuple
 from contextlib import contextmanager
-import uuid
 import os
+import uuid
 
 import grpc
 
@@ -26,10 +26,11 @@ from buildgrid._protos.google.rpc import code_pb2
 from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
 from buildgrid.utils import merkle_tree_maker
 
-
 # Maximum size for a queueable file:
 FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
 
+_FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
+
 
 class _CallCache:
     """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
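The new `_FileRequest` record is the heart of this change: queued downloads are keyed by digest hash, so a blob wanted at several destinations is only fetched once. A minimal sketch of the idea, using a hypothetical `Digest` stand-in for the REAPI protobuf message (the real one comes from `remote_execution_pb2`):

```python
from collections import namedtuple

# As in the patch: one queued request per unique blob digest.
_FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])

# Hypothetical stand-in for the REAPI Digest message, kept minimal
# for illustration only.
Digest = namedtuple('Digest', ['hash', 'size_bytes'])

digest = Digest(hash='deadbeef', size_bytes=42)
request = _FileRequest(digest=digest, output_paths=[('out/a.txt', False)])

# A second destination for the same blob is just another
# (path, is_executable) entry; no extra fetch is queued for it.
request.output_paths.append(('out/bin/tool', True))
```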
@@ -339,24 +340,41 @@ class Downloader:
         elif self.__file_request_count >= MAX_REQUEST_COUNT:
             self.flush()
 
-        self.__file_requests[digest.hash] = (digest, file_path, is_executable)
-        self.__file_request_count += 1
-        self.__file_request_size += digest.ByteSize()
-        self.__file_response_size += digest.size_bytes
+        output_path = (file_path, is_executable)
+
+        # When queueing a file we take into account the cases where
+        # we might want to download the same digest to different paths.
+        if digest.hash not in self.__file_requests:
+            request = _FileRequest(digest=digest, output_paths=[output_path])
+            self.__file_requests[digest.hash] = request
 
-    def _fetch_file_batch(self, batch):
-        """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
-        batch_digests = [digest for digest, _, _ in batch.values()]
+            self.__file_request_count += 1
+            self.__file_request_size += digest.ByteSize()
+            self.__file_response_size += digest.size_bytes
+        else:
+            # We already have that hash queued; we'll fetch the blob
+            # once and write copies of it:
+            self.__file_requests[digest.hash].output_paths.append(output_path)
+
+    def _fetch_file_batch(self, requests):
+        """Sends queued data using ContentAddressableStorage.BatchReadBlobs().
+
+        Takes a dictionary (digest.hash, _FileRequest) as input.
+        """
+        batch_digests = [request.digest for request in requests.values()]
         batch_blobs = self._fetch_blob_batch(batch_digests)
 
-        for (_, file_path, is_executable), file_blob in zip(batch.values(), batch_blobs):
-            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+        for file_digest, file_blob in zip(batch_digests, batch_blobs):
+            output_paths = requests[file_digest.hash].output_paths
+
+            for file_path, is_executable in output_paths:
+                os.makedirs(os.path.dirname(file_path), exist_ok=True)
 
-            with open(file_path, 'wb') as byte_file:
-                byte_file.write(file_blob)
+                with open(file_path, 'wb') as byte_file:
+                    byte_file.write(file_blob)
 
-            if is_executable:
-                os.chmod(file_path, 0o755)  # rwxr-xr-x / 755
+                if is_executable:
+                    os.chmod(file_path, 0o755)  # rwxr-xr-x / 755
 
     def _fetch_directory(self, digest, directory_path):
         """Fetches a file using ByteStream.GetTree()"""
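Putting the two halves of the hunk together: queueing deduplicates by `digest.hash`, and `_fetch_file_batch()` fetches each unique blob once before fanning it out to every recorded path. A self-contained sketch of that fetch-once, write-many flow, with a dummy `fetch_blob_batch()` standing in for the real `ContentAddressableStorage.BatchReadBlobs()` round-trip:

```python
import os
from collections import namedtuple

_FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
Digest = namedtuple('Digest', ['hash', 'size_bytes'])  # hypothetical stand-in


def fetch_blob_batch(digests):
    # Dummy CAS round-trip: returns one blob per requested digest, in order.
    return [b'contents-of-' + digest.hash.encode() for digest in digests]


def fetch_file_batch(requests):
    # One dict entry per unique digest means one fetch per unique blob:
    batch_digests = [request.digest for request in requests.values()]
    batch_blobs = fetch_blob_batch(batch_digests)

    # Fan each fetched blob out to every path that asked for it:
    for file_digest, file_blob in zip(batch_digests, batch_blobs):
        for file_path, is_executable in requests[file_digest.hash].output_paths:
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, 'wb') as byte_file:
                byte_file.write(file_blob)
            if is_executable:
                os.chmod(file_path, 0o755)  # rwxr-xr-x


# Two paths wanting the same digest share a single queued request:
digest = Digest(hash='deadbeef', size_bytes=21)
requests = {digest.hash: _FileRequest(digest=digest,
                                      output_paths=[('out/a.txt', False),
                                                    ('out/b.txt', False)])}
fetch_file_batch(requests)  # fetched once, written twice
```

Like the patch itself, the sketch assumes blobs come back in request order; `zip(batch_digests, batch_blobs)` relies on the surrounding `_fetch_blob_batch()` helper preserving that ordering.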