Arber Xhindoli pushed to branch arber/91-get-tree at BuildGrid / buildgrid
Commits:
-
859c0fa8
by Finn at 2018-11-27T15:25:25Z
-
e1091b04
by Finn at 2018-11-27T15:25:25Z
-
1dc5d2d2
by Finn at 2018-11-27T15:25:25Z
-
35c901bd
by Finn at 2018-11-27T16:23:49Z
-
5cb38e2a
by Finn at 2018-11-27T16:23:52Z
-
32ad653d
by Finn at 2018-11-27T16:23:52Z
-
e15a9c91
by Finn at 2018-11-27T16:23:52Z
-
bd5587ea
by Finn at 2018-11-27T16:23:52Z
-
d94fa258
by Finn at 2018-11-27T16:23:52Z
-
6f90a553
by Finn at 2018-11-27T16:23:52Z
-
db65c5ec
by Raoul Hidalgo Charman at 2018-11-28T12:23:41Z
-
e482d7c8
by Arber Xhindoli at 2018-11-28T17:21:06Z
-
f07b9259
by Arber Xhindoli at 2018-11-28T17:21:06Z
-
02f93f54
by Arber Xhindoli at 2018-11-28T17:21:06Z
-
c8ca2f59
by Arber Xhindoli at 2018-11-28T17:21:06Z
-
04c93d5b
by Arber Xhindoli at 2018-11-28T17:22:25Z
-
41e2e11a
by Arber Xhindoli at 2018-11-28T17:22:28Z
-
83b9a9a0
by Arber Xhindoli at 2018-11-28T17:22:28Z
-
99baa2c1
by Arber Xhindoli at 2018-11-28T17:22:28Z
-
a2d050cf
by Arber Xhindoli at 2018-11-28T18:11:54Z
18 changed files:
- + buildgrid/_app/commands/cmd_capabilities.py
- + buildgrid/client/capabilities.py
- buildgrid/client/cas.py
- + buildgrid/server/capabilities/__init__.py
- + buildgrid/server/capabilities/instance.py
- + buildgrid/server/capabilities/service.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/execution/instance.py
- buildgrid/server/instance.py
- buildgrid/settings.py
- buildgrid/utils.py
- tests/cas/data/hello/hello.h → tests/cas/data/hello/hello2/hello.h
- + tests/cas/data/hello/hello3/hello4/hello5/hello.h
- tests/cas/test_client.py
- tests/cas/test_storage.py
- + tests/integration/capabilities_service.py
- + tests/utils/capabilities.py
Changes:
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import sys
|
|
17 |
+from urllib.parse import urlparse
|
|
18 |
+ |
|
19 |
+import click
|
|
20 |
+import grpc
|
|
21 |
+ |
|
22 |
+from buildgrid.client.capabilities import CapabilitiesInterface
|
|
23 |
+ |
|
24 |
+from ..cli import pass_context
|
|
25 |
+ |
|
26 |
+ |
|
27 |
+@click.command(name='capabilities', short_help="Capabilities service.")
|
|
28 |
+@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
|
|
29 |
+ help="Remote execution server's URL (port defaults to 50051 if no specified).")
|
|
30 |
+@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
31 |
+ help="Private client key for TLS (PEM-encoded)")
|
|
32 |
+@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
33 |
+ help="Public client certificate for TLS (PEM-encoded)")
|
|
34 |
+@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
|
35 |
+ help="Public server certificate for TLS (PEM-encoded)")
|
|
36 |
+@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
|
|
37 |
+ help="Targeted farm instance name.")
|
|
38 |
+@pass_context
|
|
39 |
+def cli(context, remote, instance_name, client_key, client_cert, server_cert):
|
|
40 |
+ click.echo("Getting capabilities...")
|
|
41 |
+ url = urlparse(remote)
|
|
42 |
+ |
|
43 |
+ remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
|
44 |
+ instance_name = instance_name
|
|
45 |
+ |
|
46 |
+ if url.scheme == 'http':
|
|
47 |
+ channel = grpc.insecure_channel(remote)
|
|
48 |
+ else:
|
|
49 |
+ credentials = context.load_client_credentials(client_key, client_cert, server_cert)
|
|
50 |
+ if not credentials:
|
|
51 |
+ click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
|
|
52 |
+ sys.exit(-1)
|
|
53 |
+ |
|
54 |
+ channel = grpc.secure_channel(remote, credentials)
|
|
55 |
+ |
|
56 |
+ interface = CapabilitiesInterface(channel)
|
|
57 |
+ response = interface.get_capabilities(instance_name)
|
|
58 |
+ click.echo(response)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+import grpc
|
|
18 |
+ |
|
19 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
20 |
+ |
|
21 |
+ |
|
22 |
+class CapabilitiesInterface:
|
|
23 |
+ """Interface for calls the the Capabilities Service."""
|
|
24 |
+ |
|
25 |
+ def __init__(self, channel):
|
|
26 |
+ """Initialises an instance of the capabilities service.
|
|
27 |
+ |
|
28 |
+ Args:
|
|
29 |
+ channel (grpc.Channel): A gRPC channel to the CAS endpoint.
|
|
30 |
+ """
|
|
31 |
+ self.__logger = logging.getLogger(__name__)
|
|
32 |
+ self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
|
|
33 |
+ |
|
34 |
+ def get_capabilities(self, instance_name):
|
|
35 |
+ """Returns the capabilities or the server to the user.
|
|
36 |
+ |
|
37 |
+ Args:
|
|
38 |
+ instance_name (str): The name of the instance."""
|
|
39 |
+ |
|
40 |
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
|
|
41 |
+ try:
|
|
42 |
+ return self.__stub.GetCapabilities(request)
|
|
43 |
+ |
|
44 |
+ except grpc.RpcError as e:
|
|
45 |
+ self.__logger.error(e)
|
|
46 |
+ raise
|
... | ... | @@ -23,19 +23,13 @@ from buildgrid._exceptions import NotFoundError |
23 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
24 | 24 |
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
25 | 25 |
from buildgrid._protos.google.rpc import code_pb2
|
26 |
-from buildgrid.settings import HASH
|
|
26 |
+from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
27 | 27 |
from buildgrid.utils import merkle_tree_maker
|
28 | 28 |
|
29 | 29 |
|
30 | 30 |
# Maximum size for a queueable file:
|
31 | 31 |
FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
|
32 | 32 |
|
33 |
-# Maximum size for a single gRPC request:
|
|
34 |
-MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
|
35 |
- |
|
36 |
-# Maximum number of elements per gRPC request:
|
|
37 |
-MAX_REQUEST_COUNT = 500
|
|
38 |
- |
|
39 | 33 |
|
40 | 34 |
class _CallCache:
|
41 | 35 |
"""Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
|
... | ... | @@ -390,11 +384,10 @@ class Downloader: |
390 | 384 |
assert digest.hash in directories
|
391 | 385 |
|
392 | 386 |
directory = directories[digest.hash]
|
393 |
- self._write_directory(digest.hash, directory_path,
|
|
387 |
+ self._write_directory(directory, directory_path,
|
|
394 | 388 |
directories=directories, root_barrier=directory_path)
|
395 | 389 |
|
396 | 390 |
directory_fetched = True
|
397 |
- |
|
398 | 391 |
except grpc.RpcError as e:
|
399 | 392 |
status_code = e.code()
|
400 | 393 |
if status_code == grpc.StatusCode.UNIMPLEMENTED:
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+ |
|
18 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
19 |
+ |
|
20 |
+ |
|
21 |
+class CapabilitiesInstance:
|
|
22 |
+ |
|
23 |
+ def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
|
|
24 |
+ self.__logger = logging.getLogger(__name__)
|
|
25 |
+ self.__cas_instance = cas_instance
|
|
26 |
+ self.__action_cache_instance = action_cache_instance
|
|
27 |
+ self.__execution_instance = execution_instance
|
|
28 |
+ |
|
29 |
+ def register_instance_with_server(self, instance_name, server):
|
|
30 |
+ server.add_capabilities_instance(self, instance_name)
|
|
31 |
+ |
|
32 |
+ def add_cas_instance(self, cas_instance):
|
|
33 |
+ self.__cas_instance = cas_instance
|
|
34 |
+ |
|
35 |
+ def add_action_cache_instance(self, action_cache_instance):
|
|
36 |
+ self.__action_cache_instance = action_cache_instance
|
|
37 |
+ |
|
38 |
+ def add_execution_instance(self, execution_instance):
|
|
39 |
+ self.__execution_instance = execution_instance
|
|
40 |
+ |
|
41 |
+ def get_capabilities(self):
|
|
42 |
+ server_capabilities = remote_execution_pb2.ServerCapabilities()
|
|
43 |
+ server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
|
|
44 |
+ server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
|
|
45 |
+ # TODO
|
|
46 |
+ # When API is stable, fill out SemVer values
|
|
47 |
+ # server_capabilities.deprecated_api_version =
|
|
48 |
+ # server_capabilities.low_api_version =
|
|
49 |
+ # server_capabilities.low_api_version =
|
|
50 |
+ # server_capabilities.hig_api_version =
|
|
51 |
+ return server_capabilities
|
|
52 |
+ |
|
53 |
+ def _get_cache_capabilities(self):
|
|
54 |
+ capabilities = remote_execution_pb2.CacheCapabilities()
|
|
55 |
+ action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
|
|
56 |
+ |
|
57 |
+ if self.__cas_instance:
|
|
58 |
+ capabilities.digest_function.extend([self.__cas_instance.hash_type()])
|
|
59 |
+ capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
|
|
60 |
+ capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
|
|
61 |
+ # TODO: execution priority #102
|
|
62 |
+ # capabilities.cache_priority_capabilities =
|
|
63 |
+ |
|
64 |
+ if self.__action_cache_instance:
|
|
65 |
+ action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
|
|
66 |
+ |
|
67 |
+ capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
|
|
68 |
+ return capabilities
|
|
69 |
+ |
|
70 |
+ def _get_capabilities_execution(self):
|
|
71 |
+ capabilities = remote_execution_pb2.ExecutionCapabilities()
|
|
72 |
+ if self.__execution_instance:
|
|
73 |
+ capabilities.exec_enabled = True
|
|
74 |
+ capabilities.digest_function = self.__execution_instance.hash_type()
|
|
75 |
+ # TODO: execution priority #102
|
|
76 |
+ # capabilities.execution_priority =
|
|
77 |
+ |
|
78 |
+ else:
|
|
79 |
+ capabilities.exec_enabled = False
|
|
80 |
+ |
|
81 |
+ return capabilities
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+ |
|
18 |
+import grpc
|
|
19 |
+ |
|
20 |
+from buildgrid._exceptions import InvalidArgumentError
|
|
21 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
22 |
+ |
|
23 |
+ |
|
24 |
+class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
|
|
25 |
+ |
|
26 |
+ def __init__(self, server):
|
|
27 |
+ self.__logger = logging.getLogger(__name__)
|
|
28 |
+ self.__instances = {}
|
|
29 |
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
|
|
30 |
+ |
|
31 |
+ def add_instance(self, name, instance):
|
|
32 |
+ self.__instances[name] = instance
|
|
33 |
+ |
|
34 |
+ def add_cas_instance(self, name, instance):
|
|
35 |
+ self.__instances[name].add_cas_instance(instance)
|
|
36 |
+ |
|
37 |
+ def add_action_cache_instance(self, name, instance):
|
|
38 |
+ self.__instances[name].add_action_cache_instance(instance)
|
|
39 |
+ |
|
40 |
+ def add_execution_instance(self, name, instance):
|
|
41 |
+ self.__instances[name].add_execution_instance(instance)
|
|
42 |
+ |
|
43 |
+ def GetCapabilities(self, request, context):
|
|
44 |
+ try:
|
|
45 |
+ instance = self._get_instance(request.instance_name)
|
|
46 |
+ return instance.get_capabilities()
|
|
47 |
+ |
|
48 |
+ except InvalidArgumentError as e:
|
|
49 |
+ self.__logger.error(e)
|
|
50 |
+ context.set_details(str(e))
|
|
51 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
52 |
+ |
|
53 |
+ return remote_execution_pb2.ServerCapabilities()
|
|
54 |
+ |
|
55 |
+ def _get_instance(self, name):
|
|
56 |
+ try:
|
|
57 |
+ return self.__instances[name]
|
|
58 |
+ |
|
59 |
+ except KeyError:
|
|
60 |
+ raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
|
... | ... | @@ -24,7 +24,8 @@ import logging |
24 | 24 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
25 | 25 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
26 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
27 |
-from buildgrid.settings import HASH, HASH_LENGTH
|
|
27 |
+from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
|
28 |
+from buildgrid.utils import get_hash_type
|
|
28 | 29 |
|
29 | 30 |
|
30 | 31 |
class ContentAddressableStorageInstance:
|
... | ... | @@ -37,6 +38,17 @@ class ContentAddressableStorageInstance: |
37 | 38 |
def register_instance_with_server(self, instance_name, server):
|
38 | 39 |
server.add_cas_instance(self, instance_name)
|
39 | 40 |
|
41 |
+ def hash_type(self):
|
|
42 |
+ return get_hash_type()
|
|
43 |
+ |
|
44 |
+ def max_batch_total_size_bytes(self):
|
|
45 |
+ return MAX_REQUEST_SIZE
|
|
46 |
+ |
|
47 |
+ def symlink_absolute_path_strategy(self):
|
|
48 |
+ # Currently this strategy is hardcoded into BuildGrid
|
|
49 |
+ # With no setting to reference
|
|
50 |
+ return re_pb2.CacheCapabilities().DISALLOWED
|
|
51 |
+ |
|
40 | 52 |
def find_missing_blobs(self, blob_digests):
|
41 | 53 |
storage = self._storage
|
42 | 54 |
return re_pb2.FindMissingBlobsResponse(
|
... | ... | @@ -58,6 +70,39 @@ class ContentAddressableStorageInstance: |
58 | 70 |
|
59 | 71 |
return response
|
60 | 72 |
|
73 |
+ def get_tree(self, request):
|
|
74 |
+ storage = self._storage
|
|
75 |
+ response = re_pb2.GetTreeResponse()
|
|
76 |
+ |
|
77 |
+ if not request.page_size:
|
|
78 |
+ request.page_size = MAX_REQUEST_COUNT
|
|
79 |
+ |
|
80 |
+ root_digest = request.root_digest
|
|
81 |
+ page_size = request.page_size
|
|
82 |
+ |
|
83 |
+ def __get_tree(node_digest):
|
|
84 |
+ nonlocal response, page_size, request
|
|
85 |
+ |
|
86 |
+ if not page_size:
|
|
87 |
+ page_size = request.page_size
|
|
88 |
+ yield response
|
|
89 |
+ response = re_pb2.GetTreeResponse()
|
|
90 |
+ |
|
91 |
+ if response.ByteSize() >= (MAX_REQUEST_SIZE):
|
|
92 |
+ yield response
|
|
93 |
+ response = re_pb2.GetTreeResponse()
|
|
94 |
+ |
|
95 |
+ directory_from_digest = storage.get_message(node_digest, re_pb2.Directory)
|
|
96 |
+ page_size -= 1
|
|
97 |
+ response.directories.extend([directory_from_digest])
|
|
98 |
+ |
|
99 |
+ for directory in directory_from_digest.directories:
|
|
100 |
+ yield from __get_tree(directory.digest)
|
|
101 |
+ |
|
102 |
+ yield response
|
|
103 |
+ |
|
104 |
+ return __get_tree(root_digest)
|
|
105 |
+ |
|
61 | 106 |
|
62 | 107 |
class ByteStreamInstance:
|
63 | 108 |
|
... | ... | @@ -86,10 +86,16 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
86 | 86 |
def GetTree(self, request, context):
|
87 | 87 |
self.__logger.debug("GetTree request from [%s]", context.peer())
|
88 | 88 |
|
89 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
90 |
- context.set_details('Method not implemented!')
|
|
89 |
+ try:
|
|
90 |
+ instance = self._get_instance(request.instance_name)
|
|
91 |
+ yield from instance.get_tree(request)
|
|
92 |
+ |
|
93 |
+ except InvalidArgumentError as e:
|
|
94 |
+ self.__logger.error(e)
|
|
95 |
+ context.set_details(str(e))
|
|
96 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
91 | 97 |
|
92 |
- return iter([remote_execution_pb2.GetTreeResponse()])
|
|
98 |
+ yield remote_execution_pb2.GetTreeResponse()
|
|
93 | 99 |
|
94 | 100 |
def _get_instance(self, instance_name):
|
95 | 101 |
try:
|
... | ... | @@ -25,6 +25,7 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError |
25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
26 | 26 |
|
27 | 27 |
from ..job import Job
|
28 |
+from ...utils import get_hash_type
|
|
28 | 29 |
|
29 | 30 |
|
30 | 31 |
class ExecutionInstance:
|
... | ... | @@ -38,6 +39,9 @@ class ExecutionInstance: |
38 | 39 |
def register_instance_with_server(self, instance_name, server):
|
39 | 40 |
server.add_execution_instance(self, instance_name)
|
40 | 41 |
|
42 |
+ def hash_type(self):
|
|
43 |
+ return get_hash_type()
|
|
44 |
+ |
|
41 | 45 |
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
42 | 46 |
""" Sends a job for execution.
|
43 | 47 |
Queues an action and creates an Operation instance to be associated with
|
... | ... | @@ -28,6 +28,8 @@ from buildgrid.server.execution.service import ExecutionService |
28 | 28 |
from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
|
29 | 29 |
from buildgrid.server.operations.service import OperationsService
|
30 | 30 |
from buildgrid.server.referencestorage.service import ReferenceStorageService
|
31 |
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
32 |
+from buildgrid.server.capabilities.service import CapabilitiesService
|
|
31 | 33 |
|
32 | 34 |
|
33 | 35 |
class BuildGridServer:
|
... | ... | @@ -55,6 +57,9 @@ class BuildGridServer: |
55 | 57 |
self.__main_loop = asyncio.get_event_loop()
|
56 | 58 |
self.__monitoring_bus = None
|
57 | 59 |
|
60 |
+ # We always want a capabilities service
|
|
61 |
+ self._capabilities_service = CapabilitiesService(self.__grpc_server)
|
|
62 |
+ |
|
58 | 63 |
self._execution_service = None
|
59 | 64 |
self._bots_service = None
|
60 | 65 |
self._operations_service = None
|
... | ... | @@ -128,6 +133,7 @@ class BuildGridServer: |
128 | 133 |
self._execution_service = ExecutionService(self.__grpc_server)
|
129 | 134 |
|
130 | 135 |
self._execution_service.add_instance(instance_name, instance)
|
136 |
+ self._add_capabilities_instance(instance_name, execution_instance=instance)
|
|
131 | 137 |
|
132 | 138 |
def add_bots_interface(self, instance, instance_name):
|
133 | 139 |
"""Adds a :obj:`BotsInterface` to the service.
|
... | ... | @@ -184,9 +190,10 @@ class BuildGridServer: |
184 | 190 |
self._action_cache_service = ActionCacheService(self.__grpc_server)
|
185 | 191 |
|
186 | 192 |
self._action_cache_service.add_instance(instance_name, instance)
|
193 |
+ self._add_capabilities_instance(instance_name, action_cache_instance=instance)
|
|
187 | 194 |
|
188 | 195 |
def add_cas_instance(self, instance, instance_name):
|
189 |
- """Stores a :obj:`ContentAddressableStorageInstance` to the service.
|
|
196 |
+ """Adds a :obj:`ContentAddressableStorageInstance` to the service.
|
|
190 | 197 |
|
191 | 198 |
If no service exists, it creates one.
|
192 | 199 |
|
... | ... | @@ -198,9 +205,10 @@ class BuildGridServer: |
198 | 205 |
self._cas_service = ContentAddressableStorageService(self.__grpc_server)
|
199 | 206 |
|
200 | 207 |
self._cas_service.add_instance(instance_name, instance)
|
208 |
+ self._add_capabilities_instance(instance_name, cas_instance=instance)
|
|
201 | 209 |
|
202 | 210 |
def add_bytestream_instance(self, instance, instance_name):
|
203 |
- """Stores a :obj:`ByteStreamInstance` to the service.
|
|
211 |
+ """Adds a :obj:`ByteStreamInstance` to the service.
|
|
204 | 212 |
|
205 | 213 |
If no service exists, it creates one.
|
206 | 214 |
|
... | ... | @@ -213,6 +221,31 @@ class BuildGridServer: |
213 | 221 |
|
214 | 222 |
self._bytestream_service.add_instance(instance_name, instance)
|
215 | 223 |
|
224 |
+ def _add_capabilities_instance(self, instance_name,
|
|
225 |
+ cas_instance=None,
|
|
226 |
+ action_cache_instance=None,
|
|
227 |
+ execution_instance=None):
|
|
228 |
+ """Adds a :obj:`CapabilitiesInstance` to the service.
|
|
229 |
+ |
|
230 |
+ Args:
|
|
231 |
+ instance (:obj:`CapabilitiesInstance`): Instance to add.
|
|
232 |
+ instance_name (str): Instance name.
|
|
233 |
+ """
|
|
234 |
+ |
|
235 |
+ try:
|
|
236 |
+ if cas_instance:
|
|
237 |
+ self._capabilities_service.add_cas_instance(instance_name, cas_instance)
|
|
238 |
+ if action_cache_instance:
|
|
239 |
+ self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
|
|
240 |
+ if execution_instance:
|
|
241 |
+ self._capabilities_service.add_execution_instance(instance_name, execution_instance)
|
|
242 |
+ |
|
243 |
+ except KeyError:
|
|
244 |
+ capabilities_instance = CapabilitiesInstance(cas_instance,
|
|
245 |
+ action_cache_instance,
|
|
246 |
+ execution_instance)
|
|
247 |
+ self._capabilities_service.add_instance(instance_name, capabilities_instance)
|
|
248 |
+ |
|
216 | 249 |
# --- Public API: Monitoring ---
|
217 | 250 |
|
218 | 251 |
@property
|
... | ... | @@ -4,3 +4,9 @@ import hashlib |
4 | 4 |
# The hash function that CAS uses
|
5 | 5 |
HASH = hashlib.sha256
|
6 | 6 |
HASH_LENGTH = HASH().digest_size * 2
|
7 |
+ |
|
8 |
+# Maximum size for a single gRPC request:
|
|
9 |
+MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
|
10 |
+ |
|
11 |
+# Maximum number of elements per gRPC request:
|
|
12 |
+MAX_REQUEST_COUNT = 500
|
... | ... | @@ -30,6 +30,14 @@ def get_hostname(): |
30 | 30 |
return socket.gethostname()
|
31 | 31 |
|
32 | 32 |
|
33 |
+def get_hash_type():
|
|
34 |
+ """Returns the hash type."""
|
|
35 |
+ hash_name = HASH().name
|
|
36 |
+ if hash_name == "sha256":
|
|
37 |
+ return remote_execution_pb2.SHA256
|
|
38 |
+ return remote_execution_pb2.UNKNOWN
|
|
39 |
+ |
|
40 |
+ |
|
33 | 41 |
def create_digest(bytes_to_digest):
|
34 | 42 |
"""Computes the :obj:`Digest` of a piece of data.
|
35 | 43 |
|
1 |
+#define HELLO_WORLD "Hello, World!"
|
... | ... | @@ -40,16 +40,31 @@ MESSAGES = [ |
40 | 40 |
]
|
41 | 41 |
DATA_DIR = os.path.join(
|
42 | 42 |
os.path.dirname(os.path.realpath(__file__)), 'data')
|
43 |
+ |
|
44 |
+HELLO_DIR = os.path.join(DATA_DIR, 'hello')
|
|
45 |
+HELLO2_DIR = os.path.join(HELLO_DIR, 'hello2')
|
|
46 |
+HELLO3_DIR = os.path.join(HELLO_DIR, 'hello3')
|
|
47 |
+HELLO4_DIR = os.path.join(HELLO3_DIR, 'hello4')
|
|
48 |
+HELLO5_DIR = os.path.join(HELLO4_DIR, 'hello5')
|
|
49 |
+ |
|
43 | 50 |
FILES = [
|
44 | 51 |
(os.path.join(DATA_DIR, 'void'),),
|
45 | 52 |
(os.path.join(DATA_DIR, 'hello.cc'),),
|
46 | 53 |
(os.path.join(DATA_DIR, 'hello', 'hello.c'),
|
47 |
- os.path.join(DATA_DIR, 'hello', 'hello.h'))]
|
|
54 |
+ os.path.join(DATA_DIR, 'hello', 'hello.sh')),
|
|
55 |
+ (os.path.join(HELLO2_DIR, 'hello.h'),),
|
|
56 |
+ (os.path.join(HELLO5_DIR, 'hello.h'),), ]
|
|
57 |
+ |
|
48 | 58 |
FOLDERS = [
|
49 |
- (os.path.join(DATA_DIR, 'hello'),)]
|
|
59 |
+ (HELLO_DIR, HELLO2_DIR, HELLO3_DIR, HELLO4_DIR, HELLO5_DIR)]
|
|
60 |
+ |
|
50 | 61 |
DIRECTORIES = [
|
51 |
- (os.path.join(DATA_DIR, 'hello'),),
|
|
52 |
- (os.path.join(DATA_DIR, 'hello'), DATA_DIR)]
|
|
62 |
+ (HELLO_DIR,),
|
|
63 |
+ (DATA_DIR,),
|
|
64 |
+ (HELLO2_DIR,),
|
|
65 |
+ (HELLO3_DIR,),
|
|
66 |
+ (HELLO4_DIR,),
|
|
67 |
+ (HELLO5_DIR,), ]
|
|
53 | 68 |
|
54 | 69 |
|
55 | 70 |
@pytest.mark.parametrize('blobs', BLOBS)
|
... | ... | @@ -21,8 +21,8 @@ import tempfile |
21 | 21 |
|
22 | 22 |
import boto3
|
23 | 23 |
import grpc
|
24 |
-import pytest
|
|
25 | 24 |
from moto import mock_s3
|
25 |
+import pytest
|
|
26 | 26 |
|
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
28 | 28 |
from buildgrid.server.cas.storage.remote import RemoteStorage
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+# pylint: disable=redefined-outer-name
|
|
16 |
+ |
|
17 |
+ |
|
18 |
+import grpc
|
|
19 |
+import pytest
|
|
20 |
+ |
|
21 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
22 |
+from buildgrid.client.capabilities import CapabilitiesInterface
|
|
23 |
+from buildgrid.server.controller import ExecutionController
|
|
24 |
+from buildgrid.server.actioncache.storage import ActionCache
|
|
25 |
+from buildgrid.server.cas.instance import ContentAddressableStorageInstance
|
|
26 |
+from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
|
27 |
+ |
|
28 |
+from ..utils.utils import run_in_subprocess
|
|
29 |
+from ..utils.capabilities import serve_capabilities_service
|
|
30 |
+ |
|
31 |
+ |
|
32 |
+INSTANCES = ['', 'instance']
|
|
33 |
+ |
|
34 |
+ |
|
35 |
+# Use subprocess to avoid creation of gRPC threads in main process
|
|
36 |
+# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
|
|
37 |
+# Multiprocessing uses pickle which protobufs don't work with
|
|
38 |
+# Workaround wrapper to send messages as strings
|
|
39 |
+class ServerInterface:
|
|
40 |
+ |
|
41 |
+ def __init__(self, remote):
|
|
42 |
+ self.__remote = remote
|
|
43 |
+ |
|
44 |
+ def get_capabilities(self, instance_name):
|
|
45 |
+ |
|
46 |
+ def __get_capabilities(queue, remote, instance_name):
|
|
47 |
+ interface = CapabilitiesInterface(grpc.insecure_channel(remote))
|
|
48 |
+ |
|
49 |
+ result = interface.get_capabilities(instance_name)
|
|
50 |
+ queue.put(result.SerializeToString())
|
|
51 |
+ |
|
52 |
+ result = run_in_subprocess(__get_capabilities,
|
|
53 |
+ self.__remote, instance_name)
|
|
54 |
+ |
|
55 |
+ capabilities = remote_execution_pb2.ServerCapabilities()
|
|
56 |
+ capabilities.ParseFromString(result)
|
|
57 |
+ return capabilities
|
|
58 |
+ |
|
59 |
+ |
|
60 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
61 |
+def test_execution_not_available_capabilities(instance):
|
|
62 |
+ with serve_capabilities_service([instance]) as server:
|
|
63 |
+ server_interface = ServerInterface(server.remote)
|
|
64 |
+ response = server_interface.get_capabilities(instance)
|
|
65 |
+ |
|
66 |
+ assert not response.execution_capabilities.exec_enabled
|
|
67 |
+ |
|
68 |
+ |
|
69 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
70 |
+def test_execution_available_capabilities(instance):
|
|
71 |
+ controller = ExecutionController()
|
|
72 |
+ |
|
73 |
+ with serve_capabilities_service([instance],
|
|
74 |
+ execution_instance=controller.execution_instance) as server:
|
|
75 |
+ server_interface = ServerInterface(server.remote)
|
|
76 |
+ response = server_interface.get_capabilities(instance)
|
|
77 |
+ |
|
78 |
+ assert response.execution_capabilities.exec_enabled
|
|
79 |
+ assert response.execution_capabilities.digest_function
|
|
80 |
+ |
|
81 |
+ |
|
82 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
83 |
+def test_action_cache_allow_updates_capabilities(instance):
|
|
84 |
+ storage = LRUMemoryCache(limit=256)
|
|
85 |
+ action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
|
|
86 |
+ |
|
87 |
+ with serve_capabilities_service([instance],
|
|
88 |
+ action_cache_instance=action_cache) as server:
|
|
89 |
+ server_interface = ServerInterface(server.remote)
|
|
90 |
+ response = server_interface.get_capabilities(instance)
|
|
91 |
+ |
|
92 |
+ assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
|
|
93 |
+ |
|
94 |
+ |
|
95 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
96 |
+def test_action_cache_not_allow_updates_capabilities(instance):
|
|
97 |
+ storage = LRUMemoryCache(limit=256)
|
|
98 |
+ action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
|
|
99 |
+ |
|
100 |
+ with serve_capabilities_service([instance],
|
|
101 |
+ action_cache_instance=action_cache) as server:
|
|
102 |
+ server_interface = ServerInterface(server.remote)
|
|
103 |
+ response = server_interface.get_capabilities(instance)
|
|
104 |
+ |
|
105 |
+ assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
|
|
106 |
+ |
|
107 |
+ |
|
108 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
109 |
+def test_cas_capabilities(instance):
|
|
110 |
+ cas = ContentAddressableStorageInstance(None)
|
|
111 |
+ |
|
112 |
+ with serve_capabilities_service([instance],
|
|
113 |
+ cas_instance=cas) as server:
|
|
114 |
+ server_interface = ServerInterface(server.remote)
|
|
115 |
+ response = server_interface.get_capabilities(instance)
|
|
116 |
+ |
|
117 |
+ assert len(response.cache_capabilities.digest_function) == 1
|
|
118 |
+ assert response.cache_capabilities.digest_function[0]
|
|
119 |
+ assert response.cache_capabilities.symlink_absolute_path_strategy
|
|
120 |
+ assert response.cache_capabilities.max_batch_total_size_bytes
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+from concurrent import futures
|
|
17 |
+from contextlib import contextmanager
|
|
18 |
+import multiprocessing
|
|
19 |
+import os
|
|
20 |
+import signal
|
|
21 |
+ |
|
22 |
+import grpc
|
|
23 |
+import pytest_cov
|
|
24 |
+ |
|
25 |
+from buildgrid.server.capabilities.service import CapabilitiesService
|
|
26 |
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
27 |
+ |
|
28 |
+ |
|
29 |
+@contextmanager
|
|
30 |
+def serve_capabilities_service(instances,
|
|
31 |
+ cas_instance=None,
|
|
32 |
+ action_cache_instance=None,
|
|
33 |
+ execution_instance=None):
|
|
34 |
+ server = Server(instances,
|
|
35 |
+ cas_instance,
|
|
36 |
+ action_cache_instance,
|
|
37 |
+ execution_instance)
|
|
38 |
+ try:
|
|
39 |
+ yield server
|
|
40 |
+ finally:
|
|
41 |
+ server.quit()
|
|
42 |
+ |
|
43 |
+ |
|
44 |
+class Server:
|
|
45 |
+ |
|
46 |
+ def __init__(self, instances,
|
|
47 |
+ cas_instance=None,
|
|
48 |
+ action_cache_instance=None,
|
|
49 |
+ execution_instance=None):
|
|
50 |
+ self.instances = instances
|
|
51 |
+ |
|
52 |
+ self.__queue = multiprocessing.Queue()
|
|
53 |
+ self.__process = multiprocessing.Process(
|
|
54 |
+ target=Server.serve,
|
|
55 |
+ args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
|
|
56 |
+ self.__process.start()
|
|
57 |
+ |
|
58 |
+ self.port = self.__queue.get(timeout=1)
|
|
59 |
+ self.remote = 'localhost:{}'.format(self.port)
|
|
60 |
+ |
|
61 |
+ @staticmethod
|
|
62 |
+ def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
|
|
63 |
+ pytest_cov.embed.cleanup_on_sigterm()
|
|
64 |
+ |
|
65 |
+ # Use max_workers default from Python 3.5+
|
|
66 |
+ max_workers = (os.cpu_count() or 1) * 5
|
|
67 |
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
|
68 |
+ port = server.add_insecure_port('localhost:0')
|
|
69 |
+ |
|
70 |
+ capabilities_service = CapabilitiesService(server)
|
|
71 |
+ for name in instances:
|
|
72 |
+ capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
|
|
73 |
+ capabilities_service.add_instance(name, capabilities_instance)
|
|
74 |
+ |
|
75 |
+ server.start()
|
|
76 |
+ queue.put(port)
|
|
77 |
+ signal.pause()
|
|
78 |
+ |
|
79 |
+ def quit(self):
|
|
80 |
+ if self.__process:
|
|
81 |
+ self.__process.terminate()
|
|
82 |
+ self.__process.join()
|