[Notes] [Git][BuildGrid/buildgrid][santigl/100-server-capabilities] CAS.{Up, Down}loader(): if provided, respect `CacheCapabilities`



Title: GitLab

Santiago Gil pushed to branch santigl/100-server-capabilities at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    26 26
     from buildgrid._protos.google.rpc import code_pb2
    
    27
    -from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    27
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    28
    +from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
    
    28 29
     from buildgrid.utils import create_digest, merkle_tree_maker
    
    29 30
     
    
    30
    -# Maximum size for a queueable file:
    
    31
    -FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 31
     
    
    33 32
     _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
    
    34 33
     
    
    ... ... @@ -50,6 +49,80 @@ class _CallCache:
    50 49
             return name in cls.__calls[channel]
    
    51 50
     
    
    52 51
     
    
    52
    +class _CasBatchRequestSizesCache:
    
    53
    +    """Cache that stores, for each remote, the limit of bytes that can
    
    54
    +    be transferred using batches as well as a threshold that determines
    
    55
    +    when a file can be fetched as part of a batch request.
    
    56
    +    """
    
    57
    +    __cas_max_batch_transfer_size = {}
    
    58
    +    __cas_batch_request_size_threshold = {}
    
    59
    +
    
    60
    +    @classmethod
    
    61
    +    def max_effective_batch_size_bytes(cls, channel, instance_name):
    
    62
    +        """Returns the maximum number of bytes that can be transferred
    
    63
    +        using batch methods for the given remote.
    
    64
    +        """
    
    65
    +        if channel not in cls.__cas_max_batch_transfer_size:
    
    66
    +            cls.__cas_max_batch_transfer_size[channel] = dict()
    
    67
    +
    
    68
    +        if instance_name not in cls.__cas_max_batch_transfer_size[channel]:
    
    69
    +            # Fetching the value from the server:
    
    70
    +            max_batch_size = cls._max_effective_batch_size_bytes(channel,
    
    71
    +                                                                 instance_name)
    
    72
    +
    
    73
    +            cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size
    
    74
    +
    
    75
    +        return cls.__cas_max_batch_transfer_size[channel][instance_name]
    
    76
    +
    
    77
    +    @classmethod
    
    78
    +    def batch_request_size_threshold(cls, channel, instance_name):
    
    79
    +        if channel not in cls.__cas_batch_request_size_threshold:
    
    80
    +            cls.__cas_batch_request_size_threshold[channel] = dict()
    
    81
    +
    
    82
    +        if instance_name not in cls.__cas_batch_request_size_threshold[channel]:
    
    83
    +            # Computing the threshold:
    
    84
    +            max_batch_size = cls.max_effective_batch_size_bytes(channel,
    
    85
    +                                                                instance_name)
    
    86
    +            threshold = BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size
    
    87
    +
    
    88
    +            cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold
    
    89
    +
    
    90
    +        return cls.__cas_batch_request_size_threshold[channel][instance_name]
    
    91
    +
    
    92
    +    @classmethod
    
    93
    +    def _max_effective_batch_size_bytes(cls, channel, instance_name):
    
    94
    +        """Returns the maximum number of bytes that can be effectively
    
    95
    +        transferred using batches, considering the limits imposed by
    
    96
    +        the server's configuration and by gRPC.
    
    97
    +        """
    
    98
    +        server_limit = cls._get_server_max_batch_total_size_bytes(channel,
    
    99
    +                                                                  instance_name)
    
    100
    +        grpc_limit = MAX_REQUEST_SIZE
    
    101
    +
    
    102
    +        return min(server_limit, grpc_limit)
    
    103
    +
    
    104
    +    @classmethod
    
    105
    +    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
    
    106
    +        """Attempts to get the maximum size for batch transfers from
    
    107
    +        the server. If the server does not implement `GetCapabilities`,
    
    108
    +        defaults to the configured maximum request size.
    
    109
    +        """
    
    110
    +        try:
    
    111
    +            capabilities_interface = CapabilitiesInterface(channel)
    
    112
    +            server_capabilities = capabilities_interface.get_capabilities(instance_name)
    
    113
    +
    
    114
    +            cache_capabilities = server_capabilities.cache_capabilities
    
    115
    +
    
    116
    +            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
    
    117
    +            # The server could set this value to 0 (no limit set).
    
    118
    +            if max_batch_total_size:
    
    119
    +                return max_batch_total_size
    
    120
    +        except Exception:
    
    121
    +            pass
    
    122
    +
    
    123
    +        return MAX_REQUEST_SIZE
    
    124
    +
    
    125
    +
    
    53 126
     @contextmanager
    
    54 127
     def download(channel, instance=None, u_uid=None):
    
    55 128
         """Context manager generator for the :class:`Downloader` class."""
    
    ... ... @@ -189,7 +262,7 @@ class Downloader:
    189 262
             if not os.path.isabs(file_path):
    
    190 263
                 file_path = os.path.abspath(file_path)
    
    191 264
     
    
    192
    -        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
    
    265
    +        if not queue or digest.size_bytes > self._queueable_file_size_threshold():
    
    193 266
                 self._fetch_file(digest, file_path, is_executable=is_executable)
    
    194 267
             else:
    
    195 268
                 self._queue_file(digest, file_path, is_executable=is_executable)
    
    ... ... @@ -334,9 +407,11 @@ class Downloader:
    334 407
     
    
    335 408
         def _queue_file(self, digest, file_path, is_executable=False):
    
    336 409
             """Queues a file for later batch download"""
    
    337
    -        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
    
    410
    +        batch_size_limit = self._max_effective_batch_size_bytes()
    
    411
    +
    
    412
    +        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
    
    338 413
                 self.flush()
    
    339
    -        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
    
    414
    +        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
    
    340 415
                 self.flush()
    
    341 416
             elif self.__file_request_count >= MAX_REQUEST_COUNT:
    
    342 417
                 self.flush()
    
    ... ... @@ -498,6 +573,20 @@ class Downloader:
    498 573
     
    
    499 574
                 os.symlink(symlink_path, target_path)
    
    500 575
     
    
    576
    +    def _max_effective_batch_size_bytes(self):
    
    577
    +        """Returns the effective maximum number of bytes that can be
    
    578
    +        transferred using batches, considering gRPC maximum message size.
    
    579
    +        """
    
    580
    +        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
    
    581
    +                                                                         self.instance_name)
    
    582
    +
    
    583
    +    def _queueable_file_size_threshold(self):
    
    584
    +        """Returns the size limit up until which files can be queued to
    
    585
    +        be requested in a batch.
    
    586
    +        """
    
    587
    +        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
    
    588
    +                                                                       self.instance_name)
    
    589
    +
    
    501 590
     
    
    502 591
     @contextmanager
    
    503 592
     def upload(channel, instance=None, u_uid=None):
    
    ... ... @@ -563,7 +652,8 @@ class Uploader:
    563 652
             Returns:
    
    564 653
                 :obj:`Digest`: the sent blob's digest.
    
    565 654
             """
    
    566
    -        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
    
    655
    +
    
    656
    +        if not queue or len(blob) > self._queueable_file_size_threshold():
    
    567 657
                 blob_digest = self._send_blob(blob, digest=digest)
    
    568 658
             else:
    
    569 659
                 blob_digest = self._queue_blob(blob, digest=digest)
    
    ... ... @@ -589,7 +679,7 @@ class Uploader:
    589 679
             """
    
    590 680
             message_blob = message.SerializeToString()
    
    591 681
     
    
    592
    -        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
    
    682
    +        if not queue or len(message_blob) > self._queueable_file_size_threshold():
    
    593 683
                 message_digest = self._send_blob(message_blob, digest=digest)
    
    594 684
             else:
    
    595 685
                 message_digest = self._queue_blob(message_blob, digest=digest)
    
    ... ... @@ -622,7 +712,7 @@ class Uploader:
    622 712
             with open(file_path, 'rb') as bytes_steam:
    
    623 713
                 file_bytes = bytes_steam.read()
    
    624 714
     
    
    625
    -        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
    
    715
    +        if not queue or len(file_bytes) > self._queueable_file_size_threshold():
    
    626 716
                 file_digest = self._send_blob(file_bytes)
    
    627 717
             else:
    
    628 718
                 file_digest = self._queue_blob(file_bytes)
    
    ... ... @@ -795,7 +885,12 @@ class Uploader:
    795 885
                 blob_digest.hash = HASH(blob).hexdigest()
    
    796 886
                 blob_digest.size_bytes = len(blob)
    
    797 887
     
    
    798
    -        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
    
    888
    +# If we are here queueing a file, we know that its size is
    
    889
    +        # smaller than gRPC's message size limit.
    
    890
    +        # We'll make a single batch request as big as the server allows.
    
    891
    +        batch_size_limit = self._max_effective_batch_size_bytes()
    
    892
    +
    
    893
    +        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
    
    799 894
                 self.flush()
    
    800 895
             elif self.__request_count >= MAX_REQUEST_COUNT:
    
    801 896
                 self.flush()
    
    ... ... @@ -851,3 +946,17 @@ class Uploader:
    851 946
                     written_digests.append(self._send_blob(blob, digest=digest))
    
    852 947
     
    
    853 948
             return written_digests
    
    949
    +
    
    950
    +    def _max_effective_batch_size_bytes(self):
    
    951
    +        """Returns the effective maximum number of bytes that can be
    
    952
    +        transferred using batches, considering gRPC maximum message size.
    
    953
    +        """
    
    954
    +        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
    
    955
    +                                                                         self.instance_name)
    
    956
    +
    
    957
    +    def _queueable_file_size_threshold(self):
    
    958
    +        """Returns the size limit up until which files can be queued to
    
    959
    +        be requested in a batch.
    
    960
    +        """
    
    961
    +        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
    
    962
    +                                                                       self.instance_name)

  • buildgrid/settings.py
    ... ... @@ -19,7 +19,7 @@ import hashlib
    19 19
     # Hash function used for computing digests:
    
    20 20
     HASH = hashlib.sha256
    
    21 21
     
    
    22
    -# Lenght in bytes of a hash string returned by HASH:
    
    22
    +# Length in bytes of a hash string returned by HASH:
    
    23 23
     HASH_LENGTH = HASH().digest_size * 2
    
    24 24
     
    
    25 25
     # Period, in seconds, for the monitoring cycle:
    
    ... ... @@ -31,6 +31,11 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024
    31 31
     # Maximum number of elements per gRPC request:
    
    32 32
     MAX_REQUEST_COUNT = 500
    
    33 33
     
    
    34
    +# Value that establishes an upper bound on the size of a file that can
    
    35
    +# be queued into a batch request. Expressed as a fraction (0-1) of the
    
    36
    +# batch size limit:
    
    37
    +BATCH_REQUEST_SIZE_THRESHOLD = 0.25
    
    38
    +
    
    34 39
     # String format for log records:
    
    35 40
     LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    36 41
     # The different log record attributes are documented here:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]