[Notes] [Git][BuildGrid/buildgrid][santigl/100-server-capabilities] CAS.{Up, Down}loader(): if provided, respect `CacheCapabilities`



Title: GitLab

Santiago Gil pushed to branch santigl/100-server-capabilities at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    26 26
     from buildgrid._protos.google.rpc import code_pb2
    
    27
    -from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    27
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    28
    +from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
    
    28 29
     from buildgrid.utils import create_digest, merkle_tree_maker
    
    29 30
     
    
    30
    -# Maximum size for a queueable file:
    
    31
    -FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 31
     
    
    33 32
     _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
    
    34 33
     
    
    ... ... @@ -50,6 +49,58 @@ class _CallCache:
    50 49
             return name in cls.__calls[channel]
    
    51 50
     
    
    52 51
     
    
    52
    +class _CasMaxBatchSizeCache:
    
    53
    +    """Cache that stores, for each remote, the limit of bytes that can
    
    54
    +    be transferred using batches.
    
    55
    +    """
    
    56
    +    __cas_max_batch_transfer_size = {}
    
    57
    +
    
    58
    +    @classmethod
    
    59
    +    def max_batch_total_size_bytes(cls, channel, instance_name):
    
    60
    +        """Returns the maximum number of bytes that can be transferred
    
    61
    +        using batch methods for the given remote.
    
    62
    +        """
    
    63
    +        if channel not in cls.__cas_max_batch_transfer_size:
    
    64
    +            max_batch_size = cls._get_server_max_batch_total_size_bytes(channel,
    
    65
    +                                                                        instance_name)
    
    66
    +            cls.__cas_max_batch_transfer_size[channel] = max_batch_size
    
    67
    +
    
    68
    +        return cls.__cas_max_batch_transfer_size[channel]
    
    69
    +
    
    70
    +    @classmethod
    
    71
    +    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
    
    72
    +        """Attempts to get the maximum size for batch transfers from
    
    73
    +        the server. If the server does not implement `GetCapabilities`,
    
    74
    +        defaults to the configured maximum request size.
    
    75
    +        """
    
    76
    +        try:
    
    77
    +            capabilities_interface = CapabilitiesInterface(channel)
    
    78
    +            server_capabilities = capabilities_interface.get_capabilities(instance_name)
    
    79
    +
    
    80
    +            cache_capabilities = server_capabilities.cache_capabilities
    
    81
    +
    
    82
    +            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
    
    83
    +            # The server could set this value to 0 (no limit set).
    
    84
    +            if max_batch_total_size:
    
    85
    +                return max_batch_total_size
    
    86
    +        except Exception:
    
    87
    +            pass
    
    88
    +
    
    89
    +        return MAX_REQUEST_SIZE
    
    90
    +
    
    91
    +    @classmethod
    
    92
    +    def max_effective_batch_size_bytes(cls, channel, instance_name):
    
    93
    +        """Returns the maximum number of bytes that can be effectively
    
    94
    +        transferred using batches, considering the limits imposed by
    
    95
    +        the server's configuration and by gRPC.
    
    96
    +        """
    
    97
    +        server_limit = _CasMaxBatchSizeCache.max_batch_total_size_bytes(channel,
    
    98
    +                                                                        instance_name)
    
    99
    +        grpc_limit = MAX_REQUEST_SIZE
    
    100
    +
    
    101
    +        return min(server_limit, grpc_limit)
    
    102
    +
    
    103
    +
    
    53 104
     @contextmanager
    
    54 105
     def download(channel, instance=None, u_uid=None):
    
    55 106
         """Context manager generator for the :class:`Downloader` class."""
    
    ... ... @@ -189,7 +240,10 @@ class Downloader:
    189 240
             if not os.path.isabs(file_path):
    
    190 241
                 file_path = os.path.abspath(file_path)
    
    191 242
     
    
    192
    -        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
    
    243
    +        assert (0 < BATCH_REQUEST_SIZE_THRESHOLD <= 1.0)
    
    244
    +        queueable_limit = BATCH_REQUEST_SIZE_THRESHOLD * self._max_effective_batch_size_bytes()
    
    245
    +
    
    246
    +        if not queue or digest.size_bytes > queueable_limit:
    
    193 247
                 self._fetch_file(digest, file_path, is_executable=is_executable)
    
    194 248
             else:
    
    195 249
                 self._queue_file(digest, file_path, is_executable=is_executable)
    
    ... ... @@ -334,9 +388,11 @@ class Downloader:
    334 388
     
    
    335 389
         def _queue_file(self, digest, file_path, is_executable=False):
    
    336 390
             """Queues a file for later batch download"""
    
    337
    -        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
    
    391
    +        batch_size_limit = self._max_effective_batch_size_bytes()
    
    392
    +
    
    393
    +        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
    
    338 394
                 self.flush()
    
    339
    -        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
    
    395
    +        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
    
    340 396
                 self.flush()
    
    341 397
             elif self.__file_request_count >= MAX_REQUEST_COUNT:
    
    342 398
                 self.flush()
    
    ... ... @@ -498,6 +554,20 @@ class Downloader:
    498 554
     
    
    499 555
                 os.symlink(symlink_path, target_path)
    
    500 556
     
    
    557
    +    def _max_batch_size_bytes(self):
    
    558
    +        """Returns the maximum number of bytes that the server allows
    
    559
    +        transferring using batches.
    
    560
    +        """
    
    561
    +        return _CasMaxBatchSizeCache.max_batch_total_size_bytes(self.channel,
    
    562
    +                                                                self.instance_name)
    
    563
    +
    
    564
    +    def _max_effective_batch_size_bytes(self):
    
    565
    +        """Returns the effective maximum number of bytes that can be
    
    566
    +        transferred using batches, considering gRPC maximum message size.
    
    567
    +        """
    
    568
    +        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
    
    569
    +                                                                    self.instance_name)
    
    570
    +
    
    501 571
     
    
    502 572
     @contextmanager
    
    503 573
     def upload(channel, instance=None, u_uid=None):
    
    ... ... @@ -563,7 +633,11 @@ class Uploader:
    563 633
             Returns:
    
    564 634
                 :obj:`Digest`: the sent blob's digest.
    
    565 635
             """
    
    566
    -        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
    
    636
    +
    
    637
    +        assert (0 < BATCH_REQUEST_SIZE_THRESHOLD <= 1.0)
    
    638
    +        queueable_limit = BATCH_REQUEST_SIZE_THRESHOLD * self._max_effective_batch_size_bytes()
    
    639
    +
    
    640
    +        if not queue or len(blob) > queueable_limit:
    
    567 641
                 blob_digest = self._send_blob(blob, digest=digest)
    
    568 642
             else:
    
    569 643
                 blob_digest = self._queue_blob(blob, digest=digest)
    
    ... ... @@ -589,7 +663,10 @@ class Uploader:
    589 663
             """
    
    590 664
             message_blob = message.SerializeToString()
    
    591 665
     
    
    592
    -        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
    
    666
    +        assert (0 < BATCH_REQUEST_SIZE_THRESHOLD <= 1.0)
    
    667
    +        queueable_limit = BATCH_REQUEST_SIZE_THRESHOLD * self._max_effective_batch_size_bytes()
    
    668
    +
    
    669
    +        if not queue or len(message_blob) > queueable_limit:
    
    593 670
                 message_digest = self._send_blob(message_blob, digest=digest)
    
    594 671
             else:
    
    595 672
                 message_digest = self._queue_blob(message_blob, digest=digest)
    
    ... ... @@ -622,7 +699,7 @@ class Uploader:
    622 699
             with open(file_path, 'rb') as bytes_steam:
    
    623 700
                 file_bytes = bytes_steam.read()
    
    624 701
     
    
    625
    -        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
    
    702
    +        if not queue or len(file_bytes) > self._max_effective_batch_size_bytes():
    
    626 703
                 file_digest = self._send_blob(file_bytes)
    
    627 704
             else:
    
    628 705
                 file_digest = self._queue_blob(file_bytes)
    
    ... ... @@ -795,7 +872,12 @@ class Uploader:
    795 872
                 blob_digest.hash = HASH(blob).hexdigest()
    
    796 873
                 blob_digest.size_bytes = len(blob)
    
    797 874
     
    
    798
    -        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
    
    875
    +        # If we are here queueing a file we know that its size is
    
    876
    +        # smaller than gRPC's message size limit.
    
    877
    +        # We'll make a single batch request as big as the server allows.
    
    878
    +        batch_size_limit = self._max_batch_size_bytes()
    
    879
    +
    
    880
    +        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
    
    799 881
                 self.flush()
    
    800 882
             elif self.__request_count >= MAX_REQUEST_COUNT:
    
    801 883
                 self.flush()
    
    ... ... @@ -851,3 +933,17 @@ class Uploader:
    851 933
                     written_digests.append(self._send_blob(blob, digest=digest))
    
    852 934
     
    
    853 935
             return written_digests
    
    936
    +
    
    937
    +    def _max_batch_size_bytes(self):
    
    938
    +        """Returns the maximum number of bytes that the server allows
    
    939
    +        transferring using batches.
    
    940
    +        """
    
    941
    +        return _CasMaxBatchSizeCache.max_batch_total_size_bytes(self.channel,
    
    942
    +                                                                self.instance_name)
    
    943
    +
    
    944
    +    def _max_effective_batch_size_bytes(self):
    
    945
    +        """Returns the effective maximum number of bytes that can be
    
    946
    +        transferred using batches, considering gRPC maximum message size.
    
    947
    +        """
    
    948
    +        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
    
    949
    +                                                                    self.instance_name)

  • buildgrid/settings.py
    ... ... @@ -19,7 +19,7 @@ import hashlib
    19 19
     # Hash function used for computing digests:
    
    20 20
     HASH = hashlib.sha256
    
    21 21
     
    
    22
    -# Lenght in bytes of a hash string returned by HASH:
    
    22
    +# Length in bytes of a hash string returned by HASH:
    
    23 23
     HASH_LENGTH = HASH().digest_size * 2
    
    24 24
     
    
    25 25
     # Period, in seconds, for the monitoring cycle:
    
    ... ... @@ -31,6 +31,11 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024
    31 31
     # Maximum number of elements per gRPC request:
    
    32 32
     MAX_REQUEST_COUNT = 500
    
    33 33
     
    
    34
    +# Value that establishes an upper bound on the size of a file that can
    
    35
    +# be queued into a batch request. Expressed as a fraction (0, 1] of the
    
    36
    +# batch size limit:
    
    37
    +BATCH_REQUEST_SIZE_THRESHOLD = 0.5
    
    38
    +
    
    34 39
     # String format for log records:
    
    35 40
     LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    36 41
     # The different log record attributes are documented here:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]