[Notes] [Git][BuildGrid/buildgrid][master] CAS.{Up, Down}loader(): if provided, respect `CacheCapabilities`



Title: GitLab

Santiago Gil pushed to branch master at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -24,11 +24,10 @@ from buildgrid._exceptions import NotFoundError
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    26 26
     from buildgrid._protos.google.rpc import code_pb2
    
    27
    -from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    27
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    28
    +from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT, BATCH_REQUEST_SIZE_THRESHOLD
    
    28 29
     from buildgrid.utils import create_digest, merkle_tree_maker
    
    29 30
     
    
    30
    -# Maximum size for a queueable file:
    
    31
    -FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 31
     
    
    33 32
     _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
    
    34 33
     
    
    ... ... @@ -50,6 +49,67 @@ class _CallCache:
    50 49
             return name in cls.__calls[channel]
    
    51 50
     
    
    52 51
     
    
    52
    +class _CasBatchRequestSizesCache:
    
    53
    +    """Cache that stores, for each remote, the limit of bytes that can
    
    54
    +    be transferred using batches as well as a threshold that determines
    
    55
    +    when a file can be fetched as part of a batch request.
    
    56
    +    """
    
    57
    +    __cas_max_batch_transfer_size = {}
    
    58
    +    __cas_batch_request_size_threshold = {}
    
    59
    +
    
    60
    +    @classmethod
    
    61
    +    def max_effective_batch_size_bytes(cls, channel, instance_name):
    
    62
    +        """Returns the maximum number of bytes that can be transferred
    
    63
    +        using batch methods for the given remote.
    
    64
    +        """
    
    65
    +        if channel not in cls.__cas_max_batch_transfer_size:
    
    66
    +            cls.__cas_max_batch_transfer_size[channel] = dict()
    
    67
    +
    
    68
    +        if instance_name not in cls.__cas_max_batch_transfer_size[channel]:
    
    69
    +            max_batch_size = cls._get_server_max_batch_total_size_bytes(channel,
    
    70
    +                                                                        instance_name)
    
    71
    +
    
    72
    +            cls.__cas_max_batch_transfer_size[channel][instance_name] = max_batch_size
    
    73
    +
    
    74
    +        return cls.__cas_max_batch_transfer_size[channel][instance_name]
    
    75
    +
    
    76
    +    @classmethod
    
    77
    +    def batch_request_size_threshold(cls, channel, instance_name):
    
    78
    +        if channel not in cls.__cas_batch_request_size_threshold:
    
    79
    +            cls.__cas_batch_request_size_threshold[channel] = dict()
    
    80
    +
    
    81
    +        if instance_name not in cls.__cas_batch_request_size_threshold[channel]:
    
    82
    +            # Computing the threshold:
    
    83
    +            max_batch_size = cls.max_effective_batch_size_bytes(channel,
    
    84
    +                                                                instance_name)
    
    85
    +            threshold = BATCH_REQUEST_SIZE_THRESHOLD * max_batch_size
    
    86
    +
    
    87
    +            cls.__cas_batch_request_size_threshold[channel][instance_name] = threshold
    
    88
    +
    
    89
    +        return cls.__cas_batch_request_size_threshold[channel][instance_name]
    
    90
    +
    
    91
    +    @classmethod
    
    92
    +    def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
    
    93
    +        """Returns the maximum number of bytes that can be effectively
    
    94
    +        transferred using batches, considering the limits imposed by
    
    95
    +        the server's configuration and by gRPC.
    
    96
    +        """
    
    97
    +        try:
    
    98
    +            capabilities_interface = CapabilitiesInterface(channel)
    
    99
    +            server_capabilities = capabilities_interface.get_capabilities(instance_name)
    
    100
    +
    
    101
    +            cache_capabilities = server_capabilities.cache_capabilities
    
    102
    +
    
    103
    +            max_batch_total_size = cache_capabilities.max_batch_total_size_bytes
    
    104
    +            # The server could set this value to 0 (no limit set).
    
    105
    +            if max_batch_total_size:
    
    106
    +                return min(max_batch_total_size, MAX_REQUEST_SIZE)
    
    107
    +        except Exception:
    
    108
    +            pass
    
    109
    +
    
    110
    +        return MAX_REQUEST_SIZE
    
    111
    +
    
    112
    +
    
    53 113
     @contextmanager
    
    54 114
     def download(channel, instance=None, u_uid=None):
    
    55 115
         """Context manager generator for the :class:`Downloader` class."""
    
    ... ... @@ -189,7 +249,7 @@ class Downloader:
    189 249
             if not os.path.isabs(file_path):
    
    190 250
                 file_path = os.path.abspath(file_path)
    
    191 251
     
    
    192
    -        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
    
    252
    +        if not queue or digest.size_bytes > self._queueable_file_size_threshold():
    
    193 253
                 self._fetch_file(digest, file_path, is_executable=is_executable)
    
    194 254
             else:
    
    195 255
                 self._queue_file(digest, file_path, is_executable=is_executable)
    
    ... ... @@ -334,9 +394,11 @@ class Downloader:
    334 394
     
    
    335 395
         def _queue_file(self, digest, file_path, is_executable=False):
    
    336 396
             """Queues a file for later batch download"""
    
    337
    -        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
    
    397
    +        batch_size_limit = self._max_effective_batch_size_bytes()
    
    398
    +
    
    399
    +        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
    
    338 400
                 self.flush()
    
    339
    -        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
    
    401
    +        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
    
    340 402
                 self.flush()
    
    341 403
             elif self.__file_request_count >= MAX_REQUEST_COUNT:
    
    342 404
                 self.flush()
    
    ... ... @@ -498,6 +560,20 @@ class Downloader:
    498 560
     
    
    499 561
                 os.symlink(symlink_path, target_path)
    
    500 562
     
    
    563
    +    def _max_effective_batch_size_bytes(self):
    
    564
    +        """Returns the effective maximum number of bytes that can be
    
    565
    +        transferred using batches, considering gRPC maximum message size.
    
    566
    +        """
    
    567
    +        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
    
    568
    +                                                                         self.instance_name)
    
    569
    +
    
    570
    +    def _queueable_file_size_threshold(self):
    
    571
    +        """Returns the size limit up until which files can be queued to
    
    572
    +        be requested in a batch.
    
    573
    +        """
    
    574
    +        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
    
    575
    +                                                                       self.instance_name)
    
    576
    +
    
    501 577
     
    
    502 578
     @contextmanager
    
    503 579
     def upload(channel, instance=None, u_uid=None):
    
    ... ... @@ -563,7 +639,8 @@ class Uploader:
    563 639
             Returns:
    
    564 640
                 :obj:`Digest`: the sent blob's digest.
    
    565 641
             """
    
    566
    -        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
    
    642
    +
    
    643
    +        if not queue or len(blob) > self._queueable_file_size_threshold():
    
    567 644
                 blob_digest = self._send_blob(blob, digest=digest)
    
    568 645
             else:
    
    569 646
                 blob_digest = self._queue_blob(blob, digest=digest)
    
    ... ... @@ -589,7 +666,7 @@ class Uploader:
    589 666
             """
    
    590 667
             message_blob = message.SerializeToString()
    
    591 668
     
    
    592
    -        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
    
    669
    +        if not queue or len(message_blob) > self._queueable_file_size_threshold():
    
    593 670
                 message_digest = self._send_blob(message_blob, digest=digest)
    
    594 671
             else:
    
    595 672
                 message_digest = self._queue_blob(message_blob, digest=digest)
    
    ... ... @@ -622,7 +699,7 @@ class Uploader:
    622 699
             with open(file_path, 'rb') as bytes_steam:
    
    623 700
                 file_bytes = bytes_steam.read()
    
    624 701
     
    
    625
    -        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
    
    702
    +        if not queue or len(file_bytes) > self._queueable_file_size_threshold():
    
    626 703
                 file_digest = self._send_blob(file_bytes)
    
    627 704
             else:
    
    628 705
                 file_digest = self._queue_blob(file_bytes)
    
    ... ... @@ -795,7 +872,12 @@ class Uploader:
    795 872
                 blob_digest.hash = HASH(blob).hexdigest()
    
    796 873
                 blob_digest.size_bytes = len(blob)
    
    797 874
     
    
    798
    -        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
    
    875
    +        # If we are here queueing a file we know that its size is
    
    876
    +        # smaller than gRPC's message size limit.
    
    877
    +        # We'll make a single batch request as big as the server allows.
    
    878
    +        batch_size_limit = self._max_effective_batch_size_bytes()
    
    879
    +
    
    880
    +        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
    
    799 881
                 self.flush()
    
    800 882
             elif self.__request_count >= MAX_REQUEST_COUNT:
    
    801 883
                 self.flush()
    
    ... ... @@ -851,3 +933,17 @@ class Uploader:
    851 933
                     written_digests.append(self._send_blob(blob, digest=digest))
    
    852 934
     
    
    853 935
             return written_digests
    
    936
    +
    
    937
    +    def _max_effective_batch_size_bytes(self):
    
    938
    +        """Returns the effective maximum number of bytes that can be
    
    939
    +        transferred using batches, considering gRPC maximum message size.
    
    940
    +        """
    
    941
    +        return _CasBatchRequestSizesCache.max_effective_batch_size_bytes(self.channel,
    
    942
    +                                                                         self.instance_name)
    
    943
    +
    
    944
    +    def _queueable_file_size_threshold(self):
    
    945
    +        """Returns the size limit up until which files can be queued to
    
    946
    +        be requested in a batch.
    
    947
    +        """
    
    948
    +        return _CasBatchRequestSizesCache.batch_request_size_threshold(self.channel,
    
    949
    +                                                                       self.instance_name)

  • buildgrid/settings.py
    ... ... @@ -25,7 +25,7 @@ LOW_REAPI_VERSION = '2.0.0'
    25 25
     # Hash function used for computing digests:
    
    26 26
     HASH = hashlib.sha256
    
    27 27
     
    
    28
    -# Lenght in bytes of a hash string returned by HASH:
    
    28
    +# Length in bytes of a hash string returned by HASH:
    
    29 29
     HASH_LENGTH = HASH().digest_size * 2
    
    30 30
     
    
    31 31
     # Period, in seconds, for the monitoring cycle:
    
    ... ... @@ -37,6 +37,11 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024
    37 37
     # Maximum number of elements per gRPC request:
    
    38 38
     MAX_REQUEST_COUNT = 500
    
    39 39
     
    
    40
    +# Value that establishes an upper bound on the size of a file that can
    
    41
    +# be queued into a batch request. Expressed as a percentage of the
    
    42
    +# batch size limit:
    
    43
    +BATCH_REQUEST_SIZE_THRESHOLD = 0.25
    
    44
    +
    
    40 45
     # String format for log records:
    
    41 46
     LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    42 47
     # The different log record attributes are documented here:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]