[Notes] [Git][BuildGrid/buildgrid][santigl/100-server-capabilities] CAS.{Up, Down}loader(): if provided, respect `CacheCapabilities`



Title: GitLab

Santiago Gil pushed to branch santigl/100-server-capabilities at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -24,11 +24,13 @@ from buildgrid._exceptions import NotFoundError
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    26 26
     from buildgrid._protos.google.rpc import code_pb2
    
    27
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    27 28
     from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    28 29
     from buildgrid.utils import create_digest, merkle_tree_maker
    
    29 30
     
    
    31
    +
    
    30 32
     # Maximum size for a queueable file:
    
    31
    -FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    33
    +DEFAULT_FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 34
     
    
    33 35
     _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
    
    34 36
     
    
    ... ... @@ -50,6 +52,58 @@ class _CallCache:
    50 52
             return name in cls.__calls[channel]
    
    51 53
     
    
    52 54
     
    
    55
    +class _CasMaxBatchSizeCache:
    
    56
    +    """Cache that stores, for each remote, the limit of bytes that can
    
    57
    +    be transferred using batches.
    
    58
    +    """
    
    59
    +    __cas_max_batch_transfer_size = {}
    
    60
    +
    
    61
    +    @classmethod
    
    62
    +    def max_batch_total_size_bytes(cls, channel, instance_name):
    
    63
    +        """Returns the maximum number of bytes that can be transferred
    
    64
    +        using batch methods for the given remote.
    
    65
    +        """
    
    66
    +        if channel not in cls.__cas_max_batch_transfer_size:
    
    67
    +            max_batch_size = cls._get_server_max_batch_total_size_bytes(channel,
    
    68
    +                                                                        instance_name)
    
    69
    +            cls.__cas_max_batch_transfer_size[channel] = max_batch_size
    
    70
    +
    
    71
    +        return cls.__cas_max_batch_transfer_size[channel]
    
    72
    +
    
    73
    +    @classmethod
    
    74
    +    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
    
    75
    +        """Attempts to get the maximum size for batch transfers from
    
    76
    +        the server. If the server does not implement `GetCapabilities`,
    
    77
    +        returns a default value.
    
    78
    +        """
    
    79
    +        try:
    
    80
    +            capabilities_interface = CapabilitiesInterface(channel)
    
    81
    +            server_capabilities = capabilities_interface.get_capabilities(instance_name)
    
    82
    +
    
    83
    +            cache_capabilities = server_capabilities.cache_capabilities
    
    84
    +
    
    85
    +            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
    
    86
    +            # The server could set this value to 0 (no limit set).
    
    87
    +            if max_batch_total_size:
    
    88
    +                return max_batch_total_size
    
    89
    +        except Exception:
    
    90
    +            pass
    
    91
    +
    
    92
    +        return DEFAULT_FILE_SIZE_THRESHOLD
    
    93
    +
    
    94
    +    @classmethod
    
    95
    +    def max_effective_batch_size_bytes(cls, channel, instance_name):
    
    96
    +        """Returns the maximum number of bytes that can be effectively
    
    97
    +        transferred using batches, considering the limits imposed by
    
    98
    +        the server's configuration and by gRPC.
    
    99
    +        """
    
    100
    +        server_limit = _CasMaxBatchSizeCache.max_batch_total_size_bytes(channel,
    
    101
    +                                                                        instance_name)
    
    102
    +        grpc_limit = MAX_REQUEST_SIZE
    
    103
    +
    
    104
    +        return min(server_limit, grpc_limit)
    
    105
    +
    
    106
    +
    
    53 107
     @contextmanager
    
    54 108
     def download(channel, instance=None, u_uid=None):
    
    55 109
         """Context manager generator for the :class:`Downloader` class."""
    
    ... ... @@ -189,7 +243,7 @@ class Downloader:
    189 243
             if not os.path.isabs(file_path):
    
    190 244
                 file_path = os.path.abspath(file_path)
    
    191 245
     
    
    192
    -        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
    
    246
    +        if not queue or digest.size_bytes > self._max_effective_batch_size_bytes():
    
    193 247
                 self._fetch_file(digest, file_path, is_executable=is_executable)
    
    194 248
             else:
    
    195 249
                 self._queue_file(digest, file_path, is_executable=is_executable)
    
    ... ... @@ -334,9 +388,14 @@ class Downloader:
    334 388
     
    
    335 389
         def _queue_file(self, digest, file_path, is_executable=False):
    
    336 390
             """Queues a file for later batch download"""
    
    337
    -        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
    
    391
    +        # If we are here queueing a file we know that its size is
    
    392
    +        # smaller than gRPC's message size limit.
    
    393
    +        # We'll make a single batch request as big as the server allows.
    
    394
    +        batch_size_limit = self._max_batch_size_bytes()
    
    395
    +
    
    396
    +        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
    
    338 397
                 self.flush()
    
    339
    -        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
    
    398
    +        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
    
    340 399
                 self.flush()
    
    341 400
             elif self.__file_request_count >= MAX_REQUEST_COUNT:
    
    342 401
                 self.flush()
    
    ... ... @@ -498,6 +557,20 @@ class Downloader:
    498 557
     
    
    499 558
                 os.symlink(symlink_path, target_path)
    
    500 559
     
    
    560
    +    def _max_batch_size_bytes(self):
    
    561
    +        """Returns the maximum number of bytes that the server allows
    
    562
    +        transferring using batches.
    
    563
    +        """
    
    564
    +        return _CasMaxBatchSizeCache.max_batch_total_size_bytes(self.channel,
    
    565
    +                                                                self.instance_name)
    
    566
    +
    
    567
    +    def _max_effective_batch_size_bytes(self):
    
    568
    +        """Returns the effective maximum number of bytes that can be
    
    569
    +        transferred using batches, considering gRPC maximum message size.
    
    570
    +        """
    
    571
    +        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
    
    572
    +                                                                    self.instance_name)
    
    573
    +
    
    501 574
     
    
    502 575
     @contextmanager
    
    503 576
     def upload(channel, instance=None, u_uid=None):
    
    ... ... @@ -563,7 +636,7 @@ class Uploader:
    563 636
             Returns:
    
    564 637
                 :obj:`Digest`: the sent blob's digest.
    
    565 638
             """
    
    566
    -        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
    
    639
    +        if not queue or len(blob) > self._max_effective_batch_size_bytes():
    
    567 640
                 blob_digest = self._send_blob(blob, digest=digest)
    
    568 641
             else:
    
    569 642
                 blob_digest = self._queue_blob(blob, digest=digest)
    
    ... ... @@ -589,7 +662,7 @@ class Uploader:
    589 662
             """
    
    590 663
             message_blob = message.SerializeToString()
    
    591 664
     
    
    592
    -        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
    
    665
    +        if not queue or len(message_blob) > self._max_effective_batch_size_bytes():
    
    593 666
                 message_digest = self._send_blob(message_blob, digest=digest)
    
    594 667
             else:
    
    595 668
                 message_digest = self._queue_blob(message_blob, digest=digest)
    
    ... ... @@ -622,7 +695,7 @@ class Uploader:
    622 695
             with open(file_path, 'rb') as bytes_steam:
    
    623 696
                 file_bytes = bytes_steam.read()
    
    624 697
     
    
    625
    -        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
    
    698
    +        if not queue or len(file_bytes) > self._max_effective_batch_size_bytes():
    
    626 699
                 file_digest = self._send_blob(file_bytes)
    
    627 700
             else:
    
    628 701
                 file_digest = self._queue_blob(file_bytes)
    
    ... ... @@ -795,7 +868,12 @@ class Uploader:
    795 868
                 blob_digest.hash = HASH(blob).hexdigest()
    
    796 869
                 blob_digest.size_bytes = len(blob)
    
    797 870
     
    
    798
    -        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
    
    871
    +        # If we are here queueing a file we know that its size is
    
    872
    +        # smaller than gRPC's message size limit.
    
    873
    +        # We'll make a single batch request as big as the server allows.
    
    874
    +        batch_size_limit = self._max_batch_size_bytes()
    
    875
    +
    
    876
    +        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
    
    799 877
                 self.flush()
    
    800 878
             elif self.__request_count >= MAX_REQUEST_COUNT:
    
    801 879
                 self.flush()
    
    ... ... @@ -851,3 +929,17 @@ class Uploader:
    851 929
                     written_digests.append(self._send_blob(blob, digest=digest))
    
    852 930
     
    
    853 931
             return written_digests
    
    932
    +
    
    933
    +    def _max_batch_size_bytes(self):
    
    934
    +        """Returns the maximum number of bytes that the server allows
    
    935
    +        transferring using batches.
    
    936
    +        """
    
    937
    +        return _CasMaxBatchSizeCache.max_batch_total_size_bytes(self.channel,
    
    938
    +                                                                self.instance_name)
    
    939
    +
    
    940
    +    def _max_effective_batch_size_bytes(self):
    
    941
    +        """Returns the effective maximum number of bytes that can be
    
    942
    +        transferred using batches, considering gRPC maximum message size.
    
    943
    +        """
    
    944
    +        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
    
    945
    +                                                                    self.instance_name)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]