...

@@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from buildgrid._protos.google.rpc import code_pb2
-from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
+from buildgrid.client.capabilities import CapabilitiesInterface
+from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
 from buildgrid.utils import create_digest, merkle_tree_maker
 
-# Maximum size for a queueable file:
-FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
 
 _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
 
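
For reference, the removed FILE_SIZE_THRESHOLD capped queueable blobs at a fixed 1 MiB regardless of what the remote accepts; the new BATCH_REQUEST_SIZE_THRESHOLD is instead a fraction of the effective batch limit. A plausible shape for these settings (the names come from the diff, but the values below are illustrative assumptions, not the actual contents of buildgrid/settings.py)::

    # Hypothetical values for illustration only; see buildgrid/settings.py
    # for the real definitions.
    import hashlib

    HASH = hashlib.sha256                 # digest function for blob names
    MAX_REQUEST_SIZE = 2 * 1024 * 1024    # gRPC maximum message size, in bytes
    MAX_REQUEST_COUNT = 500               # maximum number of blobs per batch
    BATCH_REQUEST_SIZE_THRESHOLD = 0.8    # fraction of the effective batch
                                          # limit below which blobs are queued
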
...
@@ -50,6 +49,58 @@ class _CallCache:
         return name in cls.__calls[channel]
 
 
+class _CasMaxBatchSizeCache:
+    """Cache that stores, for each remote, the limit of bytes that can
+    be transferred using batches.
+    """
+    __cas_max_batch_transfer_size = {}
+
+    @classmethod
+    def max_batch_total_size_bytes(cls, channel, instance_name):
+        """Returns the maximum number of bytes that can be transferred
+        using batch methods for the given remote.
+        """
+        if channel not in cls.__cas_max_batch_transfer_size:
+            max_batch_size = cls._get_server_max_batch_total_size_bytes(channel,
+                                                                        instance_name)
+            cls.__cas_max_batch_transfer_size[channel] = max_batch_size
+
+        return cls.__cas_max_batch_transfer_size[channel]
+
+    @classmethod
+    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
+        """Attempts to get the maximum size for batch transfers from
+        the server. If the server does not implement `GetCapabilities`,
+        defaults to the configured maximum request size.
+        """
+        try:
+            capabilities_interface = CapabilitiesInterface(channel)
+            server_capabilities = capabilities_interface.get_capabilities(instance_name)
+
+            cache_capabilities = server_capabilities.cache_capabilities
+
+            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
+            # The server could set this value to 0 (no limit set).
+            if max_batch_total_size:
+                return max_batch_total_size
+        except Exception:
+            pass
+
+        return MAX_REQUEST_SIZE
+
+    @classmethod
+    def max_effective_batch_size_bytes(cls, channel, instance_name):
+        """Returns the maximum number of bytes that can be effectively
+        transferred using batches, considering the limits imposed by
+        the server's configuration and by gRPC.
+        """
+        server_limit = _CasMaxBatchSizeCache.max_batch_total_size_bytes(channel,
+                                                                        instance_name)
+        grpc_limit = MAX_REQUEST_SIZE
+
+        return min(server_limit, grpc_limit)
+
+
 @contextmanager
 def download(channel, instance=None, u_uid=None):
     """Context manager generator for the :class:`Downloader` class."""
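
The new cache keys on the gRPC channel object, so repeated transfers over one connection pay the GetCapabilities round trip at most once, and a server that does not implement the Capabilities service silently falls back to the gRPC limit. A minimal sketch of how the two lookups relate (endpoint and instance name are hypothetical)::

    import grpc

    channel = grpc.insecure_channel('localhost:50051')

    # The first call queries the server's CacheCapabilities; subsequent
    # calls for the same channel are served from the cache.
    server_limit = _CasMaxBatchSizeCache.max_batch_total_size_bytes(channel, 'main')

    # The effective limit additionally caps the result at what gRPC will
    # accept in a single message.
    effective = _CasMaxBatchSizeCache.max_effective_batch_size_bytes(channel, 'main')
    assert effective == min(server_limit, MAX_REQUEST_SIZE)
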
...
@@ -189,7 +240,10 @@ class Downloader:
         if not os.path.isabs(file_path):
             file_path = os.path.abspath(file_path)
 
-        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
+        assert (0 < BATCH_REQUEST_SIZE_THRESHOLD <= 1.0)
+        queueable_limit = BATCH_REQUEST_SIZE_THRESHOLD * self._max_effective_batch_size_bytes()
+
+        if not queue or digest.size_bytes > queueable_limit:
             self._fetch_file(digest, file_path, is_executable=is_executable)
         else:
             self._queue_file(digest, file_path, is_executable=is_executable)
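
Since BATCH_REQUEST_SIZE_THRESHOLD is a ratio, the cutover point between batching and streaming now scales with the remote instead of sitting at a fixed 1 MiB. A worked example under the assumed values above (a 2 MiB effective limit and a 0.8 threshold)::

    max_effective = 2 * 1024 * 1024          # 2 MiB, from the server / gRPC
    queueable_limit = 0.8 * max_effective    # 1,677,721.6 bytes

    # A 1 MiB file is queued for batching; a 1.9 MiB file is fetched
    # individually over ByteStream instead.
    assert 1.0 * 1024 * 1024 <= queueable_limit
    assert 1.9 * 1024 * 1024 > queueable_limit
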
...
@@ -334,9 +388,11 @@ class Downloader:
 
     def _queue_file(self, digest, file_path, is_executable=False):
         """Queues a file for later batch download"""
-        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
+        batch_size_limit = self._max_effective_batch_size_bytes()
+
+        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
             self.flush()
-        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
+        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
             self.flush()
         elif self.__file_request_count >= MAX_REQUEST_COUNT:
             self.flush()
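
_queue_file tracks two running totals: the size of the BatchReadBlobs request being built (the digests) and the size of the response expected back (the blob contents), and it flushes before either would overflow one batch. A toy restatement of the three flush conditions, with assumed limits::

    # Toy restatement of _queue_file's flush conditions (limits assumed).
    batch_size_limit = 2 * 1024 * 1024
    max_count = 500

    def needs_flush(request_size, response_size, request_count,
                    digest_wire_size, blob_size):
        """True if the pending batch must be flushed before queueing."""
        return (request_size + digest_wire_size > batch_size_limit   # request too big
                or response_size + blob_size > batch_size_limit      # reply too big
                or request_count >= max_count)                       # too many blobs
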
...
@@ -498,6 +554,20 @@ class Downloader:
 
             os.symlink(symlink_path, target_path)
 
+    def _max_batch_size_bytes(self):
+        """Returns the maximum number of bytes that the server allows
+        transferring using batches.
+        """
+        return _CasMaxBatchSizeCache.max_batch_total_size_bytes(self.channel,
+                                                                self.instance_name)
+
+    def _max_effective_batch_size_bytes(self):
+        """Returns the effective maximum number of bytes that can be
+        transferred using batches, considering gRPC maximum message size.
+        """
+        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
+                                                                    self.instance_name)
+
 
 @contextmanager
 def upload(channel, instance=None, u_uid=None):
...
@@ -563,7 +633,11 @@ class Uploader:
         Returns:
             :obj:`Digest`: the sent blob's digest.
         """
-        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
+
+        assert (0 < BATCH_REQUEST_SIZE_THRESHOLD <= 1.0)
+        queueable_limit = BATCH_REQUEST_SIZE_THRESHOLD * self._max_effective_batch_size_bytes()
+
+        if not queue or len(blob) > queueable_limit:
             blob_digest = self._send_blob(blob, digest=digest)
         else:
             blob_digest = self._queue_blob(blob, digest=digest)
...
@@ -589,7 +663,10 @@ class Uploader:
         """
         message_blob = message.SerializeToString()
 
-        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
+        assert (0 < BATCH_REQUEST_SIZE_THRESHOLD <= 1.0)
+        queueable_limit = BATCH_REQUEST_SIZE_THRESHOLD * self._max_effective_batch_size_bytes()
+
+        if not queue or len(message_blob) > queueable_limit:
             message_digest = self._send_blob(message_blob, digest=digest)
         else:
             message_digest = self._queue_blob(message_blob, digest=digest)
...
@@ -622,7 +699,7 @@ class Uploader:
         with open(file_path, 'rb') as bytes_stream:
             file_bytes = bytes_stream.read()
 
-        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
+        if not queue or len(file_bytes) > self._max_effective_batch_size_bytes():
             file_digest = self._send_blob(file_bytes)
         else:
             file_digest = self._queue_blob(file_bytes)
...
@@ -795,7 +872,12 @@ class Uploader:
         blob_digest.hash = HASH(blob).hexdigest()
         blob_digest.size_bytes = len(blob)
 
-        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
+        # If we are here queueing a file, we know that its size is
+        # smaller than gRPC's message size limit.
+        # We'll make a single batch request as big as the server allows.
+        batch_size_limit = self._max_batch_size_bytes()
+
+        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
             self.flush()
         elif self.__request_count >= MAX_REQUEST_COUNT:
             self.flush()
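
Note that the two limits play different roles on the upload path: a blob only reaches _queue_blob after being screened against the gRPC-capped effective limit, so every individual blob fits in one message, while the accumulated batch is allowed to grow to the server's advertised total, per the comment in the hunk above. An illustration with assumed numbers::

    grpc_cap = 2 * 1024 * 1024            # MAX_REQUEST_SIZE (assumed value)
    server_batch_limit = 8 * 1024 * 1024  # server's advertised batch total

    blob_sizes = [1_500_000] * 5          # five blobs, each under the gRPC cap

    # Each blob fits in a message on its own...
    assert all(size <= grpc_cap for size in blob_sizes)
    # ...and all five still fit in one batch sized to the server's limit.
    assert sum(blob_sizes) <= server_batch_limit
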
...
@@ -851,3 +933,17 @@ class Uploader:
             written_digests.append(self._send_blob(blob, digest=digest))
 
         return written_digests
+
+    def _max_batch_size_bytes(self):
+        """Returns the maximum number of bytes that the server allows
+        transferring using batches.
+        """
+        return _CasMaxBatchSizeCache.max_batch_total_size_bytes(self.channel,
+                                                                self.instance_name)
+
+    def _max_effective_batch_size_bytes(self):
+        """Returns the effective maximum number of bytes that can be
+        transferred using batches, considering gRPC maximum message size.
+        """
+        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
+                                                                    self.instance_name)
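
For callers nothing changes; the sizing decisions stay internal to the Uploader and Downloader. A typical upload sketch under these changes (the endpoint and instance name are hypothetical, and the put_blob method name with its queue flag is assumed from the Uploader API shown in the hunks above)::

    import grpc

    channel = grpc.insecure_channel('controller.example.com:50051')

    with upload(channel, instance='main') as uploader:
        # Blobs under the queueable limit are grouped into BatchUpdateBlobs
        # requests sized to the server's advertised limit; anything larger
        # is streamed over ByteStream.
        digest = uploader.put_blob(b'some blob', queue=True)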