[Notes] [Git][BuildGrid/buildgrid][finn/78-capabilities-service] 8 commits: Adding capabilities service.



Title: GitLab

finn pushed to branch finn/78-capabilities-service at BuildGrid / buildgrid

Commits:

9 changed files:

Changes:

  • buildgrid/client/capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +import grpc
    
    18
    +
    
    19
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    20
    +
    
    21
    +
    
    22
    +class CapabilitiesInterface:
    
    23
    +    """Interface for calls the the Capabilities Service."""
    
    24
    +
    
    25
    +    def __init__(self, channel):
    
    26
    +        """Initialises an instance of the capabilities service.
    
    27
    +
    
    28
    +        Args:
    
    29
    +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
    
    30
    +        """
    
    31
    +        self.__logger = logging.getLogger(__name__)
    
    32
    +        self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
    
    33
    +
    
    34
    +    def get_capabilities(self, instance_name):
    
    35
    +        """Returns the capabilities or the server to the user.
    
    36
    +
    
    37
    +        Args:
    
    38
    +            instance_name (str): The name of the instance."""
    
    39
    +
    
    40
    +        request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
    
    41
    +        try:
    
    42
    +            return self.__stub.GetCapabilities(request)
    
    43
    +
    
    44
    +        except grpc.RpcError as e:
    
    45
    +            self.__logger.error(e)
    
    46
    +            raise

  • buildgrid/server/capabilities/__init__.py

  • buildgrid/server/capabilities/instance.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +
    
    20
    +
    
    21
    +class CapabilitiesInstance:
    
    22
    +
    
    23
    +    def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
    
    24
    +        self.__logger = logging.getLogger(__name__)
    
    25
    +        self.__cas_instance = cas_instance
    
    26
    +        self.__action_cache_instance = action_cache_instance
    
    27
    +        self.__execution_instance = execution_instance
    
    28
    +
    
    29
    +    def register_instance_with_server(self, instance_name, server):
    
    30
    +        server.add_capabilities_instance(self, instance_name)
    
    31
    +
    
    32
    +    def add_cas_instance(self, cas_instance):
    
    33
    +        self.__cas_instance = cas_instance
    
    34
    +
    
    35
    +    def add_action_cache_instance(self, action_cache_instance):
    
    36
    +        self.__action_cache_instance = action_cache_instance
    
    37
    +
    
    38
    +    def add_execution_instance(self, execution_instance):
    
    39
    +        self.__execution_instance = execution_instance
    
    40
    +
    
    41
    +    def get_capabilities(self):
    
    42
    +        server_capabilities = remote_execution_pb2.ServerCapabilities()
    
    43
    +        server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
    
    44
    +        server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
    
    45
    +        return server_capabilities
    
    46
    +
    
    47
    +    def _get_cache_capabilities(self):
    
    48
    +        capabilities = remote_execution_pb2.CacheCapabilities()
    
    49
    +        action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
    
    50
    +
    
    51
    +        if self.__cas_instance:
    
    52
    +            capabilities.digest_function.extend([self.__cas_instance.hash_type()])
    
    53
    +            capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
    
    54
    +            capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
    
    55
    +            # TODO: execution priority #102
    
    56
    +            # capabilities.cache_priority_capabilities =
    
    57
    +
    
    58
    +        if self.__action_cache_instance:
    
    59
    +            action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
    
    60
    +
    
    61
    +        capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
    
    62
    +        return capabilities
    
    63
    +
    
    64
    +    def _get_capabilities_execution(self):
    
    65
    +        capabilities = remote_execution_pb2.ExecutionCapabilities()
    
    66
    +        if self.__execution_instance:
    
    67
    +            capabilities.exec_enabled = True
    
    68
    +            capabilities.digest_function = self.__execution_instance.hash_type()
    
    69
    +            # TODO: execution priority #102
    
    70
    +            # capabilities.execution_priority =
    
    71
    +
    
    72
    +        else:
    
    73
    +            capabilities.exec_enabled = False
    
    74
    +
    
    75
    +        return capabilities

  • buildgrid/server/capabilities/service.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +
    
    18
    +import grpc
    
    19
    +
    
    20
    +from buildgrid._exceptions import InvalidArgumentError
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    22
    +
    
    23
    +
    
    24
    +class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    25
    +
    
    26
    +    def __init__(self, server):
    
    27
    +        self.__logger = logging.getLogger(__name__)
    
    28
    +        self.__instances = {}
    
    29
    +        remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
    
    30
    +
    
    31
    +    def add_instance(self, name, instance):
    
    32
    +        self.__instances[name] = instance
    
    33
    +
    
    34
    +    def add_cas_instance(self, name, instance):
    
    35
    +        self.__instances[name].add_cas_instance(instance)
    
    36
    +
    
    37
    +    def add_action_cache_instance(self, name, instance):
    
    38
    +        self.__instances[name].add_action_cache_instance(instance)
    
    39
    +
    
    40
    +    def add_execution_instance(self, name, instance):
    
    41
    +        self.__instances[name].add_execution_instance(instance)
    
    42
    +
    
    43
    +    def GetCapabilities(self, request, context):
    
    44
    +        try:
    
    45
    +            instance = self._get_instance(request.instance_name)
    
    46
    +            return instance.get_capabilities()
    
    47
    +
    
    48
    +        except InvalidArgumentError as e:
    
    49
    +            self.__logger.error(e)
    
    50
    +            context.set_details(str(e))
    
    51
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    52
    +
    
    53
    +        return remote_execution_pb2_grpc.ServerCapabilities()
    
    54
    +
    
    55
    +    def _get_instance(self, name):
    
    56
    +        try:
    
    57
    +            return self.__instances[name]
    
    58
    +
    
    59
    +        except KeyError:
    
    60
    +            raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))

  • buildgrid/server/cas/instance.py
    ... ... @@ -37,6 +37,22 @@ class ContentAddressableStorageInstance:
    37 37
         def register_instance_with_server(self, instance_name, server):
    
    38 38
             server.add_cas_instance(self, instance_name)
    
    39 39
     
    
    40
    +    def hash_type(self):
    
    41
    +        hash_name = HASH().name
    
    42
    +        if hash_name == "sha256":
    
    43
    +            return re_pb2.SHA256
    
    44
    +        return re_pb2.UNKNOWN
    
    45
    +
    
    46
    +    def max_batch_total_size_bytes(self):
    
    47
    +        # TODO: link with max size
    
    48
    +        # Should be added from settings in MR !119
    
    49
    +        return 2000000
    
    50
    +
    
    51
    +    def symlink_absolute_path_strategy(self):
    
    52
    +        # Currently this strategy is hardcoded into BuildGrid
    
    53
    +        # With no setting to reference
    
    54
    +        return re_pb2.CacheCapabilities().DISALLOWED
    
    55
    +
    
    40 56
         def find_missing_blobs(self, blob_digests):
    
    41 57
             storage = self._storage
    
    42 58
             return re_pb2.FindMissingBlobsResponse(
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -22,9 +22,11 @@ An instance of the Remote Execution Service.
    22 22
     import logging
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    25
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 26
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 27
     
    
    27 28
     from ..job import Job
    
    29
    +from ...settings import HASH
    
    28 30
     
    
    29 31
     
    
    30 32
     class ExecutionInstance:
    
    ... ... @@ -38,6 +40,12 @@ class ExecutionInstance:
    38 40
         def register_instance_with_server(self, instance_name, server):
    
    39 41
             server.add_execution_instance(self, instance_name)
    
    40 42
     
    
    43
    +    def hash_type(self):
    
    44
    +        hash_name = HASH().name
    
    45
    +        if hash_name == "sha256":
    
    46
    +            return remote_execution_pb2.SHA256
    
    47
    +        return remote_execution_pb2.UNKNOWN
    
    48
    +
    
    41 49
         def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    42 50
             """ Sends a job for execution.
    
    43 51
             Queues an action and creates an Operation instance to be associated with
    

  • buildgrid/server/instance.py
    ... ... @@ -21,11 +21,14 @@ import grpc
    21 21
     
    
    22 22
     from .cas.service import ByteStreamService, ContentAddressableStorageService
    
    23 23
     from .actioncache.service import ActionCacheService
    
    24
    +from .capabilities.service import CapabilitiesService
    
    24 25
     from .execution.service import ExecutionService
    
    25 26
     from .operations.service import OperationsService
    
    26 27
     from .bots.service import BotsService
    
    27 28
     from .referencestorage.service import ReferenceStorageService
    
    28 29
     
    
    30
    +from .capabilities.instance import CapabilitiesInstance
    
    31
    +
    
    29 32
     
    
    30 33
     class BuildGridServer:
    
    31 34
         """Creates a BuildGrid server.
    
    ... ... @@ -50,6 +53,9 @@ class BuildGridServer:
    50 53
     
    
    51 54
             self._server = server
    
    52 55
     
    
    56
    +        # We always want a capabilities service
    
    57
    +        self._capabilities_service = CapabilitiesService(self._server)
    
    58
    +
    
    53 59
             self._execution_service = None
    
    54 60
             self._bots_service = None
    
    55 61
             self._operations_service = None
    
    ... ... @@ -99,6 +105,7 @@ class BuildGridServer:
    99 105
                 self._execution_service = ExecutionService(self._server)
    
    100 106
     
    
    101 107
             self._execution_service.add_instance(instance_name, instance)
    
    108
    +        self._add_capabilities_instance(instance_name, execution_instance=instance)
    
    102 109
     
    
    103 110
         def add_bots_interface(self, instance, instance_name):
    
    104 111
             """Adds a :obj:`BotsInterface` to the service.
    
    ... ... @@ -155,9 +162,10 @@ class BuildGridServer:
    155 162
                 self._action_cache_service = ActionCacheService(self._server)
    
    156 163
     
    
    157 164
             self._action_cache_service.add_instance(instance_name, instance)
    
    165
    +        self._add_capabilities_instance(instance_name, action_cache_instance=instance)
    
    158 166
     
    
    159 167
         def add_cas_instance(self, instance, instance_name):
    
    160
    -        """Stores a :obj:`ContentAddressableStorageInstance` to the service.
    
    168
    +        """Adds a :obj:`ContentAddressableStorageInstance` to the service.
    
    161 169
     
    
    162 170
             If no service exists, it creates one.
    
    163 171
     
    
    ... ... @@ -168,10 +176,10 @@ class BuildGridServer:
    168 176
             if self._cas_service is None:
    
    169 177
                 self._cas_service = ContentAddressableStorageService(self._server)
    
    170 178
     
    
    171
    -        self._cas_service.add_instance(instance_name, instance)
    
    179
    +        self._add_capabilities_instance(instance_name, cas_instance=instance)
    
    172 180
     
    
    173 181
         def add_bytestream_instance(self, instance, instance_name):
    
    174
    -        """Stores a :obj:`ByteStreamInstance` to the service.
    
    182
    +        """Adds a :obj:`ByteStreamInstance` to the service.
    
    175 183
     
    
    176 184
             If no service exists, it creates one.
    
    177 185
     
    
    ... ... @@ -183,3 +191,28 @@ class BuildGridServer:
    183 191
                 self._bytestream_service = ByteStreamService(self._server)
    
    184 192
     
    
    185 193
             self._bytestream_service.add_instance(instance_name, instance)
    
    194
    +
    
    195
    +    def _add_capabilities_instance(self, instance_name,
    
    196
    +                                   cas_instance=None,
    
    197
    +                                   action_cache_instance=None,
    
    198
    +                                   execution_instance=None):
    
    199
    +        """Adds a :obj:`CapabilitiesInstance` to the service.
    
    200
    +
    
    201
    +        Args:
    
    202
    +            instance (:obj:`CapabilitiesInstance`): Instance to add.
    
    203
    +            instance_name (str): Instance name.
    
    204
    +        """
    
    205
    +
    
    206
    +        try:
    
    207
    +            if cas_instance:
    
    208
    +                self._capabilities_service.add_cas_instance(instance_name, cas_instance)
    
    209
    +            if action_cache_instance:
    
    210
    +                self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
    
    211
    +            if execution_instance:
    
    212
    +                self._capabilities_service.add_execution_instance(instance_name, execution_instance)
    
    213
    +
    
    214
    +        except KeyError:
    
    215
    +            capabilities_instance = CapabilitiesInstance(cas_instance,
    
    216
    +                                                         action_cache_instance,
    
    217
    +                                                         execution_instance)
    
    218
    +            self._capabilities_service.add_instance(instance_name, capabilities_instance)

  • tests/integration/capabilities_service.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +# pylint: disable=redefined-outer-name
    
    16
    +
    
    17
    +
    
    18
    +import grpc
    
    19
    +import pytest
    
    20
    +
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    23
    +from buildgrid.server.controller import ExecutionController
    
    24
    +from buildgrid.server.actioncache.storage import ActionCache
    
    25
    +from buildgrid.server.cas.instance import ContentAddressableStorageInstance
    
    26
    +from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    
    27
    +
    
    28
    +from ..utils.utils import run_in_subprocess
    
    29
    +from ..utils.capabilities import serve_capabilities_service
    
    30
    +
    
    31
    +
    
    32
    +INSTANCES = ['', 'instance']
    
    33
    +
    
    34
    +
    
    35
    +# Use subprocess to avoid creation of gRPC threads in main process
    
    36
    +# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
    
    37
    +# Multiprocessing uses pickle which protobufs don't work with
    
    38
    +# Workaround wrapper to send messages as strings
    
    39
    +class ServerInterface:
    
    40
    +
    
    41
    +    def __init__(self, remote):
    
    42
    +        self.__remote = remote
    
    43
    +
    
    44
    +    def get_capabilities(self, instance_name):
    
    45
    +
    
    46
    +        def __get_capabilities(queue, remote, instance_name):
    
    47
    +            interface = CapabilitiesInterface(grpc.insecure_channel(remote))
    
    48
    +
    
    49
    +            result = interface.get_capabilities(instance_name)
    
    50
    +            queue.put(result.SerializeToString())
    
    51
    +
    
    52
    +        result = run_in_subprocess(__get_capabilities,
    
    53
    +                                   self.__remote, instance_name)
    
    54
    +
    
    55
    +        capabilities = remote_execution_pb2.ServerCapabilities()
    
    56
    +        capabilities.ParseFromString(result)
    
    57
    +        return capabilities
    
    58
    +
    
    59
    +
    
    60
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    61
    +def test_execution_not_available_capabilities(instance):
    
    62
    +    with serve_capabilities_service([instance]) as server:
    
    63
    +        server_interface = ServerInterface(server.remote)
    
    64
    +        response = server_interface.get_capabilities(instance)
    
    65
    +
    
    66
    +        assert not response.execution_capabilities.exec_enabled
    
    67
    +
    
    68
    +
    
    69
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    70
    +def test_execution_available_capabilities(instance):
    
    71
    +    controller = ExecutionController()
    
    72
    +
    
    73
    +    with serve_capabilities_service([instance],
    
    74
    +                                    execution_instance=controller.execution_instance) as server:
    
    75
    +        server_interface = ServerInterface(server.remote)
    
    76
    +        response = server_interface.get_capabilities(instance)
    
    77
    +
    
    78
    +        assert response.execution_capabilities.exec_enabled
    
    79
    +        assert response.execution_capabilities.digest_function
    
    80
    +
    
    81
    +
    
    82
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    83
    +def test_action_cache_allow_updates_capabilities(instance):
    
    84
    +    storage = LRUMemoryCache(limit=256)
    
    85
    +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
    
    86
    +
    
    87
    +    with serve_capabilities_service([instance],
    
    88
    +                                    action_cache_instance=action_cache) as server:
    
    89
    +        server_interface = ServerInterface(server.remote)
    
    90
    +        response = server_interface.get_capabilities(instance)
    
    91
    +
    
    92
    +        assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
    
    93
    +
    
    94
    +
    
    95
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    96
    +def test_action_cache_not_allow_updates_capabilities(instance):
    
    97
    +    storage = LRUMemoryCache(limit=256)
    
    98
    +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
    
    99
    +
    
    100
    +    with serve_capabilities_service([instance],
    
    101
    +                                    action_cache_instance=action_cache) as server:
    
    102
    +        server_interface = ServerInterface(server.remote)
    
    103
    +        response = server_interface.get_capabilities(instance)
    
    104
    +
    
    105
    +        assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
    
    106
    +
    
    107
    +
    
    108
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    109
    +def test_cas_capabilities(instance):
    
    110
    +    cas = ContentAddressableStorageInstance(None)
    
    111
    +
    
    112
    +    with serve_capabilities_service([instance],
    
    113
    +                                    cas_instance=cas) as server:
    
    114
    +        server_interface = ServerInterface(server.remote)
    
    115
    +        response = server_interface.get_capabilities(instance)
    
    116
    +
    
    117
    +        assert len(response.cache_capabilities.digest_function) == 1
    
    118
    +        assert response.cache_capabilities.digest_function[0]
    
    119
    +        assert response.cache_capabilities.symlink_absolute_path_strategy
    
    120
    +        assert response.cache_capabilities.max_batch_total_size_bytes

  • tests/utils/capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from concurrent import futures
    
    17
    +from contextlib import contextmanager
    
    18
    +import multiprocessing
    
    19
    +import os
    
    20
    +import signal
    
    21
    +
    
    22
    +import grpc
    
    23
    +import pytest_cov
    
    24
    +
    
    25
    +from buildgrid.server.capabilities.service import CapabilitiesService
    
    26
    +from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    27
    +
    
    28
    +
    
    29
    +@contextmanager
    
    30
    +def serve_capabilities_service(instances,
    
    31
    +                               cas_instance=None,
    
    32
    +                               action_cache_instance=None,
    
    33
    +                               execution_instance=None):
    
    34
    +    server = Server(instances,
    
    35
    +                    cas_instance,
    
    36
    +                    action_cache_instance,
    
    37
    +                    execution_instance)
    
    38
    +    try:
    
    39
    +        yield server
    
    40
    +    finally:
    
    41
    +        server.quit()
    
    42
    +
    
    43
    +
    
    44
    +class Server:
    
    45
    +
    
    46
    +    def __init__(self, instances,
    
    47
    +                 cas_instance=None,
    
    48
    +                 action_cache_instance=None,
    
    49
    +                 execution_instance=None):
    
    50
    +        self.instances = instances
    
    51
    +
    
    52
    +        self.__queue = multiprocessing.Queue()
    
    53
    +        self.__process = multiprocessing.Process(
    
    54
    +            target=Server.serve,
    
    55
    +            args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
    
    56
    +        self.__process.start()
    
    57
    +
    
    58
    +        self.port = self.__queue.get(timeout=1)
    
    59
    +        self.remote = 'localhost:{}'.format(self.port)
    
    60
    +
    
    61
    +    @staticmethod
    
    62
    +    def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
    
    63
    +        pytest_cov.embed.cleanup_on_sigterm()
    
    64
    +
    
    65
    +        # Use max_workers default from Python 3.5+
    
    66
    +        max_workers = (os.cpu_count() or 1) * 5
    
    67
    +        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    68
    +        port = server.add_insecure_port('localhost:0')
    
    69
    +
    
    70
    +        capabilities_service = CapabilitiesService(server)
    
    71
    +        for name in instances:
    
    72
    +            capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
    
    73
    +            capabilities_service.add_instance(name, capabilities_instance)
    
    74
    +
    
    75
    +        server.start()
    
    76
    +        queue.put(port)
    
    77
    +        signal.pause()
    
    78
    +
    
    79
    +    def quit(self):
    
    80
    +        if self.__process:
    
    81
    +            self.__process.terminate()
    
    82
    +            self.__process.join()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]