... |
... |
@@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError |
24
|
24
|
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
25
|
25
|
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
26
|
26
|
from buildgrid._protos.google.rpc import code_pb2
|
27
|
|
-from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
27
|
+from buildgrid.client.capabilities import CapabilitiesInterface
|
|
28
|
+from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
|
28
|
29
|
from buildgrid.utils import create_digest, merkle_tree_maker
|
29
|
30
|
|
30
|
|
-# Maximum size for a queueable file:
|
31
|
|
-FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
|
32
|
31
|
|
33
|
32
|
_FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
|
34
|
33
|
|
... |
... |
@@ -50,6 +49,67 @@ class _CallCache: |
50
|
49
|
return name in cls.__calls[channel]
|
51
|
50
|
|
52
|
51
|
|
|
52
|
class _CasBatchRequestSizesCache:
    """Cache that stores, for each remote, the limit of bytes that can
    be transferred using batches as well as a threshold that determines
    when a file can be fetched as part of a batch request.

    Entries are keyed first by gRPC channel, then by instance name, so
    each remote/instance pair is looked up against the server only once.
    """
    # channel -> instance_name -> effective max batch size (bytes)
    __cas_max_batch_transfer_size = {}
    # channel -> instance_name -> per-blob batching threshold (bytes)
    __cas_batch_request_size_threshold = {}

    @classmethod
    def max_effective_batch_size_bytes(cls, channel, instance_name):
        """Returns the maximum number of bytes that can be transferred
        using batch methods for the given remote.
        """
        instances = cls.__cas_max_batch_transfer_size.setdefault(channel, {})
        if instance_name not in instances:
            instances[instance_name] = cls._get_server_max_batch_total_size_bytes(
                channel, instance_name)

        return instances[instance_name]

    @classmethod
    def batch_request_size_threshold(cls, channel, instance_name):
        """Returns the size, in bytes, up to which a blob is considered
        small enough to be grouped into a batch request for the given
        remote (a fixed fraction of the effective maximum batch size).
        """
        instances = cls.__cas_batch_request_size_threshold.setdefault(channel, {})
        if instance_name not in instances:
            # Threshold is a fraction of the largest batch the remote accepts:
            max_batch_size = cls.max_effective_batch_size_bytes(channel,
                                                                instance_name)
            instances[instance_name] = BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size

        return instances[instance_name]

    @classmethod
    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
        """Returns the maximum number of bytes that can be effectively
        transferred using batches, considering the limits imposed by
        the server's configuration and by gRPC.
        """
        try:
            capabilities_interface = CapabilitiesInterface(channel)
            server_capabilities = capabilities_interface.get_capabilities(instance_name)

            cache_capabilities = server_capabilities.cache_capabilities

            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
            # The server could set this value to 0 (no limit set).
            if max_batch_total_size:
                return min(max_batch_total_size, MAX_REQUEST_SIZE)
        except Exception:
            # Deliberate best-effort: if the capabilities request fails for
            # any reason (remote doesn't implement it, transient error, …),
            # fall back to the gRPC message-size limit below.
            pass

        return MAX_REQUEST_SIZE
|
|
111
|
+
|
|
112
|
+
|
53
|
113
|
@contextmanager
|
54
|
114
|
def download(channel, instance=None, u_uid=None):
|
55
|
115
|
"""Context manager generator for the :class:`Downloader` class."""
|
... |
... |
@@ -189,7 +249,7 @@ class Downloader: |
189
|
249
|
if not os.path.isabs(file_path):
|
190
|
250
|
file_path = os.path.abspath(file_path)
|
191
|
251
|
|
192
|
|
- if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
|
|
252
|
+ if not queue or digest.size_bytes > self._queueable_file_size_threshold():
|
193
|
253
|
self._fetch_file(digest, file_path, is_executable=is_executable)
|
194
|
254
|
else:
|
195
|
255
|
self._queue_file(digest, file_path, is_executable=is_executable)
|
... |
... |
@@ -334,9 +394,11 @@ class Downloader: |
334
|
394
|
|
335
|
395
|
def _queue_file(self, digest, file_path, is_executable=False):
|
336
|
396
|
"""Queues a file for later batch download"""
|
337
|
|
- if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
|
|
397
|
+ batch_size_limit = self._max_effective_batch_size_bytes()
|
|
398
|
+
|
|
399
|
+ if self.__file_request_size + digest.ByteSize() > batch_size_limit:
|
338
|
400
|
self.flush()
|
339
|
|
- elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
|
|
401
|
+ elif self.__file_response_size + digest.size_bytes > batch_size_limit:
|
340
|
402
|
self.flush()
|
341
|
403
|
elif self.__file_request_count >= MAX_REQUEST_COUNT:
|
342
|
404
|
self.flush()
|
... |
... |
@@ -498,6 +560,20 @@ class Downloader: |
498
|
560
|
|
499
|
561
|
os.symlink(symlink_path, target_path)
|
500
|
562
|
|
|
563
|
+ def _max_effective_batch_size_bytes(self):
|
|
564
|
+ """Returns the effective maximum number of bytes that can be
|
|
565
|
+ transferred using batches, considering gRPC maximum message size.
|
|
566
|
+ """
|
|
567
|
+ return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
|
|
568
|
+ self.instance_name)
|
|
569
|
+
|
|
570
|
+ def _queueable_file_size_threshold(self):
|
|
571
|
+ """Returns the size limit up until which files can be queued to
|
|
572
|
+ be requested in a batch.
|
|
573
|
+ """
|
|
574
|
+ return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
|
|
575
|
+ self.instance_name)
|
|
576
|
+
|
501
|
577
|
|
502
|
578
|
@contextmanager
|
503
|
579
|
def upload(channel, instance=None, u_uid=None):
|
... |
... |
@@ -563,7 +639,8 @@ class Uploader: |
563
|
639
|
Returns:
|
564
|
640
|
:obj:`Digest`: the sent blob's digest.
|
565
|
641
|
"""
|
566
|
|
- if not queue or len(blob) > FILE_SIZE_THRESHOLD:
|
|
642
|
+
|
|
643
|
+ if not queue or len(blob) > self._queueable_file_size_threshold():
|
567
|
644
|
blob_digest = self._send_blob(blob, digest=digest)
|
568
|
645
|
else:
|
569
|
646
|
blob_digest = self._queue_blob(blob, digest=digest)
|
... |
... |
@@ -589,7 +666,7 @@ class Uploader: |
589
|
666
|
"""
|
590
|
667
|
message_blob = message.SerializeToString()
|
591
|
668
|
|
592
|
|
- if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
|
|
669
|
+ if not queue or len(message_blob) > self._queueable_file_size_threshold():
|
593
|
670
|
message_digest = self._send_blob(message_blob, digest=digest)
|
594
|
671
|
else:
|
595
|
672
|
message_digest = self._queue_blob(message_blob, digest=digest)
|
... |
... |
@@ -622,7 +699,7 @@ class Uploader: |
622
|
699
|
with open(file_path, 'rb') as bytes_steam:
|
623
|
700
|
file_bytes = bytes_steam.read()
|
624
|
701
|
|
625
|
|
- if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
|
|
702
|
+ if not queue or len(file_bytes) > self._queueable_file_size_threshold():
|
626
|
703
|
file_digest = self._send_blob(file_bytes)
|
627
|
704
|
else:
|
628
|
705
|
file_digest = self._queue_blob(file_bytes)
|
... |
... |
@@ -795,7 +872,12 @@ class Uploader: |
795
|
872
|
blob_digest.hash = HASH(blob).hexdigest()
|
796
|
873
|
blob_digest.size_bytes = len(blob)
|
797
|
874
|
|
798
|
|
- if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
|
|
875
|
+ # If we are here queueing a file we know that its size is
|
|
876
|
+ # smaller than gRPC's message size limit.
|
|
877
|
+ # We'll make a single batch request as big as the server allows.
|
|
878
|
+ batch_size_limit = self._max_effective_batch_size_bytes()
|
|
879
|
+
|
|
880
|
+ if self.__request_size + blob_digest.size_bytes > batch_size_limit:
|
799
|
881
|
self.flush()
|
800
|
882
|
elif self.__request_count >= MAX_REQUEST_COUNT:
|
801
|
883
|
self.flush()
|
... |
... |
@@ -851,3 +933,17 @@ class Uploader: |
851
|
933
|
written_digests.append(self._send_blob(blob, digest=digest))
|
852
|
934
|
|
853
|
935
|
return written_digests
|
|
936
|
+
|
|
937
|
+ def _max_effective_batch_size_bytes(self):
|
|
938
|
+ """Returns the effective maximum number of bytes that can be
|
|
939
|
+ transferred using batches, considering gRPC maximum message size.
|
|
940
|
+ """
|
|
941
|
+ return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
|
|
942
|
+ self.instance_name)
|
|
943
|
+
|
|
944
|
+ def _queueable_file_size_threshold(self):
|
|
945
|
+ """Returns the size limit up until which files can be queued to
|
|
946
|
+ be requested in a batch.
|
|
947
|
+ """
|
|
948
|
+ return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
|
|
949
|
+ self.instance_name)
|