Arber Xhindoli pushed to branch arber/91-get-tree at BuildGrid / buildgrid
Commits:
-
1ceece5d
by Arber Xhindoli at 2018-11-30T16:15:49Z
-
da2a96b9
by Arber Xhindoli at 2018-11-30T16:15:56Z
-
e550ed1a
by Arber Xhindoli at 2018-11-30T16:15:56Z
4 changed files:
- buildgrid/client/cas.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/settings.py
Changes:
... | ... | @@ -23,19 +23,13 @@ from buildgrid._exceptions import NotFoundError |
23 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
24 | 24 |
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
25 | 25 |
from buildgrid._protos.google.rpc import code_pb2
|
26 |
-from buildgrid.settings import HASH
|
|
26 |
+from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
27 | 27 |
from buildgrid.utils import merkle_tree_maker
|
28 | 28 |
|
29 | 29 |
|
30 | 30 |
# Maximum size for a queueable file:
|
31 | 31 |
FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
|
32 | 32 |
|
33 |
-# Maximum size for a single gRPC request:
|
|
34 |
-MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
|
35 |
- |
|
36 |
-# Maximum number of elements per gRPC request:
|
|
37 |
-MAX_REQUEST_COUNT = 500
|
|
38 |
- |
|
39 | 33 |
|
40 | 34 |
class _CallCache:
|
41 | 35 |
"""Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
|
... | ... | @@ -390,11 +384,10 @@ class Downloader: |
390 | 384 |
assert digest.hash in directories
|
391 | 385 |
|
392 | 386 |
directory = directories[digest.hash]
|
393 |
- self._write_directory(digest.hash, directory_path,
|
|
387 |
+ self._write_directory(directory, directory_path,
|
|
394 | 388 |
directories=directories, root_barrier=directory_path)
|
395 | 389 |
|
396 | 390 |
directory_fetched = True
|
397 |
- |
|
398 | 391 |
except grpc.RpcError as e:
|
399 | 392 |
status_code = e.code()
|
400 | 393 |
if status_code == grpc.StatusCode.UNIMPLEMENTED:
|
... | ... | @@ -24,7 +24,7 @@ import logging |
24 | 24 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
25 | 25 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
26 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
27 |
-from buildgrid.settings import HASH, HASH_LENGTH
|
|
27 |
+from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
28 | 28 |
from buildgrid.utils import get_hash_type
|
29 | 29 |
|
30 | 30 |
|
... | ... | @@ -42,9 +42,7 @@ class ContentAddressableStorageInstance: |
42 | 42 |
return get_hash_type()
|
43 | 43 |
|
44 | 44 |
def max_batch_total_size_bytes(self):
|
45 |
- # TODO: link with max size
|
|
46 |
- # Should be added from settings in MR !119
|
|
47 |
- return 2000000
|
|
45 |
+ return MAX_REQUEST_SIZE
|
|
48 | 46 |
|
49 | 47 |
def symlink_absolute_path_strategy(self):
|
50 | 48 |
# Currently this strategy is hardcoded into BuildGrid
|
... | ... | @@ -72,6 +70,41 @@ class ContentAddressableStorageInstance: |
72 | 70 |
|
73 | 71 |
return response
|
74 | 72 |
|
73 |
+ def get_tree(self, request):
|
|
74 |
+ storage = self._storage
|
|
75 |
+ |
|
76 |
+ response = re_pb2.GetTreeResponse()
|
|
77 |
+ page_size = request.page_size
|
|
78 |
+ |
|
79 |
+ if not request.page_size:
|
|
80 |
+ request.page_size = MAX_REQUEST_COUNT
|
|
81 |
+ |
|
82 |
+ root_digest = request.root_digest
|
|
83 |
+ page_size = request.page_size
|
|
84 |
+ |
|
85 |
+ def __get_tree(node_digest):
|
|
86 |
+ nonlocal response, page_size, request
|
|
87 |
+ |
|
88 |
+ if not page_size:
|
|
89 |
+ page_size = request.page_size
|
|
90 |
+ yield response
|
|
91 |
+ response = re_pb2.GetTreeResponse()
|
|
92 |
+ |
|
93 |
+ if response.ByteSize() >= (MAX_REQUEST_SIZE):
|
|
94 |
+ yield response
|
|
95 |
+ response = re_pb2.GetTreeResponse()
|
|
96 |
+ |
|
97 |
+ directory_from_digest = storage.get_message(node_digest, re_pb2.Directory)
|
|
98 |
+ page_size -= 1
|
|
99 |
+ response.directories.extend([directory_from_digest])
|
|
100 |
+ |
|
101 |
+ for directory in directory_from_digest.directories:
|
|
102 |
+ yield from __get_tree(directory.digest)
|
|
103 |
+ |
|
104 |
+ yield response
|
|
105 |
+ |
|
106 |
+ return __get_tree(root_digest)
|
|
107 |
+ |
|
75 | 108 |
|
76 | 109 |
class ByteStreamInstance:
|
77 | 110 |
|
... | ... | @@ -86,10 +86,16 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
86 | 86 |
def GetTree(self, request, context):
|
87 | 87 |
self.__logger.debug("GetTree request from [%s]", context.peer())
|
88 | 88 |
|
89 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
90 |
- context.set_details('Method not implemented!')
|
|
89 |
+ try:
|
|
90 |
+ instance = self._get_instance(request.instance_name)
|
|
91 |
+ yield from instance.get_tree(request)
|
|
92 |
+ |
|
93 |
+ except InvalidArgumentError as e:
|
|
94 |
+ self.__logger.error(e)
|
|
95 |
+ context.set_details(str(e))
|
|
96 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
91 | 97 |
|
92 |
- return iter([remote_execution_pb2.GetTreeResponse()])
|
|
98 |
+ yield remote_execution_pb2.GetTreeResponse()
|
|
93 | 99 |
|
94 | 100 |
def _get_instance(self, instance_name):
|
95 | 101 |
try:
|
... | ... | @@ -24,3 +24,9 @@ HASH_LENGTH = HASH().digest_size * 2 |
24 | 24 |
|
25 | 25 |
# Period, in seconds, for the monitoring cycle:
|
26 | 26 |
MONITORING_PERIOD = 5.0
|
27 |
+ |
|
28 |
+# Maximum size for a single gRPC request:
|
|
29 |
+MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
|
30 |
+ |
|
31 |
+# Maximum number of elements per gRPC request:
|
|
32 |
+MAX_REQUEST_COUNT = 500
|