[Notes] [Git][BuildGrid/buildgrid][santigl/100-server-capabilities] CAS.{Up, Down}loader(): if provided, respect `CacheCapabilities`



Title: GitLab

Santiago Gil pushed to branch santigl/100-server-capabilities at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/client/cas.py
    ... ... @@ -24,11 +24,13 @@ from buildgrid._exceptions import NotFoundError
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    26 26
     from buildgrid._protos.google.rpc import code_pb2
    
    27
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    27 28
     from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    28 29
     from buildgrid.utils import create_digest, merkle_tree_maker
    
    29 30
     
    
    31
    +
    
    30 32
     # Maximum size for a queueable file:
    
    31
    -FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    33
    +DEFAULT_FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 34
     
    
    33 35
     _FileRequest = namedtuple('FileRequest', ['digest', 'output_paths'])
    
    34 36
     
    
    ... ... @@ -50,6 +52,54 @@ class _CallCache:
    50 52
             return name in cls.__calls[channel]
    
    51 53
     
    
    52 54
     
    
    55
    class _CasMaxBatchSizeCache:
        """Cache that stores, for each remote, the limit of bytes that can
        be transferred using batches.

        A remote is identified by the ``(channel, instance_name)`` pair:
        several instances can be served over the same channel and each one
        may advertise its own ``CacheCapabilities``, so the limit is cached
        per pair rather than per channel.
        """
        # Maps (channel, instance_name) -> batch size limit in bytes.
        __cas_max_batch_transfer_size = {}

        @classmethod
        def max_batch_total_size_bytes(cls, channel, instance_name):
            """Returns the maximum number of bytes that can be transferred
            using batch methods for the given remote.

            The server is queried only the first time a given
            ``(channel, instance_name)`` pair is seen; the result (including
            the fallback default used when the query fails) is cached and
            returned on later calls.

            Args:
                channel: channel to the remote CAS server.
                instance_name: name of the instance on that server.

            Returns:
                int: the batch size limit, in bytes.
            """
            remote = (channel, instance_name)
            if remote not in cls.__cas_max_batch_transfer_size:
                cls.__cas_max_batch_transfer_size[remote] = \
                    cls._get_server_max_batch_total_size_bytes(channel, instance_name)

            return cls.__cas_max_batch_transfer_size[remote]

        @classmethod
        def _get_server_max_batch_total_size_bytes(cls, channel, instance_name):
            """Attempts to get the maximum size for batch transfers from
            the server.

            Returns:
                int: the server's advertised ``max_batch_total_size_bytes``;
                ``MAX_REQUEST_SIZE`` if the server advertises 0 (which the
                Remote Execution API defines as "no limit"; returning 0 here
                would disable batching altogether); or
                ``DEFAULT_FILE_SIZE_THRESHOLD`` if the capabilities could not
                be obtained (e.g. the server does not implement
                `GetCapabilities`).
            """
            try:
                capabilities_interface = CapabilitiesInterface(channel)
                server_capabilities = \
                    capabilities_interface.get_capabilities(instance_name)

                cache_capabilities = server_capabilities.cache_capabilities
                server_limit = cache_capabilities.max_batch_total_size_bytes

            except Exception:
                # Best-effort lookup: any failure falls back to the default.
                # Unlike a bare ``except``, this still lets KeyboardInterrupt
                # and SystemExit propagate.
                return DEFAULT_FILE_SIZE_THRESHOLD

            if server_limit > 0:
                return server_limit

            # 0 means "no limit is set" by the server; the only remaining
            # bound is the gRPC message size. (A negative value is not a
            # meaningful limit either, so it is treated the same way.)
            return MAX_REQUEST_SIZE

        @classmethod
        def max_effective_batch_size_bytes(cls, channel, instance_name):
            """Returns the maximum number of bytes that can be effectively
            transferred using batches, considering the limits imposed by
            the server's configuration and by gRPC.

            Args:
                channel: channel to the remote CAS server.
                instance_name: name of the instance on that server.

            Returns:
                int: ``min(server batch limit, MAX_REQUEST_SIZE)``.
            """
            server_limit = cls.max_batch_total_size_bytes(channel, instance_name)
            grpc_limit = MAX_REQUEST_SIZE

            return min(server_limit, grpc_limit)
    
    101
    +
    
    102
    +
    
    53 103
     @contextmanager
    
    54 104
     def download(channel, instance=None, u_uid=None):
    
    55 105
         """Context manager generator for the :class:`Downloader` class."""
    
    ... ... @@ -189,7 +239,7 @@ class Downloader:
    189 239
             if not os.path.isabs(file_path):
    
    190 240
                 file_path = os.path.abspath(file_path)
    
    191 241
     
    
    192
    -        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
    
    242
    +        if not queue or digest.size_bytes > self._max_effective_batch_size_bytes():
    
    193 243
                 self._fetch_file(digest, file_path, is_executable=is_executable)
    
    194 244
             else:
    
    195 245
                 self._queue_file(digest, file_path, is_executable=is_executable)
    
    ... ... @@ -334,9 +384,14 @@ class Downloader:
    334 384
     
    
    335 385
         def _queue_file(self, digest, file_path, is_executable=False):
    
    336 386
             """Queues a file for later batch download"""
    
    337
    -        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
    
    387
    +        # If we are here queueing a file we know that its size is
    
    388
    +        # smaller than gRPC's message size limit.
    
    389
    +        # We'll make a single batch request as big as the server allows.
    
    390
    +        batch_size_limit = self._max_batch_size_bytes()
    
    391
    +
    
    392
    +        if self.__file_request_size + digest.ByteSize() > batch_size_limit:
    
    338 393
                 self.flush()
    
    339
    -        elif self.__file_response_size + digest.size_bytes > MAX_REQUEST_SIZE:
    
    394
    +        elif self.__file_response_size + digest.size_bytes > batch_size_limit:
    
    340 395
                 self.flush()
    
    341 396
             elif self.__file_request_count >= MAX_REQUEST_COUNT:
    
    342 397
                 self.flush()
    
    ... ... @@ -498,6 +553,20 @@ class Downloader:
    498 553
     
    
    499 554
                 os.symlink(symlink_path, target_path)
    
    500 555
     
    
    556
    def _max_batch_size_bytes(self):
        """Returns the maximum number of bytes to put into a single batch
        request for this remote.

        A batch request (and its response) is sent as one gRPC message, so
        the server's advertised batch limit alone is not a safe cap: it is
        bounded by the gRPC message size limit as well, to avoid building a
        batch the transport would reject.

        Returns:
            int: the server's batch limit capped by ``MAX_REQUEST_SIZE``.
        """
        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
                                                                    self.instance_name)
    
    562
    +
    
    563
    def _max_effective_batch_size_bytes(self):
        """Largest blob size, in bytes, that is still eligible for batching.

        Blobs bigger than this are transferred individually instead of being
        queued for a batch request. The value is the smaller of the server's
        batch limit and gRPC's maximum message size.
        """
        limits = _CasMaxBatchSizeCache
        remote_channel, remote_instance = self.channel, self.instance_name
        return limits.max_effective_batch_size_bytes(remote_channel, remote_instance)
    
    569
    +
    
    501 570
     
    
    502 571
     @contextmanager
    
    503 572
     def upload(channel, instance=None, u_uid=None):
    
    ... ... @@ -563,7 +632,7 @@ class Uploader:
    563 632
             Returns:
    
    564 633
                 :obj:`Digest`: the sent blob's digest.
    
    565 634
             """
    
    566
    -        if not queue or len(blob) > FILE_SIZE_THRESHOLD:
    
    635
    +        if not queue or len(blob) > self._max_effective_batch_size_bytes():
    
    567 636
                 blob_digest = self._send_blob(blob, digest=digest)
    
    568 637
             else:
    
    569 638
                 blob_digest = self._queue_blob(blob, digest=digest)
    
    ... ... @@ -589,7 +658,7 @@ class Uploader:
    589 658
             """
    
    590 659
             message_blob = message.SerializeToString()
    
    591 660
     
    
    592
    -        if not queue or len(message_blob) > FILE_SIZE_THRESHOLD:
    
    661
    +        if not queue or len(message_blob) > self._max_effective_batch_size_bytes():
    
    593 662
                 message_digest = self._send_blob(message_blob, digest=digest)
    
    594 663
             else:
    
    595 664
                 message_digest = self._queue_blob(message_blob, digest=digest)
    
    ... ... @@ -622,7 +691,7 @@ class Uploader:
    622 691
             with open(file_path, 'rb') as bytes_steam:
    
    623 692
                 file_bytes = bytes_steam.read()
    
    624 693
     
    
    625
    -        if not queue or len(file_bytes) > FILE_SIZE_THRESHOLD:
    
    694
    +        if not queue or len(file_bytes) > self._max_effective_batch_size_bytes():
    
    626 695
                 file_digest = self._send_blob(file_bytes)
    
    627 696
             else:
    
    628 697
                 file_digest = self._queue_blob(file_bytes)
    
    ... ... @@ -795,7 +864,12 @@ class Uploader:
    795 864
                 blob_digest.hash = HASH(blob).hexdigest()
    
    796 865
                 blob_digest.size_bytes = len(blob)
    
    797 866
     
    
    798
    -        if self.__request_size + blob_digest.size_bytes > MAX_REQUEST_SIZE:
    
    867
    +        # If we are here queueing a file we know that its size is
    
    868
    +        # smaller than gRPC's message size limit.
    
    869
    +        # We'll make a single batch request as big as the server allows.
    
    870
    +        batch_size_limit = self._max_batch_size_bytes()
    
    871
    +
    
    872
    +        if self.__request_size + blob_digest.size_bytes > batch_size_limit:
    
    799 873
                 self.flush()
    
    800 874
             elif self.__request_count >= MAX_REQUEST_COUNT:
    
    801 875
                 self.flush()
    
    ... ... @@ -851,3 +925,17 @@ class Uploader:
    851 925
                     written_digests.append(self._send_blob(blob, digest=digest))
    
    852 926
     
    
    853 927
             return written_digests
    
    928
    +
    
    929
    def _max_batch_size_bytes(self):
        """Returns the maximum number of bytes to put into a single batch
        request for this remote.

        A batch request is sent as one gRPC message, so the server's
        advertised batch limit alone is not a safe cap: it is bounded by
        the gRPC message size limit as well, to avoid building a batch the
        transport would reject.

        Returns:
            int: the server's batch limit capped by ``MAX_REQUEST_SIZE``.
        """
        return _CasMaxBatchSizeCache.max_effective_batch_size_bytes(self.channel,
                                                                    self.instance_name)
    
    935
    +
    
    936
    def _max_effective_batch_size_bytes(self):
        """Largest blob size, in bytes, that is still eligible for batching.

        Blobs bigger than this are sent individually instead of being queued
        for a batch request. The value is the smaller of the server's batch
        limit and gRPC's maximum message size.
        """
        limits = _CasMaxBatchSizeCache
        remote_channel, remote_instance = self.channel, self.instance_name
        return limits.max_effective_batch_size_bytes(remote_channel, remote_instance)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]