[Notes] [Git][BuildGrid/buildgrid][santi/76-batch-read-blobs] Implement ContentAddressableStorage.BatchReadBlobs()



Title: GitLab

Santiago Gil pushed to branch santi/76-batch-read-blobs at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/server/cas/instance.py
    ... ... @@ -24,6 +24,7 @@ import logging
    24 24
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27
    +from buildgrid._protos.google.rpc import code_pb2, status_pb2
    
    27 28
     from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    28 29
     from buildgrid.utils import get_hash_type
    
    29 30
     
    
    ... ... @@ -70,6 +71,33 @@ class ContentAddressableStorageInstance:
    70 71
     
    
    71 72
             return response
    
    72 73
     
    
    74
    +    def batch_read_blobs(self, digests):
    
    75
    +        response = re_pb2.BatchReadBlobsResponse()
    
    76
    +
    
    77
    +        requested_bytes = sum((digest.size_bytes for digest in digests))
    
    78
    +        max_batch_size = self.max_batch_total_size_bytes()
    
    79
    +
    
    80
    +        if requested_bytes > max_batch_size:
    
    81
    +            raise InvalidArgumentError('Combined total size of blobs exceeds '
    
    82
    +                                       'server limit. '
    
    83
    +                                       '({} > {} [byte])'.format(requested_bytes,
    
    84
    +                                                                 max_batch_size))
    
    85
    +
    
    86
    +        for digest in digests:
    
    87
    +            response_proto = response.responses.add()
    
    88
    +            response_proto.digest.CopyFrom(digest)
    
    89
    +
    
    90
    +            blob = self._storage.get_blob(digest)
    
    91
    +            if blob:
    
    92
    +                response_proto.data = blob.read()
    
    93
    +                status_code = code_pb2.OK
    
    94
    +            else:
    
    95
    +                status_code = code_pb2.NOT_FOUND
    
    96
    +
    
    97
    +            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))
    
    98
    +
    
    99
    +        return response
    
    100
    +
    
    73 101
         def get_tree(self, request):
    
    74 102
             storage = self._storage
    
    75 103
     
    

  • buildgrid/server/cas/service.py
    ... ... @@ -86,8 +86,15 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    86 86
         def BatchReadBlobs(self, request, context):
    
    87 87
             self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
    
    88 88
     
    
    89
    -        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    90
    -        context.set_details('Method not implemented!')
    
    89
    +        try:
    
    90
    +            instance = self._get_instance(request.instance_name)
    
    91
    +            response = instance.batch_read_blobs(request.digests)
    
    92
    +            return response
    
    93
    +
    
    94
    +        except InvalidArgumentError as e:
    
    95
    +            self.__logger.error(e)
    
    96
    +            context.set_details(str(e))
    
    97
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    91 98
     
    
    92 99
             return remote_execution_pb2.BatchReadBlobsResponse()
    
    93 100
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]