finn pushed to branch finn/78-capabilities-service at BuildGrid / buildgrid
Commits:
-
f6fdfdad
by Finn at 2018-11-21T10:05:52Z
-
85a988ba
by Finn at 2018-11-21T10:05:52Z
-
ecd4a53d
by Finn at 2018-11-21T10:05:52Z
-
532ad202
by Finn at 2018-11-21T10:05:52Z
-
cb6d4aa0
by Finn at 2018-11-21T10:05:52Z
6 changed files:
- + buildgrid/client/capabilities.py
- + buildgrid/server/capabilities/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/instance.py
- + tests/integration/capabilities_service.py
- + tests/utils/capabilities.py
Changes:
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+import grpc
|
|
18 |
+ |
|
19 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
20 |
+ |
|
21 |
+ |
|
22 |
+class CapabilitiesInterface:
|
|
23 |
+ """Interface for calls the the Capabilities Service."""
|
|
24 |
+ |
|
25 |
+ def __init__(self, channel):
|
|
26 |
+ """Initialises an instance of the capabilities service.
|
|
27 |
+ |
|
28 |
+ Args:
|
|
29 |
+ channel (grpc.Channel): A gRPC channel to the CAS endpoint.
|
|
30 |
+ """
|
|
31 |
+ self.__logger = logging.getLogger(__name__)
|
|
32 |
+ self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
|
|
33 |
+ |
|
34 |
+ def get_capabilities(self, instance_name):
|
|
35 |
+ """Returns the capabilities or the server to the user.
|
|
36 |
+ |
|
37 |
+ Args:
|
|
38 |
+ instance_name (str): The name of the instance."""
|
|
39 |
+ |
|
40 |
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
|
|
41 |
+ try:
|
|
42 |
+ return self.__stub.GetCapabilities(request)
|
|
43 |
+ |
|
44 |
+ except grpc.RpcError as e:
|
|
45 |
+ self.__logger.error(e)
|
|
46 |
+ raise
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+import logging
|
|
17 |
+ |
|
18 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
19 |
+ |
|
20 |
+ |
|
21 |
+class CapabilitiesInstance:
|
|
22 |
+ |
|
23 |
+ def __init__(self, cache_instance=None,
|
|
24 |
+ execution_instance=None):
|
|
25 |
+ self.__logger = logging.getLogger(__name__)
|
|
26 |
+ self.__cache_instance = cache_instance
|
|
27 |
+ self.__execution_instance = execution_instance
|
|
28 |
+ |
|
29 |
+ def register_instance_with_server(self, instance_name, server):
|
|
30 |
+ server.add_capabilities_instance(self, instance_name)
|
|
31 |
+ |
|
32 |
+ def get_capabilities(self):
|
|
33 |
+ server_capabilities = remote_execution_pb2.ServerCapabilities()
|
|
34 |
+ server_capabilities.cache_capabilities.CopyFrom(self._get_remote_cache_capabilities())
|
|
35 |
+ server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
|
|
36 |
+ return server_capabilities
|
|
37 |
+ |
|
38 |
+ def _get_remote_cache_capabilities(self):
|
|
39 |
+ capabilities = remote_execution_pb2.CacheCapabilities()
|
|
40 |
+ if self.__cache_instance:
|
|
41 |
+ pass
|
|
42 |
+ return capabilities
|
|
43 |
+ |
|
44 |
+ def _get_capabilities_execution(self):
|
|
45 |
+ capabilities = remote_execution_pb2.ExecutionCapabilities()
|
|
46 |
+ if self.__execution_instance:
|
|
47 |
+ capabilities.exec_enabled = True
|
|
48 |
+ digest_function = remote_execution_pb2.UNKNOWN
|
|
49 |
+ |
|
50 |
+ if self.__execution_instance.hash_type() == "sha256":
|
|
51 |
+ digest_function = remote_execution_pb2.SHA256
|
|
52 |
+ |
|
53 |
+ capabilities.digest_function = digest_function
|
|
54 |
+ # TODO: execution priority #102
|
|
55 |
+ # capabilities.execution_priority =
|
|
56 |
+ |
|
57 |
+ else:
|
|
58 |
+ capabilities.exec_enabled = False
|
|
59 |
+ |
|
60 |
+ return capabilities
|
... | ... | @@ -25,6 +25,7 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError |
25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
26 | 26 |
|
27 | 27 |
from ..job import Job
|
28 |
+from ...settings import HASH
|
|
28 | 29 |
|
29 | 30 |
|
30 | 31 |
class ExecutionInstance:
|
... | ... | @@ -38,6 +39,10 @@ class ExecutionInstance: |
38 | 39 |
def register_instance_with_server(self, instance_name, server):
|
39 | 40 |
server.add_execution_instance(self, instance_name)
|
40 | 41 |
|
42 |
+ def hash_type(self):
|
|
43 |
+ """Returns the name of the hash"""
|
|
44 |
+ return HASH().name
|
|
45 |
+ |
|
41 | 46 |
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
42 | 47 |
""" Sends a job for execution.
|
43 | 48 |
Queues an action and creates an Operation instance to be associated with
|
... | ... | @@ -21,6 +21,7 @@ import grpc |
21 | 21 |
|
22 | 22 |
from .cas.service import ByteStreamService, ContentAddressableStorageService
|
23 | 23 |
from .actioncache.service import ActionCacheService
|
24 |
+from .capabilities.service import CapabilitiesService
|
|
24 | 25 |
from .execution.service import ExecutionService
|
25 | 26 |
from .operations.service import OperationsService
|
26 | 27 |
from .bots.service import BotsService
|
... | ... | @@ -57,6 +58,7 @@ class BuildGridServer: |
57 | 58 |
self._action_cache_service = None
|
58 | 59 |
self._cas_service = None
|
59 | 60 |
self._bytestream_service = None
|
61 |
+ self._capabilities_service = None
|
|
60 | 62 |
|
61 | 63 |
def start(self):
|
62 | 64 |
"""Starts the server.
|
... | ... | @@ -157,7 +159,7 @@ class BuildGridServer: |
157 | 159 |
self._action_cache_service.add_instance(instance_name, instance)
|
158 | 160 |
|
159 | 161 |
def add_cas_instance(self, instance, instance_name):
|
160 |
- """Stores a :obj:`ContentAddressableStorageInstance` to the service.
|
|
162 |
+ """Adds a :obj:`ContentAddressableStorageInstance` to the service.
|
|
161 | 163 |
|
162 | 164 |
If no service exists, it creates one.
|
163 | 165 |
|
... | ... | @@ -171,7 +173,7 @@ class BuildGridServer: |
171 | 173 |
self._cas_service.add_instance(instance_name, instance)
|
172 | 174 |
|
173 | 175 |
def add_bytestream_instance(self, instance, instance_name):
|
174 |
- """Stores a :obj:`ByteStreamInstance` to the service.
|
|
176 |
+ """Adds a :obj:`ByteStreamInstance` to the service.
|
|
175 | 177 |
|
176 | 178 |
If no service exists, it creates one.
|
177 | 179 |
|
... | ... | @@ -183,3 +185,17 @@ class BuildGridServer: |
183 | 185 |
self._bytestream_service = ByteStreamService(self._server)
|
184 | 186 |
|
185 | 187 |
self._bytestream_service.add_instance(instance_name, instance)
|
188 |
+ |
|
189 |
+ def add_capabilities_instance(self, instance, instance_name):
|
|
190 |
+ """Adds a :obj:`CapabilitiesInstance` on the service.
|
|
191 |
+ |
|
192 |
+ If no service exists, it creates one.
|
|
193 |
+ |
|
194 |
+ Args:
|
|
195 |
+ instance (:obj:`CapabilitiesInstance`): Instance to add.
|
|
196 |
+ instance_name (str): Instance name.
|
|
197 |
+ """
|
|
198 |
+ if self._capabilities_service is None:
|
|
199 |
+ self._capabilities_service = CapabilitiesService(self._server)
|
|
200 |
+ |
|
201 |
+ self._capabilities_service.add_instance(instance_name, instance)
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+# pylint: disable=redefined-outer-name
|
|
16 |
+ |
|
17 |
+ |
|
18 |
+import grpc
|
|
19 |
+import pytest
|
|
20 |
+ |
|
21 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
22 |
+from buildgrid.client.capabilities import CapabilitiesInterface
|
|
23 |
+from buildgrid.server.controller import ExecutionController
|
|
24 |
+ |
|
25 |
+from ..utils.utils import run_in_subprocess
|
|
26 |
+from ..utils.capabilities import serve_capabilities_service
|
|
27 |
+ |
|
28 |
+ |
|
29 |
+INSTANCES = ['', 'instance']
|
|
30 |
+ |
|
31 |
+ |
|
32 |
+# Use subprocess to avoid creation of gRPC threads in main process
|
|
33 |
+# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
|
|
34 |
+# Multiprocessing uses pickle which protobufs don't work with
|
|
35 |
+# Workaround wrapper to send messages as strings
|
|
36 |
+class ServerInterface:
|
|
37 |
+ |
|
38 |
+ def __init__(self, remote):
|
|
39 |
+ self.__remote = remote
|
|
40 |
+ |
|
41 |
+ def get_capabilities(self, instance_name):
|
|
42 |
+ |
|
43 |
+ def __get_capabilities(queue, remote, instance_name):
|
|
44 |
+ interface = CapabilitiesInterface(grpc.insecure_channel(remote))
|
|
45 |
+ |
|
46 |
+ result = interface.get_capabilities(instance_name)
|
|
47 |
+ queue.put(result.SerializeToString())
|
|
48 |
+ |
|
49 |
+ result = run_in_subprocess(__get_capabilities,
|
|
50 |
+ self.__remote, instance_name)
|
|
51 |
+ |
|
52 |
+ capabilities = remote_execution_pb2.ServerCapabilities()
|
|
53 |
+ capabilities.ParseFromString(result)
|
|
54 |
+ return capabilities
|
|
55 |
+ |
|
56 |
+ |
|
57 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
58 |
+def test_execution_not_available(instance):
|
|
59 |
+ with serve_capabilities_service([instance]) as server:
|
|
60 |
+ server_interface = ServerInterface(server.remote)
|
|
61 |
+ response = server_interface.get_capabilities(instance)
|
|
62 |
+ |
|
63 |
+ assert not response.execution_capabilities.exec_enabled
|
|
64 |
+ |
|
65 |
+ |
|
66 |
+@pytest.mark.parametrize('instance', INSTANCES)
|
|
67 |
+def test_execution_available(instance):
|
|
68 |
+ controller = ExecutionController()
|
|
69 |
+ |
|
70 |
+ with serve_capabilities_service([instance],
|
|
71 |
+ execution_instance=controller.execution_instance) as server:
|
|
72 |
+ server_interface = ServerInterface(server.remote)
|
|
73 |
+ response = server_interface.get_capabilities(instance)
|
|
74 |
+ |
|
75 |
+ assert response.execution_capabilities.exec_enabled
|
|
76 |
+ assert response.execution_capabilities.digest_function == remote_execution_pb2.SHA256
|
1 |
+# Copyright (C) 2018 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+from concurrent import futures
|
|
17 |
+from contextlib import contextmanager
|
|
18 |
+import multiprocessing
|
|
19 |
+import os
|
|
20 |
+import signal
|
|
21 |
+ |
|
22 |
+import grpc
|
|
23 |
+import pytest_cov
|
|
24 |
+ |
|
25 |
+from buildgrid.server.capabilities.service import CapabilitiesService
|
|
26 |
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
27 |
+ |
|
28 |
+ |
|
29 |
+@contextmanager
|
|
30 |
+def serve_capabilities_service(instances, cache_instance=None, execution_instance=None):
|
|
31 |
+ server = Server(instances, cache_instance, execution_instance)
|
|
32 |
+ try:
|
|
33 |
+ yield server
|
|
34 |
+ finally:
|
|
35 |
+ server.quit()
|
|
36 |
+ |
|
37 |
+ |
|
38 |
+class Server:
|
|
39 |
+ |
|
40 |
+ def __init__(self, instances,
|
|
41 |
+ cache_instance=None,
|
|
42 |
+ execution_instance=None):
|
|
43 |
+ self.instances = instances
|
|
44 |
+ |
|
45 |
+ self.__queue = multiprocessing.Queue()
|
|
46 |
+ self.__process = multiprocessing.Process(
|
|
47 |
+ target=Server.serve,
|
|
48 |
+ args=(self.__queue, self.instances, cache_instance, execution_instance))
|
|
49 |
+ self.__process.start()
|
|
50 |
+ |
|
51 |
+ self.port = self.__queue.get(timeout=1)
|
|
52 |
+ self.remote = 'localhost:{}'.format(self.port)
|
|
53 |
+ |
|
54 |
+ @classmethod
|
|
55 |
+ def serve(cls, queue, instances, cache_instance, execution_instance):
|
|
56 |
+ pytest_cov.embed.cleanup_on_sigterm()
|
|
57 |
+ |
|
58 |
+ # Use max_workers default from Python 3.5+
|
|
59 |
+ max_workers = (os.cpu_count() or 1) * 5
|
|
60 |
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
|
61 |
+ port = server.add_insecure_port('localhost:0')
|
|
62 |
+ |
|
63 |
+ capabilities_service = CapabilitiesService(server)
|
|
64 |
+ for name in instances:
|
|
65 |
+ capabilities_instance = CapabilitiesInstance(cache_instance, execution_instance)
|
|
66 |
+ capabilities_service.add_instance(name, capabilities_instance)
|
|
67 |
+ |
|
68 |
+ server.start()
|
|
69 |
+ queue.put(port)
|
|
70 |
+ signal.pause()
|
|
71 |
+ |
|
72 |
+ def quit(self):
|
|
73 |
+ if self.__process:
|
|
74 |
+ self.__process.terminate()
|
|
75 |
+ self.__process.join()
|