[Notes] [Git][BuildGrid/buildgrid][mablanch/144-jwt-authentication] 13 commits: buildgrid/utils.py: New `get_hash_type` function.



Title: GitLab

Martin Blanchard pushed to branch mablanch/144-jwt-authentication at BuildGrid / buildgrid

Commits:

13 changed files:

Changes:

  • buildgrid/_app/commands/cmd_capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import sys
    
    17
    +from urllib.parse import urlparse
    
    18
    +
    
    19
    +import click
    
    20
    +import grpc
    
    21
    +
    
    22
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    23
    +
    
    24
    +from ..cli import pass_context
    
    25
    +
    
    26
    +
    
    27
    +@click.command(name='capabilities', short_help="Capabilities service.")
    
    28
    +@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
    
    29
    +              help="Remote execution server's URL (port defaults to 50051 if no specified).")
    
    30
    +@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
    
    31
    +              help="Private client key for TLS (PEM-encoded)")
    
    32
    +@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    33
    +              help="Public client certificate for TLS (PEM-encoded)")
    
    34
    +@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    35
    +              help="Public server certificate for TLS (PEM-encoded)")
    
    36
    +@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
    
    37
    +              help="Targeted farm instance name.")
    
    38
    +@pass_context
    
    39
    +def cli(context, remote, instance_name, client_key, client_cert, server_cert):
    
    40
    +    click.echo("Getting capabilities...")
    
    41
    +    url = urlparse(remote)
    
    42
    +
    
    43
    +    remote = '{}:{}'.format(url.hostname, url.port or 50051)
    
    44
    +    instance_name = instance_name
    
    45
    +
    
    46
    +    if url.scheme == 'http':
    
    47
    +        channel = grpc.insecure_channel(remote)
    
    48
    +    else:
    
    49
    +        credentials = context.load_client_credentials(client_key, client_cert, server_cert)
    
    50
    +        if not credentials:
    
    51
    +            click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
    
    52
    +            sys.exit(-1)
    
    53
    +
    
    54
    +        channel = grpc.secure_channel(remote, credentials)
    
    55
    +
    
    56
    +    interface = CapabilitiesInterface(channel)
    
    57
    +    response = interface.get_capabilities(instance_name)
    
    58
    +    click.echo(response)

  • buildgrid/client/capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +import grpc
    
    18
    +
    
    19
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    20
    +
    
    21
    +
    
    22
    +class CapabilitiesInterface:
    
    23
    +    """Interface for calls to the Capabilities Service."""
    
    24
    +
    
    25
    +    def __init__(self, channel):
    
    26
    +        """Initialises an instance of the capabilities service.
    
    27
    +
    
    28
    +        Args:
    
    29
    +            channel (grpc.Channel): A gRPC channel to the Capabilities endpoint.
    
    30
    +        """
    
    31
    +        self.__logger = logging.getLogger(__name__)
    
    32
    +        self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
    
    33
    +
    
    34
    +    def get_capabilities(self, instance_name):
    
    35
    +        """Returns the capabilities of the server to the user.
    
    36
    +
    
    37
    +        Args:
    
    38
    +            instance_name (str): The name of the instance."""
    
    39
    +
    
    40
    +        request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
    
    41
    +        try:
    
    42
    +            return self.__stub.GetCapabilities(request)
    
    43
    +
    
    44
    +        except grpc.RpcError as e:
    
    45
    +            self.__logger.error(e)
    
    46
    +            raise

  • buildgrid/server/_authentication.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from datetime import datetime
    
    17
    +from enum import Enum
    
    18
    +
    
    19
    +import grpc
    
    20
    +import jwt
    
    21
    +
    
    22
    +from buildgrid._exceptions import InvalidArgumentError
    
    23
    +
    
    24
    +
    
    25
    +class JwtAlgorithm(Enum):
    
    26
    +    # HMAC algorithms:
    
    27
    +    HS256 = 'HS256'
    
    28
    +    HS384 = 'HS384'
    
    29
    +    HS512 = 'HS512'
    
    30
    +
    
    31
    +    # RSASSA-PKCS algorithms:
    
    32
    +    RS256 = 'RS256'
    
    33
    +    RS384 = 'RS384'
    
    34
    +    RS512 = 'RS512'
    
    35
    +
    
    36
    +    # RSASSA-PSS algorithms:
    
    37
    +    PS256 = 'PS256'
    
    38
    +    PS384 = 'PS384'
    
    39
    +    PS512 = 'PS512'
    
    40
    +
    
    41
    +    # ECDSA algorithms:
    
    42
    +    ES256 = 'ES256'
    
    43
    +    ES384 = 'ES384'
    
    44
    +    ES521 = 'ES521'
    
    45
    +    ES512 = 'ES512'
    
    46
    +
    
    47
    +
    
    48
    +class JwtAuthMetadataInterceptor(grpc.ServerInterceptor):
    
    49
    +
    
    50
    +    __auth_errors = {
    
    51
    +        'missing-bearer': 'Missing authentication header field.',
    
    52
    +        'invalid-bearer': 'Invalid authentication header field.',
    
    53
    +        'invalid-token': 'Invalid authentication token.',
    
    54
    +        'expired-token': 'Expired authentication token.',
    
    55
    +        'unbounded-token': 'Unbounded authentication token.',
    
    56
    +    }
    
    57
    +
    
    58
    +    def __init__(self, secret, algorithm):
    
    59
    +        """Initialises a new :class:`JwtAuthMetadataInterceptor`.
    
    60
    +
    
    61
    +        Args:
    
    62
    +            secret (str): Symmetric secret key or asymmetric public key.
    
    63
    +            algorithm (JwtAlgorithm): Algorithm used to encode `secret`.
    
    64
    +
    
    65
    +        Raises:
    
    66
    +            InvalidArgumentError: If `algorithm` is not supported.
    
    67
    +        """
    
    68
    +        self.__bearer_cache = {}
    
    69
    +        self.__terminators = {}
    
    70
    +        self.__secret = secret
    
    71
    +
    
    72
    +        self._algorithm = algorithm.value
    
    73
    +
    
    74
    +        try:
    
    75
    +            jwt.register_algorithm(self._algorithm, None)
    
    76
    +
    
    77
    +        except TypeError:
    
    78
    +            raise InvalidArgumentError('Algorithm not supported for JWT decoding: [{}]'
    
    79
    +                                       .format(self._algorithm))
    
    80
    +
    
    81
    +        except ValueError:
    
    82
    +            pass
    
    83
    +
    
    84
    +        for code, message in self.__auth_errors.items():
    
    85
    +            self.__terminators[code] = _unary_unary_rpc_terminator(message)
    
    86
    +
    
    87
    +    @property
    
    88
    +    def algorithm(self):
    
    89
    +        return JwtAlgorithm(self._algorithm)
    
    90
    +
    
    91
    +    def intercept_service(self, continuation, handler_call_details):
    
    92
    +        try:
    
    93
    +            # Reject requests not carrying a token:
    
    94
    +            bearer = dict(handler_call_details.invocation_metadata)['Authorization']
    
    95
    +
    
    96
    +        except KeyError:
    
    97
    +            return self.__terminators['missing-bearer']  # Rejected
    
    98
    +
    
    99
    +        # Reject requests with malformed bearer:
    
    100
    +        if not bearer.startswith('Bearer '):
    
    101
    +            return self.__terminators['invalid-bearer']  # Rejected
    
    102
    +
    
    103
    +        try:
    
    104
    +            # Hit the cache for already validated token:
    
    105
    +            expiration_time = self.__bearer_cache[bearer]
    
    106
    +
    
    107
    +            # Accept request if cached token hasn't expired yet:
    
    108
    +            if expiration_time < datetime.utcnow():
    
    109
    +                return continuation(handler_call_details)  # Accepted
    
    110
    +
    
    111
    +        except KeyError:
    
    112
    +            pass
    
    113
    +
    
    114
    +        try:
    
    115
    +            # Decode and validate the new token:
    
    116
    +            payload = jwt.decode(bearer[7:], self.__secret, algorithm=self._algorithm)
    
    117
    +
    
    118
    +        except jwt.exceptions.ExpiredSignatureError:
    
    119
    +            return self.__terminators['expired-token']  # Rejected
    
    120
    +
    
    121
    +        except jwt.exceptions.InvalidTokenError:
    
    122
    +            return self.__terminators['invalid-token']  # Rejected
    
    123
    +
    
    124
    +        # Do not accept token without an expiration time:
    
    125
    +        if 'exp' not in payload or isinstance(payload['exp'], int):
    
    126
    +            return self.__terminators['unbounded-token']  # Rejected
    
    127
    +
    
    128
    +        # Cache the validated token and store expiration time:
    
    129
    +        self.__bearer_cache[bearer] = datetime.fromtimestamp(payload['exp'])
    
    130
    +
    
    131
    +        return continuation(handler_call_details)  # Accepted
    
    132
    +
    
    133
    +
    
    134
    +def _unary_unary_rpc_terminator(details):
    
    135
    +
    
    136
    +    def terminate(ignored_request, context):
    
    137
    +        context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
    
    138
    +
    
    139
    +    return grpc.unary_unary_rpc_method_handler(terminate)

  • buildgrid/server/capabilities/__init__.py

  • buildgrid/server/capabilities/instance.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +
    
    20
    +
    
    21
    +class CapabilitiesInstance:
    
    22
    +
    
    23
    +    def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
    
    24
    +        self.__logger = logging.getLogger(__name__)
    
    25
    +        self.__cas_instance = cas_instance
    
    26
    +        self.__action_cache_instance = action_cache_instance
    
    27
    +        self.__execution_instance = execution_instance
    
    28
    +
    
    29
    +    def register_instance_with_server(self, instance_name, server):
    
    30
    +        server.add_capabilities_instance(self, instance_name)
    
    31
    +
    
    32
    +    def add_cas_instance(self, cas_instance):
    
    33
    +        self.__cas_instance = cas_instance
    
    34
    +
    
    35
    +    def add_action_cache_instance(self, action_cache_instance):
    
    36
    +        self.__action_cache_instance = action_cache_instance
    
    37
    +
    
    38
    +    def add_execution_instance(self, execution_instance):
    
    39
    +        self.__execution_instance = execution_instance
    
    40
    +
    
    41
    +    def get_capabilities(self):
    
    42
    +        server_capabilities = remote_execution_pb2.ServerCapabilities()
    
    43
    +        server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
    
    44
    +        server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
    
    45
    +        # TODO
    
    46
    +        # When API is stable, fill out SemVer values
    
    47
    +        # server_capabilities.deprecated_api_version =
    
    48
    +        # server_capabilities.low_api_version =
    
    49
    +        # server_capabilities.low_api_version =
    
    50
    +        # server_capabilities.high_api_version =
    
    51
    +        return server_capabilities
    
    52
    +
    
    53
    +    def _get_cache_capabilities(self):
    
    54
    +        capabilities = remote_execution_pb2.CacheCapabilities()
    
    55
    +        action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
    
    56
    +
    
    57
    +        if self.__cas_instance:
    
    58
    +            capabilities.digest_function.extend([self.__cas_instance.hash_type()])
    
    59
    +            capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
    
    60
    +            capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
    
    61
    +            # TODO: execution priority #102
    
    62
    +            # capabilities.cache_priority_capabilities =
    
    63
    +
    
    64
    +        if self.__action_cache_instance:
    
    65
    +            action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
    
    66
    +
    
    67
    +        capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
    
    68
    +        return capabilities
    
    69
    +
    
    70
    +    def _get_capabilities_execution(self):
    
    71
    +        capabilities = remote_execution_pb2.ExecutionCapabilities()
    
    72
    +        if self.__execution_instance:
    
    73
    +            capabilities.exec_enabled = True
    
    74
    +            capabilities.digest_function = self.__execution_instance.hash_type()
    
    75
    +            # TODO: execution priority #102
    
    76
    +            # capabilities.execution_priority =
    
    77
    +
    
    78
    +        else:
    
    79
    +            capabilities.exec_enabled = False
    
    80
    +
    
    81
    +        return capabilities

  • buildgrid/server/capabilities/service.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +
    
    18
    +import grpc
    
    19
    +
    
    20
    +from buildgrid._exceptions import InvalidArgumentError
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    22
    +
    
    23
    +
    
    24
    +class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    25
    +
    
    26
    +    def __init__(self, server):
    
    27
    +        self.__logger = logging.getLogger(__name__)
    
    28
    +        self.__instances = {}
    
    29
    +        remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
    
    30
    +
    
    31
    +    def add_instance(self, name, instance):
    
    32
    +        self.__instances[name] = instance
    
    33
    +
    
    34
    +    def add_cas_instance(self, name, instance):
    
    35
    +        self.__instances[name].add_cas_instance(instance)
    
    36
    +
    
    37
    +    def add_action_cache_instance(self, name, instance):
    
    38
    +        self.__instances[name].add_action_cache_instance(instance)
    
    39
    +
    
    40
    +    def add_execution_instance(self, name, instance):
    
    41
    +        self.__instances[name].add_execution_instance(instance)
    
    42
    +
    
    43
    +    def GetCapabilities(self, request, context):
    
    44
    +        try:
    
    45
    +            instance = self._get_instance(request.instance_name)
    
    46
    +            return instance.get_capabilities()
    
    47
    +
    
    48
    +        except InvalidArgumentError as e:
    
    49
    +            self.__logger.error(e)
    
    50
    +            context.set_details(str(e))
    
    51
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    52
    +
    
    53
    +        return remote_execution_pb2.ServerCapabilities()
    
    54
    +
    
    55
    +    def _get_instance(self, name):
    
    56
    +        try:
    
    57
    +            return self.__instances[name]
    
    58
    +
    
    59
    +        except KeyError:
    
    60
    +            raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))

  • buildgrid/server/cas/instance.py
    ... ... @@ -25,6 +25,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27 27
     from buildgrid.settings import HASH, HASH_LENGTH
    
    28
    +from buildgrid.utils import get_hash_type
    
    28 29
     
    
    29 30
     
    
    30 31
     class ContentAddressableStorageInstance:
    
    ... ... @@ -37,6 +38,19 @@ class ContentAddressableStorageInstance:
    37 38
         def register_instance_with_server(self, instance_name, server):
    
    38 39
             server.add_cas_instance(self, instance_name)
    
    39 40
     
    
    41
    +    def hash_type(self):
    
    42
    +        return get_hash_type()
    
    43
    +
    
    44
    +    def max_batch_total_size_bytes(self):
    
    45
    +        # TODO: link with max size
    
    46
    +        # Should be added from settings in MR !119
    
    47
    +        return 2000000
    
    48
    +
    
    49
    +    def symlink_absolute_path_strategy(self):
    
    50
    +        # Currently this strategy is hardcoded into BuildGrid
    
    51
    +        # With no setting to reference
    
    52
    +        return re_pb2.CacheCapabilities().DISALLOWED
    
    53
    +
    
    40 54
         def find_missing_blobs(self, blob_digests):
    
    41 55
             storage = self._storage
    
    42 56
             return re_pb2.FindMissingBlobsResponse(
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -25,6 +25,7 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27 27
     from ..job import Job
    
    28
    +from ...utils import get_hash_type
    
    28 29
     
    
    29 30
     
    
    30 31
     class ExecutionInstance:
    
    ... ... @@ -38,6 +39,9 @@ class ExecutionInstance:
    38 39
         def register_instance_with_server(self, instance_name, server):
    
    39 40
             server.add_execution_instance(self, instance_name)
    
    40 41
     
    
    42
    +    def hash_type(self):
    
    43
    +        return get_hash_type()
    
    44
    +
    
    41 45
         def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    42 46
             """ Sends a job for execution.
    
    43 47
             Queues an action and creates an Operation instance to be associated with
    

  • buildgrid/server/instance.py
    ... ... @@ -22,12 +22,15 @@ import signal
    22 22
     import grpc
    
    23 23
     
    
    24 24
     from buildgrid.server.actioncache.service import ActionCacheService
    
    25
    +from buildgrid.server._authentication import JwtAlgorithm, JwtAuthMetadataInterceptor
    
    25 26
     from buildgrid.server.bots.service import BotsService
    
    26 27
     from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    27 28
     from buildgrid.server.execution.service import ExecutionService
    
    28 29
     from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    29 30
     from buildgrid.server.operations.service import OperationsService
    
    30 31
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    32
    +from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    33
    +from buildgrid.server.capabilities.service import CapabilitiesService
    
    31 34
     
    
    32 35
     
    
    33 36
     class BuildGridServer:
    
    ... ... @@ -50,11 +53,16 @@ class BuildGridServer:
    50 53
                 max_workers = (os.cpu_count() or 1) * 5
    
    51 54
     
    
    52 55
             self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    53
    -        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    56
    +        self.__grpc_auth_interceptor = JwtAuthMetadataInterceptor('your-256-bit-secret', JwtAlgorithm.HS256)
    
    57
    +        self.__grpc_server = grpc.server(
    
    58
    +            self.__grpc_executor, interceptors=(self.__grpc_auth_interceptor,))
    
    54 59
     
    
    55 60
             self.__main_loop = asyncio.get_event_loop()
    
    56 61
             self.__monitoring_bus = None
    
    57 62
     
    
    63
    +        # We always want a capabilities service
    
    64
    +        self._capabilities_service = CapabilitiesService(self.__grpc_server)
    
    65
    +
    
    58 66
             self._execution_service = None
    
    59 67
             self._bots_service = None
    
    60 68
             self._operations_service = None
    
    ... ... @@ -128,6 +136,7 @@ class BuildGridServer:
    128 136
                 self._execution_service = ExecutionService(self.__grpc_server)
    
    129 137
     
    
    130 138
             self._execution_service.add_instance(instance_name, instance)
    
    139
    +        self._add_capabilities_instance(instance_name, execution_instance=instance)
    
    131 140
     
    
    132 141
         def add_bots_interface(self, instance, instance_name):
    
    133 142
             """Adds a :obj:`BotsInterface` to the service.
    
    ... ... @@ -184,9 +193,10 @@ class BuildGridServer:
    184 193
                 self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    185 194
     
    
    186 195
             self._action_cache_service.add_instance(instance_name, instance)
    
    196
    +        self._add_capabilities_instance(instance_name, action_cache_instance=instance)
    
    187 197
     
    
    188 198
         def add_cas_instance(self, instance, instance_name):
    
    189
    -        """Stores a :obj:`ContentAddressableStorageInstance` to the service.
    
    199
    +        """Adds a :obj:`ContentAddressableStorageInstance` to the service.
    
    190 200
     
    
    191 201
             If no service exists, it creates one.
    
    192 202
     
    
    ... ... @@ -198,9 +208,10 @@ class BuildGridServer:
    198 208
                 self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    199 209
     
    
    200 210
             self._cas_service.add_instance(instance_name, instance)
    
    211
    +        self._add_capabilities_instance(instance_name, cas_instance=instance)
    
    201 212
     
    
    202 213
         def add_bytestream_instance(self, instance, instance_name):
    
    203
    -        """Stores a :obj:`ByteStreamInstance` to the service.
    
    214
    +        """Adds a :obj:`ByteStreamInstance` to the service.
    
    204 215
     
    
    205 216
             If no service exists, it creates one.
    
    206 217
     
    
    ... ... @@ -213,6 +224,31 @@ class BuildGridServer:
    213 224
     
    
    214 225
             self._bytestream_service.add_instance(instance_name, instance)
    
    215 226
     
    
    227
    +    def _add_capabilities_instance(self, instance_name,
    
    228
    +                                   cas_instance=None,
    
    229
    +                                   action_cache_instance=None,
    
    230
    +                                   execution_instance=None):
    
    231
    +        """Adds a :obj:`CapabilitiesInstance` to the service.
    
    232
    +
    
    233
    +        Args:
    
    234
    +            instance (:obj:`CapabilitiesInstance`): Instance to add.
    
    235
    +            instance_name (str): Instance name.
    
    236
    +        """
    
    237
    +
    
    238
    +        try:
    
    239
    +            if cas_instance:
    
    240
    +                self._capabilities_service.add_cas_instance(instance_name, cas_instance)
    
    241
    +            if action_cache_instance:
    
    242
    +                self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
    
    243
    +            if execution_instance:
    
    244
    +                self._capabilities_service.add_execution_instance(instance_name, execution_instance)
    
    245
    +
    
    246
    +        except KeyError:
    
    247
    +            capabilities_instance = CapabilitiesInstance(cas_instance,
    
    248
    +                                                         action_cache_instance,
    
    249
    +                                                         execution_instance)
    
    250
    +            self._capabilities_service.add_instance(instance_name, capabilities_instance)
    
    251
    +
    
    216 252
         # --- Public API: Monitoring ---
    
    217 253
     
    
    218 254
         @property
    

  • buildgrid/utils.py
    ... ... @@ -30,6 +30,14 @@ def get_hostname():
    30 30
         return socket.gethostname()
    
    31 31
     
    
    32 32
     
    
    33
    +def get_hash_type():
    
    34
    +    """Returns the hash type."""
    
    35
    +    hash_name = HASH().name
    
    36
    +    if hash_name == "sha256":
    
    37
    +        return remote_execution_pb2.SHA256
    
    38
    +    return remote_execution_pb2.UNKNOWN
    
    39
    +
    
    40
    +
    
    33 41
     def create_digest(bytes_to_digest):
    
    34 42
         """Computes the :obj:`Digest` of a piece of data.
    
    35 43
     
    

  • tests/cas/test_storage.py
    ... ... @@ -21,8 +21,8 @@ import tempfile
    21 21
     
    
    22 22
     import boto3
    
    23 23
     import grpc
    
    24
    -import pytest
    
    25 24
     from moto import mock_s3
    
    25
    +import pytest
    
    26 26
     
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid.server.cas.storage.remote import RemoteStorage
    

  • tests/integration/capabilities_service.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +# pylint: disable=redefined-outer-name
    
    16
    +
    
    17
    +
    
    18
    +import grpc
    
    19
    +import pytest
    
    20
    +
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    23
    +from buildgrid.server.controller import ExecutionController
    
    24
    +from buildgrid.server.actioncache.storage import ActionCache
    
    25
    +from buildgrid.server.cas.instance import ContentAddressableStorageInstance
    
    26
    +from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    
    27
    +
    
    28
    +from ..utils.utils import run_in_subprocess
    
    29
    +from ..utils.capabilities import serve_capabilities_service
    
    30
    +
    
    31
    +
    
    32
    +INSTANCES = ['', 'instance']
    
    33
    +
    
    34
    +
    
    35
    +# Use subprocess to avoid creation of gRPC threads in main process
    
    36
    +# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
    
    37
    +# Multiprocessing uses pickle which protobufs don't work with
    
    38
    +# Workaround wrapper to send messages as strings
    
    39
    +class ServerInterface:
    
    40
    +
    
    41
    +    def __init__(self, remote):
    
    42
    +        self.__remote = remote
    
    43
    +
    
    44
    +    def get_capabilities(self, instance_name):
    
    45
    +
    
    46
    +        def __get_capabilities(queue, remote, instance_name):
    
    47
    +            interface = CapabilitiesInterface(grpc.insecure_channel(remote))
    
    48
    +
    
    49
    +            result = interface.get_capabilities(instance_name)
    
    50
    +            queue.put(result.SerializeToString())
    
    51
    +
    
    52
    +        result = run_in_subprocess(__get_capabilities,
    
    53
    +                                   self.__remote, instance_name)
    
    54
    +
    
    55
    +        capabilities = remote_execution_pb2.ServerCapabilities()
    
    56
    +        capabilities.ParseFromString(result)
    
    57
    +        return capabilities
    
    58
    +
    
    59
    +
    
    60
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    61
    +def test_execution_not_available_capabilities(instance):
    
    62
    +    with serve_capabilities_service([instance]) as server:
    
    63
    +        server_interface = ServerInterface(server.remote)
    
    64
    +        response = server_interface.get_capabilities(instance)
    
    65
    +
    
    66
    +        assert not response.execution_capabilities.exec_enabled
    
    67
    +
    
    68
    +
    
    69
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    70
    +def test_execution_available_capabilities(instance):
    
    71
    +    controller = ExecutionController()
    
    72
    +
    
    73
    +    with serve_capabilities_service([instance],
    
    74
    +                                    execution_instance=controller.execution_instance) as server:
    
    75
    +        server_interface = ServerInterface(server.remote)
    
    76
    +        response = server_interface.get_capabilities(instance)
    
    77
    +
    
    78
    +        assert response.execution_capabilities.exec_enabled
    
    79
    +        assert response.execution_capabilities.digest_function
    
    80
    +
    
    81
    +
    
    82
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    83
    +def test_action_cache_allow_updates_capabilities(instance):
    
    84
    +    storage = LRUMemoryCache(limit=256)
    
    85
    +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
    
    86
    +
    
    87
    +    with serve_capabilities_service([instance],
    
    88
    +                                    action_cache_instance=action_cache) as server:
    
    89
    +        server_interface = ServerInterface(server.remote)
    
    90
    +        response = server_interface.get_capabilities(instance)
    
    91
    +
    
    92
    +        assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
    
    93
    +
    
    94
    +
    
    95
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    96
    +def test_action_cache_not_allow_updates_capabilities(instance):
    
    97
    +    storage = LRUMemoryCache(limit=256)
    
    98
    +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
    
    99
    +
    
    100
    +    with serve_capabilities_service([instance],
    
    101
    +                                    action_cache_instance=action_cache) as server:
    
    102
    +        server_interface = ServerInterface(server.remote)
    
    103
    +        response = server_interface.get_capabilities(instance)
    
    104
    +
    
    105
    +        assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
    
    106
    +
    
    107
    +
    
    108
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    109
    +def test_cas_capabilities(instance):
    
    110
    +    cas = ContentAddressableStorageInstance(None)
    
    111
    +
    
    112
    +    with serve_capabilities_service([instance],
    
    113
    +                                    cas_instance=cas) as server:
    
    114
    +        server_interface = ServerInterface(server.remote)
    
    115
    +        response = server_interface.get_capabilities(instance)
    
    116
    +
    
    117
    +        assert len(response.cache_capabilities.digest_function) == 1
    
    118
    +        assert response.cache_capabilities.digest_function[0]
    
    119
    +        assert response.cache_capabilities.symlink_absolute_path_strategy
    
    120
    +        assert response.cache_capabilities.max_batch_total_size_bytes

  • tests/utils/capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from concurrent import futures
    
    17
    +from contextlib import contextmanager
    
    18
    +import multiprocessing
    
    19
    +import os
    
    20
    +import signal
    
    21
    +
    
    22
    +import grpc
    
    23
    +import pytest_cov
    
    24
    +
    
    25
    +from buildgrid.server.capabilities.service import CapabilitiesService
    
    26
    +from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    27
    +
    
    28
    +
    
    29
    +@contextmanager
    
    30
    +def serve_capabilities_service(instances,
    
    31
    +                               cas_instance=None,
    
    32
    +                               action_cache_instance=None,
    
    33
    +                               execution_instance=None):
    
    34
    +    server = Server(instances,
    
    35
    +                    cas_instance,
    
    36
    +                    action_cache_instance,
    
    37
    +                    execution_instance)
    
    38
    +    try:
    
    39
    +        yield server
    
    40
    +    finally:
    
    41
    +        server.quit()
    
    42
    +
    
    43
    +
    
    44
    +class Server:
    
    45
    +
    
    46
    +    def __init__(self, instances,
    
    47
    +                 cas_instance=None,
    
    48
    +                 action_cache_instance=None,
    
    49
    +                 execution_instance=None):
    
    50
    +        self.instances = instances
    
    51
    +
    
    52
    +        self.__queue = multiprocessing.Queue()
    
    53
    +        self.__process = multiprocessing.Process(
    
    54
    +            target=Server.serve,
    
    55
    +            args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
    
    56
    +        self.__process.start()
    
    57
    +
    
    58
    +        self.port = self.__queue.get(timeout=1)
    
    59
    +        self.remote = 'localhost:{}'.format(self.port)
    
    60
    +
    
    61
    +    @staticmethod
    
    62
    +    def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
    
    63
    +        pytest_cov.embed.cleanup_on_sigterm()
    
    64
    +
    
    65
    +        # Use max_workers default from Python 3.5+
    
    66
    +        max_workers = (os.cpu_count() or 1) * 5
    
    67
    +        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    68
    +        port = server.add_insecure_port('localhost:0')
    
    69
    +
    
    70
    +        capabilities_service = CapabilitiesService(server)
    
    71
    +        for name in instances:
    
    72
    +            capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
    
    73
    +            capabilities_service.add_instance(name, capabilities_instance)
    
    74
    +
    
    75
    +        server.start()
    
    76
    +        queue.put(port)
    
    77
    +        signal.pause()
    
    78
    +
    
    79
    +    def quit(self):
    
    80
    +        if self.__process:
    
    81
    +            self.__process.terminate()
    
    82
    +            self.__process.join()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]