finn pushed to branch finn/78-capabilities-service at BuildGrid / buildgrid
Commits:
-
61bb249f
by Finn at 2018-11-22T15:42:38Z
-
7a705b3e
by Finn at 2018-11-22T15:42:38Z
-
d5d0e86c
by Finn at 2018-11-22T15:42:38Z
-
1899f48f
by Finn at 2018-11-22T15:42:38Z
-
e80589da
by Finn at 2018-11-22T15:42:38Z
-
72660316
by Finn at 2018-11-22T15:45:01Z
-
f4c9182a
by Finn at 2018-11-22T15:45:44Z
-
7a8669ab
by Finn at 2018-11-22T15:46:44Z
9 changed files:
- + buildgrid/client/capabilities.py
- + buildgrid/server/capabilities/__init__.py
- + buildgrid/server/capabilities/instance.py
- + buildgrid/server/capabilities/service.py
- buildgrid/server/cas/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/instance.py
- + tests/integration/capabilities_service.py
- + tests/utils/capabilities.py
Changes:
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+import grpc
|
|
18 |
+ |
|
19 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
20 |
+ |
|
21 |
+ |
|
22 |
+class CapabilitiesInterface:
|
|
23 |
+ """Interface for calls to the Capabilities Service."""
|
|
24 |
+ |
|
25 |
+ def __init__(self, channel):
|
|
26 |
+ """Initialises an instance of the capabilities service.
|
|
27 |
+ |
|
28 |
+ Args:
|
|
29 |
+ channel (grpc.Channel): A gRPC channel to the Capabilities endpoint.
|
|
30 |
+ """
|
|
31 |
+ self.__logger = logging.getLogger(__name__)
|
|
32 |
+ self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
|
|
33 |
+ |
|
34 |
+ def get_capabilities(self, instance_name):
|
|
35 |
+ """Returns the capabilities of the server to the user.
|
|
36 |
+ |
|
37 |
+ Args:
|
|
38 |
+ instance_name (str): The name of the instance."""
|
|
39 |
+ |
|
40 |
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
|
|
41 |
+ try:
|
|
42 |
+ return self.__stub.GetCapabilities(request)
|
|
43 |
+ |
|
44 |
+ except grpc.RpcError as e:
|
|
45 |
+ self.__logger.error(e)
|
|
46 |
+ raise
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+ |
|
18 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
19 |
+ |
|
20 |
+ |
|
21 |
+class CapabilitiesInstance:
|
|
22 |
+ |
|
23 |
+ def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
|
|
24 |
+ self.__logger = logging.getLogger(__name__)
|
|
25 |
+ self.__cas_instance = cas_instance
|
|
26 |
+ self.__action_cache_instance = action_cache_instance
|
|
27 |
+ self.__execution_instance = execution_instance
|
|
28 |
+ |
|
29 |
+ def register_instance_with_server(self, instance_name, server):
|
|
30 |
+ server.add_capabilities_instance(self, instance_name)
|
|
31 |
+ |
|
32 |
+ def add_cas_instance(self, cas_instance):
|
|
33 |
+ self.__cas_instance = cas_instance
|
|
34 |
+ |
|
35 |
+ def add_action_cache_instance(self, action_cache_instance):
|
|
36 |
+ self.__action_cache_instance = action_cache_instance
|
|
37 |
+ |
|
38 |
+ def add_execution_instance(self, execution_instance):
|
|
39 |
+ self.__execution_instance = execution_instance
|
|
40 |
+ |
|
41 |
+ def get_capabilities(self):
|
|
42 |
+ server_capabilities = remote_execution_pb2.ServerCapabilities()
|
|
43 |
+ server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
|
|
44 |
+ server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
|
|
45 |
+ return server_capabilities
|
|
46 |
+ |
|
47 |
+ def _get_cache_capabilities(self):
|
|
48 |
+ capabilities = remote_execution_pb2.CacheCapabilities()
|
|
49 |
+ action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
|
|
50 |
+ |
|
51 |
+ if self.__cas_instance:
|
|
52 |
+ capabilities.digest_function.extend([self.__cas_instance.hash_type()])
|
|
53 |
+ capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
|
|
54 |
+ capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
|
|
55 |
+ # TODO: execution priority #102
|
|
56 |
+ # capabilities.cache_priority_capabilities =
|
|
57 |
+ |
|
58 |
+ if self.__action_cache_instance:
|
|
59 |
+ action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
|
|
60 |
+ |
|
61 |
+ capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
|
|
62 |
+ return capabilities
|
|
63 |
+ |
|
64 |
+ def _get_capabilities_execution(self):
|
|
65 |
+ capabilities = remote_execution_pb2.ExecutionCapabilities()
|
|
66 |
+ if self.__execution_instance:
|
|
67 |
+ capabilities.exec_enabled = True
|
|
68 |
+ capabilities.digest_function = self.__execution_instance.hash_type()
|
|
69 |
+ # TODO: execution priority #102
|
|
70 |
+ # capabilities.execution_priority =
|
|
71 |
+ |
|
72 |
+ else:
|
|
73 |
+ capabilities.exec_enabled = False
|
|
74 |
+ |
|
75 |
+ return capabilities
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+ |
|
18 |
+import grpc
|
|
19 |
+ |
|
20 |
+from buildgrid._exceptions import InvalidArgumentError
|
|
21 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
|
22 |
+ |
|
23 |
+ |
|
24 |
+class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
|
|
25 |
+ |
|
26 |
+ def __init__(self, server):
|
|
27 |
+ self.__logger = logging.getLogger(__name__)
|
|
28 |
+ self.__instances = {}
|
|
29 |
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
|
|
30 |
+ |
|
31 |
+ def add_instance(self, name, instance):
|
|
32 |
+ self.__instances[name] = instance
|
|
33 |
+ |
|
34 |
+ def add_cas_instance(self, name, instance):
|
|
35 |
+ self.__instances[name].add_cas_instance(instance)
|
|
36 |
+ |
|
37 |
+ def add_action_cache_instance(self, name, instance):
|
|
38 |
+ self.__instances[name].add_action_cache_instance(instance)
|
|
39 |
+ |
|
40 |
+ def add_execution_instance(self, name, instance):
|
|
41 |
+ self.__instances[name].add_execution_instance(instance)
|
|
42 |
+ |
|
43 |
+ def GetCapabilities(self, request, context):
|
|
44 |
+ try:
|
|
45 |
+ instance = self._get_instance(request.instance_name)
|
|
46 |
+ return instance.get_capabilities()
|
|
47 |
+ |
|
48 |
+ except InvalidArgumentError as e:
|
|
49 |
+ self.__logger.error(e)
|
|
50 |
+ context.set_details(str(e))
|
|
51 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
52 |
+ |
|
53 |
+ return remote_execution_pb2_grpc.ServerCapabilities()
|
|
54 |
+ |
|
55 |
+ def _get_instance(self, name):
|
|
56 |
+ try:
|
|
57 |
+ return self.__instances[name]
|
|
58 |
+ |
|
59 |
+ except KeyError:
|
|
60 |
+ raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
|
... | ... | @@ -37,6 +37,22 @@ class ContentAddressableStorageInstance: |
37 | 37 |
def register_instance_with_server(self, instance_name, server):
|
38 | 38 |
server.add_cas_instance(self, instance_name)
|
39 | 39 |
|
40 |
+ def hash_type(self):
|
|
41 |
+ hash_name = HASH().name
|
|
42 |
+ if hash_name == "sha256":
|
|
43 |
+ return re_pb2.SHA256
|
|
44 |
+ return re_pb2.UNKNOWN
|
|
45 |
+ |
|
46 |
+ def max_batch_total_size_bytes(self):
|
|
47 |
+ # TODO: link with max size
|
|
48 |
+ # Should be added from settings in MR !119
|
|
49 |
+ return 2000000
|
|
50 |
+ |
|
51 |
+ def symlink_absolute_path_strategy(self):
|
|
52 |
+ # Currently this strategy is hardcoded into BuildGrid
|
|
53 |
+ # With no setting to reference
|
|
54 |
+ return re_pb2.CacheCapabilities().DISALLOWED
|
|
55 |
+ |
|
40 | 56 |
def find_missing_blobs(self, blob_digests):
|
41 | 57 |
storage = self._storage
|
42 | 58 |
return re_pb2.FindMissingBlobsResponse(
|
... | ... | @@ -22,9 +22,11 @@ An instance of the Remote Execution Service. |
22 | 22 |
import logging
|
23 | 23 |
|
24 | 24 |
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
25 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
25 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
26 | 27 |
|
27 | 28 |
from ..job import Job
|
29 |
+from ...settings import HASH
|
|
28 | 30 |
|
29 | 31 |
|
30 | 32 |
class ExecutionInstance:
|
... | ... | @@ -38,6 +40,12 @@ class ExecutionInstance: |
38 | 40 |
def register_instance_with_server(self, instance_name, server):
|
39 | 41 |
server.add_execution_instance(self, instance_name)
|
40 | 42 |
|
43 |
+ def hash_type(self):
|
|
44 |
+ hash_name = HASH().name
|
|
45 |
+ if hash_name == "sha256":
|
|
46 |
+ return remote_execution_pb2.SHA256
|
|
47 |
+ return remote_execution_pb2.UNKNOWN
|
|
48 |
+ |
|
41 | 49 |
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
42 | 50 |
""" Sends a job for execution.
|
43 | 51 |
Queues an action and creates an Operation instance to be associated with
|
... | ... | @@ -21,11 +21,14 @@ import grpc |
21 | 21 |
|
22 | 22 |
from .cas.service import ByteStreamService, ContentAddressableStorageService
|
23 | 23 |
from .actioncache.service import ActionCacheService
|
24 |
+from .capabilities.service import CapabilitiesService
|
|
24 | 25 |
from .execution.service import ExecutionService
|
25 | 26 |
from .operations.service import OperationsService
|
26 | 27 |
from .bots.service import BotsService
|
27 | 28 |
from .referencestorage.service import ReferenceStorageService
|
28 | 29 |
|
30 |
+from .capabilities.instance import CapabilitiesInstance
|
|
31 |
+ |
|
29 | 32 |
|
30 | 33 |
class BuildGridServer:
|
31 | 34 |
"""Creates a BuildGrid server.
|
... | ... | @@ -50,6 +53,9 @@ class BuildGridServer: |
50 | 53 |
|
51 | 54 |
self._server = server
|
52 | 55 |
|
56 |
+ # We always want a capabilities service
|
|
57 |
+ self._capabilities_service = CapabilitiesService(self._server)
|
|
58 |
+ |
|
53 | 59 |
self._execution_service = None
|
54 | 60 |
self._bots_service = None
|
55 | 61 |
self._operations_service = None
|
... | ... | @@ -99,6 +105,7 @@ class BuildGridServer: |
99 | 105 |
self._execution_service = ExecutionService(self._server)
|
100 | 106 |
|
101 | 107 |
self._execution_service.add_instance(instance_name, instance)
|
108 |
+ self._add_capabilities_instance(instance_name, execution_instance=instance)
|
|
102 | 109 |
|
103 | 110 |
def add_bots_interface(self, instance, instance_name):
|
104 | 111 |
"""Adds a :obj:`BotsInterface` to the service.
|
... | ... | @@ -155,9 +162,10 @@ class BuildGridServer: |
155 | 162 |
self._action_cache_service = ActionCacheService(self._server)
|
156 | 163 |
|
157 | 164 |
self._action_cache_service.add_instance(instance_name, instance)
|
165 |
+ self._add_capabilities_instance(instance_name, action_cache_instance=instance)
|
|
158 | 166 |
|
159 | 167 |
def add_cas_instance(self, instance, instance_name):
|
160 |
- """Stores a :obj:`ContentAddressableStorageInstance` to the service.
|
|
168 |
+ """Adds a :obj:`ContentAddressableStorageInstance` to the service.
|
|
161 | 169 |
|
162 | 170 |
If no service exists, it creates one.
|
163 | 171 |
|
... | ... | @@ -168,10 +176,10 @@ class BuildGridServer: |
168 | 176 |
if self._cas_service is None:
|
169 | 177 |
self._cas_service = ContentAddressableStorageService(self._server)
|
170 | 178 |
|
171 |
- self._cas_service.add_instance(instance_name, instance)
|
|
179 |
+ self._add_capabilities_instance(instance_name, cas_instance=instance)
|
|
172 | 180 |
|
173 | 181 |
def add_bytestream_instance(self, instance, instance_name):
|
174 |
- """Stores a :obj:`ByteStreamInstance` to the service.
|
|
182 |
+ """Adds a :obj:`ByteStreamInstance` to the service.
|
|
175 | 183 |
|
176 | 184 |
If no service exists, it creates one.
|
177 | 185 |
|
... | ... | @@ -183,3 +191,28 @@ class BuildGridServer: |
183 | 191 |
self._bytestream_service = ByteStreamService(self._server)
|
184 | 192 |
|
185 | 193 |
self._bytestream_service.add_instance(instance_name, instance)
|
194 |
+ |
|
195 |
+ def _add_capabilities_instance(self, instance_name,
|
|
196 |
+ cas_instance=None,
|
|
197 |
+ action_cache_instance=None,
|
|
198 |
+ execution_instance=None):
|
|
199 |
+ """Adds a :obj:`CapabilitiesInstance` to the service.
|
|
200 |
+ |
|
201 |
+ Args:
|
|
202 |
+ instance (:obj:`CapabilitiesInstance`): Instance to add.
|
|
203 |
+ instance_name (str): Instance name.
|
|
204 |
+ """
|
|
205 |
+ |
|
206 |
+ try:
|
|
207 |
+ if cas_instance:
|
|
208 |
+ self._capabilities_service.add_cas_instance(instance_name, cas_instance)
|
|
209 |
+ if action_cache_instance:
|
|
210 |
+ self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
|
|
211 |
+ if execution_instance:
|
|
212 |
+ self._capabilities_service.add_execution_instance(instance_name, execution_instance)
|
|
213 |
+ |
|
214 |
+ except KeyError:
|
|
215 |
+ capabilities_instance = CapabilitiesInstance(cas_instance,
|
|
216 |
+ action_cache_instance,
|
|
217 |
+ execution_instance)
|
|
218 |
+ self._capabilities_service.add_instance(instance_name, capabilities_instance)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+# pylint: disable=redefined-outer-name
|
|
16 |
+ |
|
17 |
+ |
|
18 |
+import grpc
|
|
19 |
+import pytest
|
|
20 |
+ |
|
21 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
22 |
+from buildgrid.client.capabilities import CapabilitiesInterface
|
|
23 |
+from buildgrid.server.controller import ExecutionController
|
|
24 |
+from buildgrid.server.actioncache.storage import ActionCache
|
|
25 |
+from buildgrid.server.cas.instance import ContentAddressableStorageInstance
|
|
26 |
+from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
|
|
27 |
+ |
|
28 |
+from ..utils.utils import run_in_subprocess
|
|
29 |
+from ..utils.capabilities import serve_capabilities_service
|
|
30 |
+ |
|
31 |
+ |
|
32 |
+INSTANCES = ['', 'instance']
|
|
33 |
+ |
|
34 |
+ |
|
35 |
+# Use subprocess to avoid creation of gRPC threads in main process
|
|
36 |
+# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
|
|
37 |
+# Multiprocessing uses pickle which protobufs don't work with
|
|
38 |
+# Workaround wrapper to send messages as strings
|
|
39 |
+class ServerInterface:
|
|
40 |
+ |
|
41 |
+ def __init__(self, remote):
|
|
42 |
+ self.__remote = remote
|
|
43 |
+ |
|
44 |
+ def get_capabilities(self, instance_name):
|
|
45 |
+ |
|
46 |
+ def __get_capabilities(queue, remote, instance_name):
|
|
47 |
+ interface = CapabilitiesInterface(grpc.insecure_channel(remote))
|
|
48 |
+ |
|
49 |
+ result = interface.get_capabilities(instance_name)
|
|
50 |
+ queue.put(result.SerializeToString())
|
|
51 |
+ |
|
52 |
+ result = run_in_subprocess(__get_capabilities,
|
|
53 |
+ self.__remote, instance_name)
|
|
54 |
+ |
|
55 |
+ capabilities = remote_execution_pb2.ServerCapabilities()
|
|
56 |
+ capabilities.ParseFromString(result)
|
|
57 |
+ return capabilities
|
|
58 |
+ |
|
59 |
+ |
|
60 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
61 |
+def test_execution_not_available_capabilities(instance):
|
|
62 |
+ with serve_capabilities_service([instance]) as server:
|
|
63 |
+ server_interface = ServerInterface(server.remote)
|
|
64 |
+ response = server_interface.get_capabilities(instance)
|
|
65 |
+ |
|
66 |
+ assert not response.execution_capabilities.exec_enabled
|
|
67 |
+ |
|
68 |
+ |
|
69 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
70 |
+def test_execution_available_capabilities(instance):
|
|
71 |
+ controller = ExecutionController()
|
|
72 |
+ |
|
73 |
+ with serve_capabilities_service([instance],
|
|
74 |
+ execution_instance=controller.execution_instance) as server:
|
|
75 |
+ server_interface = ServerInterface(server.remote)
|
|
76 |
+ response = server_interface.get_capabilities(instance)
|
|
77 |
+ |
|
78 |
+ assert response.execution_capabilities.exec_enabled
|
|
79 |
+ assert response.execution_capabilities.digest_function
|
|
80 |
+ |
|
81 |
+ |
|
82 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
83 |
+def test_action_cache_allow_updates_capabilities(instance):
|
|
84 |
+ storage = LRUMemoryCache(limit=256)
|
|
85 |
+ action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
|
|
86 |
+ |
|
87 |
+ with serve_capabilities_service([instance],
|
|
88 |
+ action_cache_instance=action_cache) as server:
|
|
89 |
+ server_interface = ServerInterface(server.remote)
|
|
90 |
+ response = server_interface.get_capabilities(instance)
|
|
91 |
+ |
|
92 |
+ assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
|
|
93 |
+ |
|
94 |
+ |
|
95 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
96 |
+def test_action_cache_not_allow_updates_capabilities(instance):
|
|
97 |
+ storage = LRUMemoryCache(limit=256)
|
|
98 |
+ action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
|
|
99 |
+ |
|
100 |
+ with serve_capabilities_service([instance],
|
|
101 |
+ action_cache_instance=action_cache) as server:
|
|
102 |
+ server_interface = ServerInterface(server.remote)
|
|
103 |
+ response = server_interface.get_capabilities(instance)
|
|
104 |
+ |
|
105 |
+ assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
|
|
106 |
+ |
|
107 |
+ |
|
108 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
109 |
+def test_cas_capabilities(instance):
|
|
110 |
+ cas = ContentAddressableStorageInstance(None)
|
|
111 |
+ |
|
112 |
+ with serve_capabilities_service([instance],
|
|
113 |
+ cas_instance=cas) as server:
|
|
114 |
+ server_interface = ServerInterface(server.remote)
|
|
115 |
+ response = server_interface.get_capabilities(instance)
|
|
116 |
+ |
|
117 |
+ assert len(response.cache_capabilities.digest_function) == 1
|
|
118 |
+ assert response.cache_capabilities.digest_function[0]
|
|
119 |
+ assert response.cache_capabilities.symlink_absolute_path_strategy
|
|
120 |
+ assert response.cache_capabilities.max_batch_total_size_bytes
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+from concurrent import futures
|
|
17 |
+from contextlib import contextmanager
|
|
18 |
+import multiprocessing
|
|
19 |
+import os
|
|
20 |
+import signal
|
|
21 |
+ |
|
22 |
+import grpc
|
|
23 |
+import pytest_cov
|
|
24 |
+ |
|
25 |
+from buildgrid.server.capabilities.service import CapabilitiesService
|
|
26 |
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
27 |
+ |
|
28 |
+ |
|
29 |
+@contextmanager
|
|
30 |
+def serve_capabilities_service(instances,
|
|
31 |
+ cas_instance=None,
|
|
32 |
+ action_cache_instance=None,
|
|
33 |
+ execution_instance=None):
|
|
34 |
+ server = Server(instances,
|
|
35 |
+ cas_instance,
|
|
36 |
+ action_cache_instance,
|
|
37 |
+ execution_instance)
|
|
38 |
+ try:
|
|
39 |
+ yield server
|
|
40 |
+ finally:
|
|
41 |
+ server.quit()
|
|
42 |
+ |
|
43 |
+ |
|
44 |
+class Server:
|
|
45 |
+ |
|
46 |
+ def __init__(self, instances,
|
|
47 |
+ cas_instance=None,
|
|
48 |
+ action_cache_instance=None,
|
|
49 |
+ execution_instance=None):
|
|
50 |
+ self.instances = instances
|
|
51 |
+ |
|
52 |
+ self.__queue = multiprocessing.Queue()
|
|
53 |
+ self.__process = multiprocessing.Process(
|
|
54 |
+ target=Server.serve,
|
|
55 |
+ args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
|
|
56 |
+ self.__process.start()
|
|
57 |
+ |
|
58 |
+ self.port = self.__queue.get(timeout=1)
|
|
59 |
+ self.remote = 'localhost:{}'.format(self.port)
|
|
60 |
+ |
|
61 |
+ @staticmethod
|
|
62 |
+ def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
|
|
63 |
+ pytest_cov.embed.cleanup_on_sigterm()
|
|
64 |
+ |
|
65 |
+ # Use max_workers default from Python 3.5+
|
|
66 |
+ max_workers = (os.cpu_count() or 1) * 5
|
|
67 |
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
|
68 |
+ port = server.add_insecure_port('localhost:0')
|
|
69 |
+ |
|
70 |
+ capabilities_service = CapabilitiesService(server)
|
|
71 |
+ for name in instances:
|
|
72 |
+ capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
|
|
73 |
+ capabilities_service.add_instance(name, capabilities_instance)
|
|
74 |
+ |
|
75 |
+ server.start()
|
|
76 |
+ queue.put(port)
|
|
77 |
+ signal.pause()
|
|
78 |
+ |
|
79 |
+ def quit(self):
|
|
80 |
+ if self.__process:
|
|
81 |
+ self.__process.terminate()
|
|
82 |
+ self.__process.join()
|