| ... | ... | @@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError | 
| 24 | 24 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 25 | 25 |  from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 | 
| 26 | 26 |  from buildgrid._protos.google.rpc import code_pb2
 | 
| 27 |  | -from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
 | 
|  | 27 | +from buildgrid.client.capabilities import CapabilitiesInterface
 | 
|  | 28 | +from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
 | 
| 28 | 29 |  from buildgrid.utils import create_digest, merkle_tree_maker
 | 
| 29 | 30 |  
 | 
| 30 |  | -# Maximum size for a queueable file:
 | 
| 31 |  | -FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
 | 
| 32 | 31 |  
 | 
| 33 | 32 |  _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
 | 
| 34 | 33 |  
 | 
| ... | ... | @@ -50,6 +49,67 @@ class _CallCache: | 
| 50 | 49 |          return name in cls.__calls[channel]
 | 
| 51 | 50 |  
 | 
| 52 | 51 |  
 | 
|  | 52 | +class _CasBatchRequestSizesCache:
 | 
|  | 53 | +    """Cache that stores, for each remote, the limit of bytes that can
 | 
|  | 54 | +    be transferred using batches as well as a threshold that determines
 | 
|  | 55 | +    when a file can be fetched as part of a batch request.
 | 
|  | 56 | +    """
 | 
|  | 57 | +    __cas_max_batch_transfer_size = {}
 | 
|  | 58 | +    __cas_batch_request_size_threshold = {}
 | 
|  | 59 | +
 | 
|  | 60 | +    @classmethod
 | 
|  | 61 | +    def max_effective_batch_size_bytes(cls, channel, instance_name):
 | 
|  | 62 | +        """Returns the maximum number of bytes that can be transferred
 | 
|  | 63 | +        using batch methods for the given remote.
 | 
|  | 64 | +        """
 | 
|  | 65 | +        if channel not in cls.__cas_max_batch_transfer_size:
 | 
|  | 66 | +            cls.__cas_max_batch_transfer_size[channel] = dict()
 | 
|  | 67 | +
 | 
|  | 68 | +        if instance_name not in cls.__cas_max_batch_transfer_size[channel]:
 | 
|  | 69 | +            max_batch_size = cls._get_server_max_batch_total_size_bytes(channel,
 | 
|  | 70 | +                                                                        instance_name)
 | 
|  | 71 | +
 | 
|  | 72 | +            cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size
 | 
|  | 73 | +
 | 
|  | 74 | +        return cls.__cas_max_batch_transfer_size[channel][instance_name]
 | 
|  | 75 | +
 | 
|  | 76 | +    @classmethod
 | 
|  | 77 | +    def batch_request_size_threshold(cls, channel, instance_name):
 | 
|  | 78 | +        if channel not in cls.__cas_batch_request_size_threshold:
 | 
|  | 79 | +            cls.__cas_batch_request_size_threshold[channel] = dict()
 | 
|  | 80 | +
 | 
|  | 81 | +        if instance_name not in cls.__cas_batch_request_size_threshold[channel]:
 | 
|  | 82 | +            # Computing the threshold:
 | 
|  | 83 | +            max_batch_size = cls.max_effective_batch_size_bytes(channel,
 | 
|  | 84 | +                                                                instance_name)
 | 
|  | 85 | +            threshold = BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size
 | 
|  | 86 | +
 | 
|  | 87 | +            cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold
 | 
|  | 88 | +
 | 
|  | 89 | +        return cls.__cas_batch_request_size_threshold[channel][instance_name]
 | 
|  | 90 | +
 | 
|  | 91 | +    @classmethod
 | 
|  | 92 | +    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
 | 
|  | 93 | +        """Returns the maximum number of bytes that can be effectively
 | 
|  | 94 | +        transferred using batches, considering the limits imposed by
 | 
|  | 95 | +        the server's configuration and by gRPC.
 | 
|  | 96 | +        """
 | 
|  | 97 | +        try:
 | 
|  | 98 | +            capabilities_interface = CapabilitiesInterface(channel)
 | 
|  | 99 | +            server_capabilities = capabilities_interface.get_capabilities(instance_name)
 | 
|  | 100 | +
 | 
|  | 101 | +            cache_capabilities = server_capabilities.cache_capabilities
 | 
|  | 102 | +
 | 
|  | 103 | +            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
 | 
|  | 104 | +            # The server could set this value to 0 (no limit set).
 | 
|  | 105 | +            if max_batch_total_size:
 | 
|  | 106 | +                return min(max_batch_total_size, MAX_REQUEST_SIZE)
 | 
|  | 107 | +        except Exception:
 | 
|  | 108 | +            pass
 | 
|  | 109 | +
 | 
|  | 110 | +        return MAX_REQUEST_SIZE
 | 
|  | 111 | +
 | 
|  | 112 | +
 | 
| 53 | 113 |  @contextmanager
 | 
| 54 | 114 |  def download(channel, instance=None, u_uid=None):
 | 
| 55 | 115 |      """Context manager generator for the :class:`Downloader` class."""
 | 
| ... | ... | @@ -189,7 +249,7 @@ class Downloader: | 
| 189 | 249 |          if not os.path.isabs(file_path):
 | 
| 190 | 250 |              file_path = os.path.abspath(file_path)
 | 
| 191 | 251 |  
 | 
| 192 |  | -        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
 | 
|  | 252 | +        if not queue or digest.size_bytes > self._queueable_file_size_threshold():
 | 
| 193 | 253 |              self._fetch_file(digest, file_path, is_executable=is_executable)
 | 
| 194 | 254 |          else:
 | 
| 195 | 255 |              self._queue_file(digest, file_path, is_executable=is_executable)
 | 
| ... | ... | @@ -334,9 +394,11 @@ class Downloader: | 
| 334 | 394 |  
 | 
| 335 | 395 |      def _queue_file(self, digest, file_path, is_executable=False):
 | 
| 336 | 396 |          """Queues a file for later batch download"""
 | 
| 337 |  | -        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
 | 
|  | 397 | +        batch_size_limit = self._max_effective_batch_size_bytes()
 | 
|  | 398 | +
 | 
|  | 399 | +        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
 | 
| 338 | 400 |              self.flush()
 | 
| 339 |  | -        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
 | 
|  | 401 | +        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
 | 
| 340 | 402 |              self.flush()
 | 
| 341 | 403 |          elif self.__file_request_count >= MAX_REQUEST_COUNT:
 | 
| 342 | 404 |              self.flush()
 | 
| ... | ... | @@ -498,6 +560,20 @@ class Downloader: | 
| 498 | 560 |  
 | 
| 499 | 561 |              os.symlink(symlink_path, target_path)
 | 
| 500 | 562 |  
 | 
|  | 563 | +    def _max_effective_batch_size_bytes(self):
 | 
|  | 564 | +        """Returns the effective maximum number of bytes that can be
 | 
|  | 565 | +        transferred using batches, considering gRPC maximum message size.
 | 
|  | 566 | +        """
 | 
|  | 567 | +        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
 | 
|  | 568 | +                                                                         self.instance_name)
 | 
|  | 569 | +
 | 
|  | 570 | +    def _queueable_file_size_threshold(self):
 | 
|  | 571 | +        """Returns the size limit up until which files can be queued to
 | 
|  | 572 | +        be requested in a batch.
 | 
|  | 573 | +        """
 | 
|  | 574 | +        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
 | 
|  | 575 | +                                                                       self.instance_name)
 | 
|  | 576 | +
 | 
| 501 | 577 |  
 | 
| 502 | 578 |  @contextmanager
 | 
| 503 | 579 |  def upload(channel, instance=None, u_uid=None):
 | 
| ... | ... | @@ -563,7 +639,8 @@ class Uploader: | 
| 563 | 639 |          Returns:
 | 
| 564 | 640 |              :obj:`Digest`: the sent blob's digest.
 | 
| 565 | 641 |          """
 | 
| 566 |  | -        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
 | 
|  | 642 | +
 | 
|  | 643 | +        if not queue or len(blob) > self._queueable_file_size_threshold():
 | 
| 567 | 644 |              blob_digest = self._send_blob(blob, digest=digest)
 | 
| 568 | 645 |          else:
 | 
| 569 | 646 |              blob_digest = self._queue_blob(blob, digest=digest)
 | 
| ... | ... | @@ -589,7 +666,7 @@ class Uploader: | 
| 589 | 666 |          """
 | 
| 590 | 667 |          message_blob = message.SerializeToString()
 | 
| 591 | 668 |  
 | 
| 592 |  | -        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
 | 
|  | 669 | +        if not queue or len(message_blob) > self._queueable_file_size_threshold():
 | 
| 593 | 670 |              message_digest = self._send_blob(message_blob, digest=digest)
 | 
| 594 | 671 |          else:
 | 
| 595 | 672 |              message_digest = self._queue_blob(message_blob, digest=digest)
 | 
| ... | ... | @@ -622,7 +699,7 @@ class Uploader: | 
| 622 | 699 |          with open(file_path, 'rb') as bytes_steam:
 | 
| 623 | 700 |              file_bytes = bytes_steam.read()
 | 
| 624 | 701 |  
 | 
| 625 |  | -        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
 | 
|  | 702 | +        if not queue or len(file_bytes) > self._queueable_file_size_threshold():
 | 
| 626 | 703 |              file_digest = self._send_blob(file_bytes)
 | 
| 627 | 704 |          else:
 | 
| 628 | 705 |              file_digest = self._queue_blob(file_bytes)
 | 
| ... | ... | @@ -795,7 +872,12 @@ class Uploader: | 
| 795 | 872 |              blob_digest.hash = HASH(blob).hexdigest()
 | 
| 796 | 873 |              blob_digest.size_bytes = len(blob)
 | 
| 797 | 874 |  
 | 
| 798 |  | -        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
 | 
|  | 875 | +        # If we get here, the queued blob's size is already known to be
 | 
|  | 876 | +        # smaller than gRPC's maximum message size.
 | 
|  | 877 | +        # We'll make a single batch request as big as the server allows.
 | 
|  | 878 | +        batch_size_limit = self._max_effective_batch_size_bytes()
 | 
|  | 879 | +
 | 
|  | 880 | +        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
 | 
| 799 | 881 |              self.flush()
 | 
| 800 | 882 |          elif self.__request_count >= MAX_REQUEST_COUNT:
 | 
| 801 | 883 |              self.flush()
 | 
| ... | ... | @@ -851,3 +933,17 @@ class Uploader: | 
| 851 | 933 |                  written_digests.append(self._send_blob(blob, digest=digest))
 | 
| 852 | 934 |  
 | 
| 853 | 935 |          return written_digests
 | 
|  | 936 | +
 | 
|  | 937 | +    def _max_effective_batch_size_bytes(self):
 | 
|  | 938 | +        """Returns the effective maximum number of bytes that can be
 | 
|  | 939 | +        transferred using batches, considering gRPC maximum message size.
 | 
|  | 940 | +        """
 | 
|  | 941 | +        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
 | 
|  | 942 | +                                                                         self.instance_name)
 | 
|  | 943 | +
 | 
|  | 944 | +    def _queueable_file_size_threshold(self):
 | 
|  | 945 | +        """Returns the size limit up to which blobs can be queued to
 | 
|  | 946 | +        be sent in a batch.
 | 
|  | 947 | +        """
 | 
|  | 948 | +        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
 | 
|  | 949 | +                                                                       self.instance_name) |