@@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from buildgrid._protos.google.rpc import code_pb2
-from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
+from buildgrid.client.capabilities import CapabilitiesInterface
+from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
 from buildgrid.utils import create_digest, merkle_tree_maker
 
-# Maximum size for a queueable file:
-FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
 
 _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
 
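The fixed 1 MiB FILE_SIZE_THRESHOLD removed above gives way to a per-remote threshold derived from the new BATCH_REQUEST_SIZE_THRESHOLD setting. A minimal sketch of the new sizing rule, assuming the setting is a ratio in (0, 1] (the 0.8 below is a placeholder, not the actual value from buildgrid.settings):

    # Placeholder ratio; the real constant lives in buildgrid.settings.
    BATCH_REQUEST_SIZE_THRESHOLD = 0.8

    def queueable(blob_size, max_effective_batch_size):
        # Before this change: blob_size <= 1 * 1024 * 1024 (fixed cap).
        # After: the cap scales with the remote's effective batch limit.
        return blob_size <= BATCH_REQUEST_SIZE_THRESHOLD * max_effective_batch_size
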
@@ -50,6 +49,80 @@ class _CallCache:
         return name in cls.__calls[channel]
 
 
+class _CasBatchRequestSizesCache:
+    """Cache that stores, for each remote, the limit of bytes that can
+    be transferred using batches as well as a threshold that determines
+    when a file can be fetched as part of a batch request.
+    """
+    __cas_max_batch_transfer_size = {}
+    __cas_batch_request_size_threshold = {}
+
+    @classmethod
+    def max_effective_batch_size_bytes(cls, channel, instance_name):
+        """Returns the maximum number of bytes that can be transferred
+        using batch methods for the given remote.
+        """
+        if channel not in cls.__cas_max_batch_transfer_size:
+            cls.__cas_max_batch_transfer_size[channel] = dict()
+
+        if instance_name not in cls.__cas_max_batch_transfer_size[channel]:
+            # Fetching the value from the server:
+            max_batch_size = cls._max_effective_batch_size_bytes(channel,
+                                                                 instance_name)
+
+            cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size
+
+        return cls.__cas_max_batch_transfer_size[channel][instance_name]
+
+    @classmethod
+    def batch_request_size_threshold(cls, channel, instance_name):
+        if channel not in cls.__cas_batch_request_size_threshold:
+            cls.__cas_batch_request_size_threshold[channel] = dict()
+
+        if instance_name not in cls.__cas_batch_request_size_threshold[channel]:
+            # Computing the threshold:
+            max_batch_size = cls.max_effective_batch_size_bytes(channel,
+                                                                instance_name)
+            threshold = BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size
+
+            cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold
+
+        return cls.__cas_batch_request_size_threshold[channel][instance_name]
+
+    @classmethod
+    def _max_effective_batch_size_bytes(cls, channel, instance_name):
+        """Returns the maximum number of bytes that can be effectively
+        transferred using batches, considering the limits imposed by
+        the server's configuration and by gRPC.
+        """
+        server_limit = cls._get_server_max_batch_total_size_bytes(channel,
+                                                                  instance_name)
+        grpc_limit = MAX_REQUEST_SIZE
+
+        return min(server_limit, grpc_limit)
+
+    @classmethod
+    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
+        """Attempts to get the maximum size for batch transfers from
+        the server. If the server does not implement `GetCapabilities`,
+        defaults to the configured maximum request size.
+        """
+        try:
+            capabilities_interface = CapabilitiesInterface(channel)
+            server_capabilities = capabilities_interface.get_capabilities(instance_name)
+
+            cache_capabilities = server_capabilities.cache_capabilities
+
+            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
+            # The server could set this value to 0 (no limit set).
+            if max_batch_total_size:
+                return max_batch_total_size
+        except Exception:
+            pass
+
+        return MAX_REQUEST_SIZE
+
+
 @contextmanager
 def download(channel, instance=None, u_uid=None):
     """Context manager generator for the :class:`Downloader` class."""
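Taken together, the cache above resolves the effective limit as min(server's max_batch_total_size_bytes, MAX_REQUEST_SIZE) and memoizes both values per (channel, instance_name) pair. A usage sketch, assuming `channel` is an open gRPC channel and `instance` an existing instance name:

    limit = _CasBatchRequestSizesCache.max_effective_batch_size_bytes(channel, instance)
    threshold = _CasBatchRequestSizesCache.batch_request_size_threshold(channel, instance)
    # The threshold is a fixed fraction of the limit, so it never exceeds it.
    assert threshold <= limit
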
@@ -189,7 +262,7 @@ class Downloader:
         if not os.path.isabs(file_path):
             file_path = os.path.abspath(file_path)
 
-        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
+        if not queue or digest.size_bytes > self._queueable_file_size_threshold():
             self._fetch_file(digest, file_path, is_executable=is_executable)
         else:
             self._queue_file(digest, file_path, is_executable=is_executable)
@@ -334,9 +407,11 @@ class Downloader:
 
     def _queue_file(self, digest, file_path, is_executable=False):
         """Queues a file for later batch download"""
-        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
+        batch_size_limit = self._max_effective_batch_size_bytes()
+
+        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
             self.flush()
-        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
+        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
             self.flush()
         elif self.__file_request_count >= MAX_REQUEST_COUNT:
             self.flush()
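_queue_file now checks both the accumulated request size and the expected response size against the same negotiated limit before appending. A standalone toy illustrating the flush-before-overflow pattern it follows (not the Downloader API itself):

    def add_to_batch(batch, batch_bytes, item_bytes, limit, flush):
        # Flush first if appending this item would push the batch past the limit,
        # then start a fresh batch containing the item.
        if batch_bytes + item_bytes > limit:
            flush(batch)
            batch, batch_bytes = [], 0
        batch.append(item_bytes)
        return batch, batch_bytes + item_bytes
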
@@ -498,6 +573,20 @@ class Downloader:
 
             os.symlink(symlink_path, target_path)
 
+    def _max_effective_batch_size_bytes(self):
+        """Returns the effective maximum number of bytes that can be
+        transferred using batches, considering gRPC maximum message size.
+        """
+        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
+                                                                         self.instance_name)
+
+    def _queueable_file_size_threshold(self):
+        """Returns the size limit up until which files can be queued to
+        be requested in a batch.
+        """
+        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
+                                                                       self.instance_name)
+
 
 @contextmanager
 def upload(channel, instance=None, u_uid=None):
@@ -563,7 +652,8 @@ class Uploader:
         Returns:
             :obj:`Digest`: the sent blob's digest.
         """
-        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
+
+        if not queue or len(blob) > self._queueable_file_size_threshold():
             blob_digest = self._send_blob(blob, digest=digest)
         else:
             blob_digest = self._queue_blob(blob, digest=digest)
@@ -589,7 +679,7 @@ class Uploader:
         """
         message_blob = message.SerializeToString()
 
-        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
+        if not queue or len(message_blob) > self._queueable_file_size_threshold():
             message_digest = self._send_blob(message_blob, digest=digest)
         else:
             message_digest = self._queue_blob(message_blob, digest=digest)
@@ -622,7 +712,7 @@ class Uploader:
         with open(file_path, 'rb') as bytes_steam:
             file_bytes = bytes_steam.read()
 
-        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
+        if not queue or len(file_bytes) > self._queueable_file_size_threshold():
             file_digest = self._send_blob(file_bytes)
         else:
             file_digest = self._queue_blob(file_bytes)
@@ -795,7 +885,12 @@ class Uploader:
         blob_digest.hash = HASH(blob).hexdigest()
         blob_digest.size_bytes = len(blob)
 
-        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
+        # If we are queueing a file here, we know that its size is
+        # smaller than gRPC's message size limit.
+        # We'll make a single batch request as big as the server allows.
+        batch_size_limit = self._max_effective_batch_size_bytes()
+
+        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
             self.flush()
         elif self.__request_count >= MAX_REQUEST_COUNT:
             self.flush()
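A worked example of the flush decision above, assuming an effective batch limit of 4 MiB (the real figure comes from the capabilities lookup):

    batch_size_limit = 4 * 1024 * 1024  # assumed; normally negotiated per remote
    request_size = 3 * 1024 * 1024      # bytes already queued in this batch
    incoming_blob = 2 * 1024 * 1024     # size of the next blob to queue
    # 3 MiB + 2 MiB > 4 MiB, so the queued bytes are flushed before the
    # new blob starts a fresh batch.
    assert request_size + incoming_blob > batch_size_limit
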
@@ -851,3 +946,17 @@ class Uploader:
             written_digests.append(self._send_blob(blob, digest=digest))
 
         return written_digests
+
+    def _max_effective_batch_size_bytes(self):
+        """Returns the effective maximum number of bytes that can be
+        transferred using batches, considering gRPC maximum message size.
+        """
+        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
+                                                                         self.instance_name)
+
+    def _queueable_file_size_threshold(self):
+        """Returns the size limit up until which files can be queued to
+        be requested in a batch.
+        """
+        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
+                                                                       self.instance_name)