[Notes] [Git][BuildGrid/buildgrid][santi/76-batch-read-blobs] 2 commits: Implement ContentAddressableStorage.BatchReadBlobs()



Title: GitLab

Santiago Gil pushed to branch santi/76-batch-read-blobs at BuildGrid / buildgrid

Commits: (the two individual commit entries were not preserved in this archived copy)

3 changed files:

Changes:

  • buildgrid/_app/commands/cmd_cas.py
    ... ... @@ -147,33 +147,35 @@ def _create_digest(digest_string):
    147 147
         return digest
    
    148 148
     
    
    149 149
     
    
    150
    -@cli.command('download-file', short_help="Download a file from the CAS server.")
    
    151
    -@click.argument('digest-string', nargs=1, type=click.STRING, required=True)
    
    152
    -@click.argument('file-path', nargs=1, type=click.Path(exists=False), required=True)
    
    150
    +@cli.command('download-file', short_help="Download one or more files from the CAS server. "
    
    151
    +                                         "(Specified as a space-separated list of DIGEST FILE_PATH)")
    
    152
    +@click.argument('digest-path-list', nargs=-1, type=str, required=True)  # 'digest path' pairs
    
    153 153
     @click.option('--verify', is_flag=True, show_default=True,
    
    154 154
                   help="Check downloaded file's integrity.")
    
    155 155
     @pass_context
    
    156
    -def download_file(context, digest_string, file_path, verify):
    
    157
    -    if os.path.exists(file_path):
    
    158
    -        click.echo("Error: Invalid value for " +
    
    159
    -                   "path=[{}] already exists.".format(file_path), err=True)
    
    160
    -        return
    
    161
    -
    
    162
    -    digest = _create_digest(digest_string)
    
    163
    -    with download(context.channel, instance=context.instance_name) as downloader:
    
    164
    -        downloader.download_file(digest, file_path)
    
    165
    -
    
    166
    -    if verify:
    
    167
    -        file_digest = create_digest(read_file(file_path))
    
    168
    -        if file_digest != digest:
    
    169
    -            click.echo("Error: Failed to verify path=[{}]".format(file_path), err=True)
    
    170
    -            return
    
    171
    -
    
    172
    -    if os.path.isfile(file_path):
    
    173
    -        click.echo("Success: Pulled path=[{}] from digest=[{}/{}]"
    
    174
    -                   .format(file_path, digest.hash, digest.size_bytes))
    
    175
    -    else:
    
    176
    -        click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
    
    156
    +def download_file(context, digest_path_list, verify):
    
    157
    +    for (digest_string, file_path) in zip(digest_path_list[0::2],
    
    158
    +                                          digest_path_list[1::2]):
    
    159
    +        if os.path.exists(file_path):
    
    160
    +            click.echo("Error: Invalid value for " +
    
    161
    +                       "path=[{}] already exists.".format(file_path), err=True)
    
    162
    +            continue
    
    163
    +
    
    164
    +        digest = _create_digest(digest_string)
    
    165
    +        with download(context.channel, instance=context.instance_name) as downloader:
    
    166
    +            downloader.download_file(digest, file_path)
    
    167
    +
    
    168
    +        if verify:
    
    169
    +            file_digest = create_digest(read_file(file_path))
    
    170
    +            if file_digest != digest:
    
    171
    +                click.echo("Error: Failed to verify path=[{}]".format(file_path), err=True)
    
    172
    +                continue
    
    173
    +
    
    174
    +        if os.path.isfile(file_path):
    
    175
    +            click.echo("Success: Pulled path=[{}] from digest=[{}/{}]"
    
    176
    +                       .format(file_path, digest.hash, digest.size_bytes))
    
    177
    +        else:
    
    178
    +            click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
    
    177 179
     
    
    178 180
     
    
    179 181
     @cli.command('download-dir', short_help="Download a directory from the CAS server.")
    

  • buildgrid/server/cas/instance.py
    ... ... @@ -24,6 +24,7 @@ import logging
    24 24
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27
    +from buildgrid._protos.google.rpc import code_pb2, status_pb2
    
    27 28
     from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    28 29
     from buildgrid.utils import get_hash_type
    
    29 30
     
    
    ... ... @@ -70,6 +71,35 @@ class ContentAddressableStorageInstance:
    70 71
     
    
    71 72
             return response
    
    72 73
     
    
    74
    +    def batch_read_blobs(self, digests):
    
    75
    +        storage = self._storage
    
    76
    +
    
    77
    +        response = re_pb2.BatchReadBlobsResponse()
    
    78
    +
    
    79
    +        requested_bytes = sum((digest.size_bytes for digest in digests))
    
    80
    +        max_batch_size = self.max_batch_total_size_bytes()
    
    81
    +
    
    82
    +        if requested_bytes > max_batch_size:
    
    83
    +            raise InvalidArgumentError('Combined total size of blobs exceeds '
    
    84
    +                                       'server limit. '
    
    85
    +                                       '({} > {} [byte])'.format(requested_bytes,
    
    86
    +                                                                 max_batch_size))
    
    87
    +
    
    88
    +        for digest in digests:
    
    89
    +            response_proto = response.responses.add()
    
    90
    +            response_proto.digest.CopyFrom(digest)
    
    91
    +
    
    92
    +            blob = storage.get_blob(digest)
    
    93
    +            if blob:
    
    94
    +                response_proto.data = blob.read()
    
    95
    +                status_code = code_pb2.OK
    
    96
    +            else:
    
    97
    +                status_code = code_pb2.NOT_FOUND
    
    98
    +
    
    99
    +            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))
    
    100
    +
    
    101
    +        return response
    
    102
    +
    
    73 103
         def get_tree(self, request):
    
    74 104
             storage = self._storage
    
    75 105
     
    

  • buildgrid/server/cas/service.py
    ... ... @@ -86,8 +86,15 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    86 86
         def BatchReadBlobs(self, request, context):
    
    87 87
             self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
    
    88 88
     
    
    89
    -        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    90
    -        context.set_details('Method not implemented!')
    
    89
    +        try:
    
    90
    +            instance = self._get_instance(request.instance_name)
    
    91
    +            response = instance.batch_read_blobs(request.digests)
    
    92
    +            return response
    
    93
    +
    
    94
    +        except InvalidArgumentError as e:
    
    95
    +            self.__logger.error(e)
    
    96
    +            context.set_details(str(e))
    
    97
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    91 98
     
    
    92 99
             return remote_execution_pb2.BatchReadBlobsResponse()
    
    93 100
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]