Santiago Gil pushed to branch santi/76-batch-read-blobs at BuildGrid / buildgrid
Commits:
3 changed files:
Changes:
... | ... | @@ -147,33 +147,35 @@ def _create_digest(digest_string): |
147 | 147 |
return digest
|
148 | 148 |
|
149 | 149 |
|
150 |
-@cli.command('download-file', short_help="Download a file from the CAS server.")
|
|
151 |
-@click.argument('digest-string', nargs=1, type=click.STRING, required=True)
|
|
152 |
-@click.argument('file-path', nargs=1, type=click.Path(exists=False), required=True)
|
|
150 |
+@cli.command('download-file', short_help="Download one or more files from the CAS server. "
|
|
151 |
+ "(Specified as a space-separated list of DIGEST FILE_PATH)")
|
|
152 |
+@click.argument('digest-path-list', nargs=-1, type=str, required=True) # 'digest path' pairs
|
|
153 | 153 |
@click.option('--verify', is_flag=True, show_default=True,
|
154 | 154 |
help="Check downloaded file's integrity.")
|
155 | 155 |
@pass_context
|
156 |
-def download_file(context, digest_string, file_path, verify):
|
|
157 |
- if os.path.exists(file_path):
|
|
158 |
- click.echo("Error: Invalid value for " +
|
|
159 |
- "path=[{}] already exists.".format(file_path), err=True)
|
|
160 |
- return
|
|
161 |
- |
|
162 |
- digest = _create_digest(digest_string)
|
|
163 |
- with download(context.channel, instance=context.instance_name) as downloader:
|
|
164 |
- downloader.download_file(digest, file_path)
|
|
165 |
- |
|
166 |
- if verify:
|
|
167 |
- file_digest = create_digest(read_file(file_path))
|
|
168 |
- if file_digest != digest:
|
|
169 |
- click.echo("Error: Failed to verify path=[{}]".format(file_path), err=True)
|
|
170 |
- return
|
|
171 |
- |
|
172 |
- if os.path.isfile(file_path):
|
|
173 |
- click.echo("Success: Pulled path=[{}] from digest=[{}/{}]"
|
|
174 |
- .format(file_path, digest.hash, digest.size_bytes))
|
|
175 |
- else:
|
|
176 |
- click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
|
|
156 |
+def download_file(context, digest_path_list, verify):
|
|
157 |
+ for (digest_string, file_path) in zip(digest_path_list[0::2],
|
|
158 |
+ digest_path_list[1::2]):
|
|
159 |
+ if os.path.exists(file_path):
|
|
160 |
+ click.echo("Error: Invalid value for " +
|
|
161 |
+ "path=[{}] already exists.".format(file_path), err=True)
|
|
162 |
+ continue
|
|
163 |
+ |
|
164 |
+ digest = _create_digest(digest_string)
|
|
165 |
+ with download(context.channel, instance=context.instance_name) as downloader:
|
|
166 |
+ downloader.download_file(digest, file_path)
|
|
167 |
+ |
|
168 |
+ if verify:
|
|
169 |
+ file_digest = create_digest(read_file(file_path))
|
|
170 |
+ if file_digest != digest:
|
|
171 |
+ click.echo("Error: Failed to verify path=[{}]".format(file_path), err=True)
|
|
172 |
+ continue
|
|
173 |
+ |
|
174 |
+ if os.path.isfile(file_path):
|
|
175 |
+ click.echo("Success: Pulled path=[{}] from digest=[{}/{}]"
|
|
176 |
+ .format(file_path, digest.hash, digest.size_bytes))
|
|
177 |
+ else:
|
|
178 |
+ click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
|
|
177 | 179 |
|
178 | 180 |
|
179 | 181 |
@cli.command('download-dir', short_help="Download a directory from the CAS server.")
|
... | ... | @@ -24,6 +24,7 @@ import logging |
24 | 24 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
25 | 25 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
26 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
27 |
+from buildgrid._protos.google.rpc import code_pb2, status_pb2
|
|
27 | 28 |
from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
28 | 29 |
from buildgrid.utils import get_hash_type
|
29 | 30 |
|
... | ... | @@ -70,6 +71,35 @@ class ContentAddressableStorageInstance: |
70 | 71 |
|
71 | 72 |
return response
|
72 | 73 |
|
74 |
+ def batch_read_blobs(self, digests):
|
|
75 |
+ storage = self._storage
|
|
76 |
+ |
|
77 |
+ response = re_pb2.BatchReadBlobsResponse()
|
|
78 |
+ |
|
79 |
+ requested_bytes = sum((digest.size_bytes for digest in digests))
|
|
80 |
+ max_batch_size = self.max_batch_total_size_bytes()
|
|
81 |
+ |
|
82 |
+ if requested_bytes > max_batch_size:
|
|
83 |
+ raise InvalidArgumentError('Combined total size of blobs exceeds '
|
|
84 |
+ 'server limit. '
|
|
85 |
+ '({} > {} [byte])'.format(requested_bytes,
|
|
86 |
+ max_batch_size))
|
|
87 |
+ |
|
88 |
+ for digest in digests:
|
|
89 |
+ response_proto = response.responses.add()
|
|
90 |
+ response_proto.digest.CopyFrom(digest)
|
|
91 |
+ |
|
92 |
+ blob = storage.get_blob(digest)
|
|
93 |
+ if blob:
|
|
94 |
+ response_proto.data = blob.read()
|
|
95 |
+ status_code = code_pb2.OK
|
|
96 |
+ else:
|
|
97 |
+ status_code = code_pb2.NOT_FOUND
|
|
98 |
+ |
|
99 |
+ response_proto.status.CopyFrom(status_pb2.Status(code=status_code))
|
|
100 |
+ |
|
101 |
+ return response
|
|
102 |
+ |
|
73 | 103 |
def get_tree(self, request):
|
74 | 104 |
storage = self._storage
|
75 | 105 |
|
... | ... | @@ -86,8 +86,15 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
86 | 86 |
def BatchReadBlobs(self, request, context):
|
87 | 87 |
self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
|
88 | 88 |
|
89 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
90 |
- context.set_details('Method not implemented!')
|
|
89 |
+ try:
|
|
90 |
+ instance = self._get_instance(request.instance_name)
|
|
91 |
+ response = instance.batch_read_blobs(request.digests)
|
|
92 |
+ return response
|
|
93 |
+ |
|
94 |
+ except InvalidArgumentError as e:
|
|
95 |
+ self.__logger.error(e)
|
|
96 |
+ context.set_details(str(e))
|
|
97 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
91 | 98 |
|
92 | 99 |
return remote_execution_pb2.BatchReadBlobsResponse()
|
93 | 100 |
|