Martin Blanchard pushed to branch master at BuildGrid / buildgrid
Commits:
-
100e91b9
by Arber Xhindoli at 2018-11-30T16:40:19Z
4 changed files:
- buildgrid/client/cas.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/settings.py
Changes:
| ... | ... | @@ -23,19 +23,13 @@ from buildgrid._exceptions import NotFoundError |
| 23 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 24 | 24 |
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 25 | 25 |
from buildgrid._protos.google.rpc import code_pb2
|
| 26 |
-from buildgrid.settings import HASH
|
|
| 26 |
+from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
| 27 | 27 |
from buildgrid.utils import merkle_tree_maker
|
| 28 | 28 |
|
| 29 | 29 |
|
| 30 | 30 |
# Maximum size for a queueable file:
|
| 31 | 31 |
FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
|
| 32 | 32 |
|
| 33 |
-# Maximum size for a single gRPC request:
|
|
| 34 |
-MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
|
| 35 |
- |
|
| 36 |
-# Maximum number of elements per gRPC request:
|
|
| 37 |
-MAX_REQUEST_COUNT = 500
|
|
| 38 |
- |
|
| 39 | 33 |
|
| 40 | 34 |
class _CallCache:
|
| 41 | 35 |
"""Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
|
| ... | ... | @@ -390,11 +384,10 @@ class Downloader: |
| 390 | 384 |
assert digest.hash in directories
|
| 391 | 385 |
|
| 392 | 386 |
directory = directories[digest.hash]
|
| 393 |
- self._write_directory(digest.hash, directory_path,
|
|
| 387 |
+ self._write_directory(directory, directory_path,
|
|
| 394 | 388 |
directories=directories, root_barrier=directory_path)
|
| 395 | 389 |
|
| 396 | 390 |
directory_fetched = True
|
| 397 |
- |
|
| 398 | 391 |
except grpc.RpcError as e:
|
| 399 | 392 |
status_code = e.code()
|
| 400 | 393 |
if status_code == grpc.StatusCode.UNIMPLEMENTED:
|
| ... | ... | @@ -24,7 +24,7 @@ import logging |
| 24 | 24 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
| 25 | 25 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
| 26 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
| 27 |
-from buildgrid.settings import HASH, HASH_LENGTH
|
|
| 27 |
+from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
| 28 | 28 |
from buildgrid.utils import get_hash_type
|
| 29 | 29 |
|
| 30 | 30 |
|
| ... | ... | @@ -42,9 +42,7 @@ class ContentAddressableStorageInstance: |
| 42 | 42 |
return get_hash_type()
|
| 43 | 43 |
|
| 44 | 44 |
def max_batch_total_size_bytes(self):
|
| 45 |
- # TODO: link with max size
|
|
| 46 |
- # Should be added from settings in MR !119
|
|
| 47 |
- return 2000000
|
|
| 45 |
+ return MAX_REQUEST_SIZE
|
|
| 48 | 46 |
|
| 49 | 47 |
def symlink_absolute_path_strategy(self):
|
| 50 | 48 |
# Currently this strategy is hardcoded into BuildGrid
|
| ... | ... | @@ -72,6 +70,41 @@ class ContentAddressableStorageInstance: |
| 72 | 70 |
|
| 73 | 71 |
return response
|
| 74 | 72 |
|
| 73 |
+ def get_tree(self, request):
|
|
| 74 |
+ storage = self._storage
|
|
| 75 |
+ |
|
| 76 |
+ response = re_pb2.GetTreeResponse()
|
|
| 77 |
+ page_size = request.page_size
|
|
| 78 |
+ |
|
| 79 |
+ if not request.page_size:
|
|
| 80 |
+ request.page_size = MAX_REQUEST_COUNT
|
|
| 81 |
+ |
|
| 82 |
+ root_digest = request.root_digest
|
|
| 83 |
+ page_size = request.page_size
|
|
| 84 |
+ |
|
| 85 |
+ def __get_tree(node_digest):
|
|
| 86 |
+ nonlocal response, page_size, request
|
|
| 87 |
+ |
|
| 88 |
+ if not page_size:
|
|
| 89 |
+ page_size = request.page_size
|
|
| 90 |
+ yield response
|
|
| 91 |
+ response = re_pb2.GetTreeResponse()
|
|
| 92 |
+ |
|
| 93 |
+ if response.ByteSize() >= (MAX_REQUEST_SIZE):
|
|
| 94 |
+ yield response
|
|
| 95 |
+ response = re_pb2.GetTreeResponse()
|
|
| 96 |
+ |
|
| 97 |
+ directory_from_digest = storage.get_message(node_digest, re_pb2.Directory)
|
|
| 98 |
+ page_size -= 1
|
|
| 99 |
+ response.directories.extend([directory_from_digest])
|
|
| 100 |
+ |
|
| 101 |
+ for directory in directory_from_digest.directories:
|
|
| 102 |
+ yield from __get_tree(directory.digest)
|
|
| 103 |
+ |
|
| 104 |
+ yield response
|
|
| 105 |
+ |
|
| 106 |
+ return __get_tree(root_digest)
|
|
| 107 |
+ |
|
| 75 | 108 |
|
| 76 | 109 |
class ByteStreamInstance:
|
| 77 | 110 |
|
| ... | ... | @@ -86,10 +86,16 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 86 | 86 |
def GetTree(self, request, context):
|
| 87 | 87 |
self.__logger.debug("GetTree request from [%s]", context.peer())
|
| 88 | 88 |
|
| 89 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 90 |
- context.set_details('Method not implemented!')
|
|
| 89 |
+ try:
|
|
| 90 |
+ instance = self._get_instance(request.instance_name)
|
|
| 91 |
+ yield from instance.get_tree(request)
|
|
| 92 |
+ |
|
| 93 |
+ except InvalidArgumentError as e:
|
|
| 94 |
+ self.__logger.error(e)
|
|
| 95 |
+ context.set_details(str(e))
|
|
| 96 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 91 | 97 |
|
| 92 |
- return iter([remote_execution_pb2.GetTreeResponse()])
|
|
| 98 |
+ yield remote_execution_pb2.GetTreeResponse()
|
|
| 93 | 99 |
|
| 94 | 100 |
def _get_instance(self, instance_name):
|
| 95 | 101 |
try:
|
| ... | ... | @@ -24,3 +24,9 @@ HASH_LENGTH = HASH().digest_size * 2 |
| 24 | 24 |
|
| 25 | 25 |
# Period, in seconds, for the monitoring cycle:
|
| 26 | 26 |
MONITORING_PERIOD = 5.0
|
| 27 |
+ |
|
| 28 |
+# Maximum size for a single gRPC request:
|
|
| 29 |
+MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
|
| 30 |
+ |
|
| 31 |
+# Maximum number of elements per gRPC request:
|
|
| 32 |
+MAX_REQUEST_COUNT = 500
|
