[Notes] [Git][BuildGrid/buildgrid][mablanch/135-monitoring-bus] 8 commits: monitoring.py: New monitoring bus class



Title: GitLab

Martin Blanchard pushed to branch mablanch/135-monitoring-bus at BuildGrid / buildgrid

Commits:

8 changed files:

Changes:

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -20,7 +20,6 @@ Server command
    20 20
     Create a BuildGrid server.
    
    21 21
     """
    
    22 22
     
    
    23
    -import asyncio
    
    24 23
     import logging
    
    25 24
     import sys
    
    26 25
     
    
    ... ... @@ -52,18 +51,14 @@ def start(context, config):
    52 51
             click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
    
    53 52
             sys.exit(-1)
    
    54 53
     
    
    55
    -    loop = asyncio.get_event_loop()
    
    56 54
         try:
    
    57 55
             server.start()
    
    58
    -        loop.run_forever()
    
    59 56
     
    
    60 57
         except KeyboardInterrupt:
    
    61 58
             pass
    
    62 59
     
    
    63 60
         finally:
    
    64
    -        context.logger.info("Stopping server")
    
    65 61
             server.stop()
    
    66
    -        loop.close()
    
    67 62
     
    
    68 63
     
    
    69 64
     def _create_server_from_config(config):
    

  • buildgrid/server/_monitoring.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import asyncio
    
    17
    +from enum import Enum
    
    18
    +import sys
    
    19
    +
    
    20
    +from google.protobuf import json_format
    
    21
    +
    
    22
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    23
    +
    
    24
    +
    
    25
    +class MonitoringOutputType(Enum):
    
    26
    +    # Standard output stream.
    
    27
    +    STDOUT = 'stdout'
    
    28
    +    # On-disk file.
    
    29
    +    FILE = 'file'
    
    30
    +    # UNIX domain socket.
    
    31
    +    SOCKET = 'socket'
    
    32
    +
    
    33
    +
    
    34
    +class MonitoringOutputFormat(Enum):
    
    35
    +    # Protobuf binary format.
    
    36
    +    BINARY = 'binary'
    
    37
    +    # JSON format.
    
    38
    +    JSON = 'json'
    
    39
    +
    
    40
    +
    
    41
    +class MonitoringBus:
    
    42
    +
    
    43
    +    def __init__(self, event_loop,
    
    44
    +                 endpoint_type=MonitoringOutputType.SOCKET, endpoint_location=None,
    
    45
    +                 serialisation_format=MonitoringOutputFormat.BINARY):
    
    46
    +        self.__event_loop = event_loop
    
    47
    +        self.__streaming_task = None
    
    48
    +
    
    49
    +        self.__message_queue = asyncio.Queue(loop=self.__event_loop)
    
    50
    +        self.__sequence_number = 1
    
    51
    +
    
    52
    +        self.__output_location = None
    
    53
    +        self.__async_output = False
    
    54
    +        self.__json_output = False
    
    55
    +
    
    56
    +        if endpoint_type == MonitoringOutputType.FILE:
    
    57
    +            self.__output_location = endpoint_location
    
    58
    +
    
    59
    +        elif endpoint_type == MonitoringOutputType.SOCKET:
    
    60
    +            self.__output_location = endpoint_location
    
    61
    +            self.__async_output = True
    
    62
    +
    
    63
    +        if serialisation_format == MonitoringOutputFormat.JSON:
    
    64
    +            self.__json_output = True
    
    65
    +
    
    66
    +    # --- Public API ---
    
    67
    +
    
    68
    +    def start(self):
    
    69
    +        """Starts the monitoring bus worker task."""
    
    70
    +        if self.__streaming_task is not None:
    
    71
    +            return
    
    72
    +
    
    73
    +        self.__streaming_task = asyncio.ensure_future(
    
    74
    +            self._streaming_worker(), loop=self.__event_loop)
    
    75
    +
    
    76
    +    def stop(self):
    
    77
    +        """Cancels the monitoring bus worker task."""
    
    78
    +        if self.__streaming_task is None:
    
    79
    +            return
    
    80
    +
    
    81
    +        self.__streaming_task.cancel()
    
    82
    +
    
    83
    +    async def send_record(self, record):
    
    84
    +        """Publishes a record onto the bus asynchronously.
    
    85
    +
    
    86
    +        Args:
    
    87
    +            record (Message): The LogRecord or MetricRecord message to publish.
    
    88
    +        """
    
    89
    +        await self.__message_queue.put(record)
    
    90
    +
    
    91
    +    def send_record_nowait(self, record):
    
    92
    +        """Publishes a record onto the bus.
    
    93
    +
    
    94
    +        Args:
    
    95
    +            record (Message): The LogRecord or MetricRecord message to publish.
    
    96
    +        """
    
    97
    +        self.__message_queue.put_nowait(record)
    
    98
    +
    
    99
    +    # --- Private API ---
    
    100
    +
    
    101
    +    async def _streaming_worker(self):
    
    102
    +        """Handles bus messages streaming work."""
    
    103
    +        async def __streaming_worker(end_points):
    
    104
    +            record = await self.__message_queue.get()
    
    105
    +
    
    106
    +            message = monitoring_pb2.BusMessage()
    
    107
    +            message.sequence_number = self.__sequence_number
    
    108
    +
    
    109
    +            if record.DESCRIPTOR is monitoring_pb2.LogRecord.DESCRIPTOR:
    
    110
    +                message.log_record.CopyFrom(record)
    
    111
    +
    
    112
    +            elif record.DESCRIPTOR is monitoring_pb2.MetricRecord.DESCRIPTOR:
    
    113
    +                message.metric_record.CopyFrom(record)
    
    114
    +
    
    115
    +            else:
    
    116
    +                return False
    
    117
    +
    
    118
    +            if self.__json_output:
    
    119
    +                binary_message = json_format.MessageToJson(message).encode()
    
    120
    +            else:
    
    121
    +                binary_message = message.SerializeToString()
    
    122
    +
    
    123
    +            for end_point in end_points:
    
    124
    +                end_point.write(binary_message)
    
    125
    +
    
    126
    +            return True
    
    127
    +
    
    128
    +        output_writers, output_file = [], None
    
    129
    +
    
    130
    +        async def __client_connected_callback(reader, writer):
    
    131
    +            output_writers.append(writer)
    
    132
    +
    
    133
    +        try:
    
    134
    +            if self.__async_output and self.__output_location:
    
    135
    +                await asyncio.start_unix_server(
    
    136
    +                    __client_connected_callback, path=self.__output_location,
    
    137
    +                    loop=self.__event_loop)
    
    138
    +
    
    139
    +                while True:
    
    140
    +                    if await __streaming_worker(output_writers):
    
    141
    +                        self.__sequence_number += 1
    
    142
    +
    
    143
    +                        for writer in output_writers:
    
    144
    +                            await writer.drain()
    
    145
    +
    
    146
    +            elif self.__output_location:
    
    147
    +                output_file = open(self.__output_location, mode='wb')
    
    148
    +
    
    149
    +                output_writers.append(output_file)
    
    150
    +
    
    151
    +                while True:
    
    152
    +                    if await __streaming_worker(iter(output_file)):
    
    153
    +                        self.__sequence_number += 1
    
    154
    +
    
    155
    +            else:
    
    156
    +                output_writers.append(sys.stdout.buffer)
    
    157
    +
    
    158
    +                while True:
    
    159
    +                    if await __streaming_worker(output_writers):
    
    160
    +                        self.__sequence_number += 1
    
    161
    +
    
    162
    +        except asyncio.CancelledError:
    
    163
    +            if output_file is not None:
    
    164
    +                output_file.close()
    
    165
    +
    
    166
    +            elif output_writers:
    
    167
    +                for writer in output_writers:
    
    168
    +                    writer.close()
    
    169
    +                    await writer.wait_closed()

  • buildgrid/server/cas/instance.py
    ... ... @@ -24,7 +24,7 @@ import logging
    24 24
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27
    -from buildgrid.settings import HASH
    
    27
    +from buildgrid.settings import HASH, HASH_LENGTH
    
    28 28
     
    
    29 29
     
    
    30 30
     class ContentAddressableStorageInstance:
    
    ... ... @@ -71,15 +71,12 @@ class ByteStreamInstance:
    71 71
         def register_instance_with_server(self, instance_name, server):
    
    72 72
             server.add_bytestream_instance(self, instance_name)
    
    73 73
     
    
    74
    -    def read(self, path, read_offset, read_limit):
    
    75
    -        storage = self._storage
    
    76
    -
    
    77
    -        if path[0] == "blobs":
    
    78
    -            path = [""] + path
    
    74
    +    def read(self, digest_hash, digest_size, read_offset, read_limit):
    
    75
    +        if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
    
    76
    +            raise InvalidArgumentError("Invalid digest [{}/{}]"
    
    77
    +                                       .format(digest_hash, digest_size))
    
    79 78
     
    
    80
    -        # Parse/verify resource name.
    
    81
    -        # Read resource names look like "[instance/]blobs/abc123hash/99".
    
    82
    -        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
    
    79
    +        digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
    
    83 80
     
    
    84 81
             # Check the given read offset and limit.
    
    85 82
             if read_offset < 0 or read_offset > digest.size_bytes:
    
    ... ... @@ -95,7 +92,7 @@ class ByteStreamInstance:
    95 92
                 raise InvalidArgumentError("Negative read_limit is invalid")
    
    96 93
     
    
    97 94
             # Read the blob from storage and send its contents to the client.
    
    98
    -        result = storage.get_blob(digest)
    
    95
    +        result = self._storage.get_blob(digest)
    
    99 96
             if result is None:
    
    100 97
                 raise NotFoundError("Blob not found")
    
    101 98
     
    
    ... ... @@ -110,51 +107,35 @@ class ByteStreamInstance:
    110 107
                     data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
    
    111 108
                 bytes_remaining -= self.BLOCK_SIZE
    
    112 109
     
    
    113
    -    def write(self, requests):
    
    114
    -        storage = self._storage
    
    110
    +    def write(self, digest_hash, digest_size, first_block, other_blocks):
    
    111
    +        if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
    
    112
    +            raise InvalidArgumentError("Invalid digest [{}/{}]"
    
    113
    +                                       .format(digest_hash, digest_size))
    
    115 114
     
    
    116
    -        first_request = next(requests)
    
    117
    -        path = first_request.resource_name.split("/")
    
    115
    +        digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
    
    118 116
     
    
    119
    -        if path[0] == "uploads":
    
    120
    -            path = [""] + path
    
    121
    -
    
    122
    -        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
    
    123
    -        write_session = storage.begin_write(digest)
    
    117
    +        write_session = self._storage.begin_write(digest)
    
    124 118
     
    
    125 119
             # Start the write session and write the first request's data.
    
    126
    -        write_session.write(first_request.data)
    
    127
    -        hash_ = HASH(first_request.data)
    
    128
    -        bytes_written = len(first_request.data)
    
    129
    -        finished = first_request.finish_write
    
    130
    -
    
    131
    -        # Handle subsequent write requests.
    
    132
    -        while not finished:
    
    133
    -
    
    134
    -            for request in requests:
    
    135
    -                if finished:
    
    136
    -                    raise InvalidArgumentError("Write request sent after write finished")
    
    137
    -
    
    138
    -                elif request.write_offset != bytes_written:
    
    139
    -                    raise InvalidArgumentError("Invalid write offset")
    
    120
    +        write_session.write(first_block)
    
    140 121
     
    
    141
    -                elif request.resource_name and request.resource_name != first_request.resource_name:
    
    142
    -                    raise InvalidArgumentError("Resource name changed mid-write")
    
    122
    +        computed_hash = HASH(first_block)
    
    123
    +        bytes_written = len(first_block)
    
    143 124
     
    
    144
    -                finished = request.finish_write
    
    145
    -                bytes_written += len(request.data)
    
    146
    -                if bytes_written > digest.size_bytes:
    
    147
    -                    raise InvalidArgumentError("Wrote too much data to blob")
    
    125
    +        # Handle subsequent write requests.
    
    126
    +        for next_block in other_blocks:
    
    127
    +            write_session.write(next_block)
    
    148 128
     
    
    149
    -                write_session.write(request.data)
    
    150
    -                hash_.update(request.data)
    
    129
    +            computed_hash.update(next_block)
    
    130
    +            bytes_written += len(next_block)
    
    151 131
     
    
    152 132
             # Check that the data matches the provided digest.
    
    153
    -        if bytes_written != digest.size_bytes or not finished:
    
    133
    +        if bytes_written != digest.size_bytes:
    
    154 134
                 raise NotImplementedError("Cannot close stream before finishing write")
    
    155 135
     
    
    156
    -        elif hash_.hexdigest() != digest.hash:
    
    136
    +        elif computed_hash.hexdigest() != digest.hash:
    
    157 137
                 raise InvalidArgumentError("Data does not match hash")
    
    158 138
     
    
    159
    -        storage.commit_write(digest, write_session)
    
    139
    +        self._storage.commit_write(digest, write_session)
    
    140
    +
    
    160 141
             return bytestream_pb2.WriteResponse(committed_size=bytes_written)

  • buildgrid/server/cas/service.py
    ... ... @@ -21,7 +21,6 @@ Implements the Content Addressable Storage API and ByteStream API.
    21 21
     """
    
    22 22
     
    
    23 23
     
    
    24
    -from itertools import tee
    
    25 24
     import logging
    
    26 25
     
    
    27 26
     import grpc
    
    ... ... @@ -115,27 +114,30 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    115 114
         def Read(self, request, context):
    
    116 115
             self.__logger.debug("Read request from [%s]", context.peer())
    
    117 116
     
    
    117
    +        names = request.resource_name.split('/')
    
    118
    +
    
    118 119
             try:
    
    119
    -            path = request.resource_name.split("/")
    
    120
    -            instance_name = path[0]
    
    120
    +            instance_name = ''
    
    121
    +            # Format: "{instance_name}/blobs/{hash}/{size}":
    
    122
    +            if len(names) < 3 or names[-3] != 'blobs':
    
    123
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    124
    +                                           .format(request.resource_name))
    
    121 125
     
    
    122
    -            # TODO: Decide on default instance name
    
    123
    -            if path[0] == "blobs":
    
    124
    -                if len(path) < 3 or not path[2].isdigit():
    
    125
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    
    126
    -                instance_name = ""
    
    126
    +            elif names[0] != 'blobs':
    
    127
    +                index = names.index('blobs')
    
    128
    +                instance_name = '/'.join(names[:index])
    
    129
    +                names = names[index:]
    
    127 130
     
    
    128
    -            elif path[1] == "blobs":
    
    129
    -                if len(path) < 4 or not path[3].isdigit():
    
    130
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    
    131
    +            if len(names) < 3:
    
    132
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    133
    +                                           .format(request.resource_name))
    
    131 134
     
    
    132
    -            else:
    
    133
    -                raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    
    135
    +            hash_, size_bytes = names[1], names[2]
    
    134 136
     
    
    135 137
                 instance = self._get_instance(instance_name)
    
    136
    -            yield from instance.read(path,
    
    137
    -                                     request.read_offset,
    
    138
    -                                     request.read_limit)
    
    138
    +
    
    139
    +            yield from instance.read(hash_, size_bytes,
    
    140
    +                                     request.read_offset, request.read_limit)
    
    139 141
     
    
    140 142
             except InvalidArgumentError as e:
    
    141 143
                 self.__logger.error(e)
    
    ... ... @@ -158,31 +160,31 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    158 160
         def Write(self, requests, context):
    
    159 161
             self.__logger.debug("Write request from [%s]", context.peer())
    
    160 162
     
    
    161
    -        try:
    
    162
    -            requests, request_probe = tee(requests, 2)
    
    163
    -            first_request = next(request_probe)
    
    164
    -
    
    165
    -            path = first_request.resource_name.split("/")
    
    163
    +        request = next(requests)
    
    164
    +        names = request.resource_name.split('/')
    
    166 165
     
    
    167
    -            instance_name = path[0]
    
    166
    +        try:
    
    167
    +            instance_name = ''
    
    168
    +            # Format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}/{anything}":
    
    169
    +            if len(names) < 5 or 'uploads' not in names or 'blobs' not in names:
    
    170
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    171
    +                                           .format(request.resource_name))
    
    168 172
     
    
    169
    -            # TODO: Sort out no instance name
    
    170
    -            if path[0] == "uploads":
    
    171
    -                if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
    
    172
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    173
    -                instance_name = ""
    
    173
    +            elif names[0] != 'uploads':
    
    174
    +                index = names.index('uploads')
    
    175
    +                instance_name = '/'.join(names[:index])
    
    176
    +                names = names[index:]
    
    174 177
     
    
    175
    -            elif path[1] == "uploads":
    
    176
    -                if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
    
    177
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    178
    +            if len(names) < 5:
    
    179
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    180
    +                                           .format(request.resource_name))
    
    178 181
     
    
    179
    -            else:
    
    180
    -                raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    182
    +            _, hash_, size_bytes = names[1], names[3], names[4]
    
    181 183
     
    
    182 184
                 instance = self._get_instance(instance_name)
    
    183
    -            response = instance.write(requests)
    
    184 185
     
    
    185
    -            return response
    
    186
    +            return instance.write(hash_, size_bytes, request.data,
    
    187
    +                                  [request.data for request in requests])
    
    186 188
     
    
    187 189
             except NotImplementedError as e:
    
    188 190
                 self.__logger.error(e)
    

  • buildgrid/server/instance.py
    ... ... @@ -13,18 +13,21 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +import asyncio
    
    16 17
     from concurrent import futures
    
    17 18
     import logging
    
    18 19
     import os
    
    20
    +import signal
    
    19 21
     
    
    20 22
     import grpc
    
    21 23
     
    
    22
    -from .cas.service import ByteStreamService, ContentAddressableStorageService
    
    23
    -from .actioncache.service import ActionCacheService
    
    24
    -from .execution.service import ExecutionService
    
    25
    -from .operations.service import OperationsService
    
    26
    -from .bots.service import BotsService
    
    27
    -from .referencestorage.service import ReferenceStorageService
    
    24
    +from buildgrid.server.actioncache.service import ActionCacheService
    
    25
    +from buildgrid.server.bots.service import BotsService
    
    26
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    27
    +from buildgrid.server.execution.service import ExecutionService
    
    28
    +from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    29
    +from buildgrid.server.operations.service import OperationsService
    
    30
    +from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    28 31
     
    
    29 32
     
    
    30 33
     class BuildGridServer:
    
    ... ... @@ -34,7 +37,7 @@ class BuildGridServer:
    34 37
         requisite services.
    
    35 38
         """
    
    36 39
     
    
    37
    -    def __init__(self, max_workers=None):
    
    40
    +    def __init__(self, max_workers=None, monitor=True):
    
    38 41
             """Initializes a new :class:`BuildGridServer` instance.
    
    39 42
     
    
    40 43
             Args:
    
    ... ... @@ -46,9 +49,11 @@ class BuildGridServer:
    46 49
                 # Use max_workers default from Python 3.5+
    
    47 50
                 max_workers = (os.cpu_count() or 1) * 5
    
    48 51
     
    
    49
    -        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    52
    +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    53
    +        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    50 54
     
    
    51
    -        self._server = server
    
    55
    +        self.__main_loop = asyncio.get_event_loop()
    
    56
    +        self.__monitoring_bus = None
    
    52 57
     
    
    53 58
             self._execution_service = None
    
    54 59
             self._bots_service = None
    
    ... ... @@ -58,15 +63,34 @@ class BuildGridServer:
    58 63
             self._cas_service = None
    
    59 64
             self._bytestream_service = None
    
    60 65
     
    
    66
    +        self._is_instrumented = monitor
    
    67
    +
    
    68
    +        if self._is_instrumented:
    
    69
    +            self.__monitoring_bus = MonitoringBus(
    
    70
    +                self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    71
    +                serialisation_format=MonitoringOutputFormat.JSON)
    
    72
    +
    
    73
    +    # --- Public API ---
    
    74
    +
    
    61 75
         def start(self):
    
    62
    -        """Starts the server.
    
    63
    -        """
    
    64
    -        self._server.start()
    
    76
    +        """Starts the BuildGrid server."""
    
    77
    +        self.__grpc_server.start()
    
    78
    +
    
    79
    +        if self._is_instrumented:
    
    80
    +            self.__monitoring_bus.start()
    
    81
    +
    
    82
    +        self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
    
    83
    +
    
    84
    +        self.__main_loop.run_forever()
    
    65 85
     
    
    66 86
         def stop(self, grace=0):
    
    67
    -        """Stops the server.
    
    68
    -        """
    
    69
    -        self._server.stop(grace)
    
    87
    +        """Stops the BuildGrid server."""
    
    88
    +        if self._is_instrumented:
    
    89
    +            self.__monitoring_bus.stop()
    
    90
    +
    
    91
    +        self.__main_loop.stop()
    
    92
    +
    
    93
    +        self.__grpc_server.stop(None)
    
    70 94
     
    
    71 95
         def add_port(self, address, credentials):
    
    72 96
             """Adds a port to the server.
    
    ... ... @@ -77,14 +101,19 @@ class BuildGridServer:
    77 101
             Args:
    
    78 102
                 address (str): The address with port number.
    
    79 103
                 credentials (:obj:`grpc.ChannelCredentials`): Credentials object.
    
    104
    +
    
    105
    +        Returns:
    
    106
    +            int: Number of the bound port.
    
    80 107
             """
    
    81 108
             if credentials is not None:
    
    82 109
                 self.__logger.info("Adding secure connection on: [%s]", address)
    
    83
    -            self._server.add_secure_port(address, credentials)
    
    110
    +            port_number = self.__grpc_server.add_secure_port(address, credentials)
    
    84 111
     
    
    85 112
             else:
    
    86 113
                 self.__logger.info("Adding insecure connection on [%s]", address)
    
    87
    -            self._server.add_insecure_port(address)
    
    114
    +            port_number = self.__grpc_server.add_insecure_port(address)
    
    115
    +
    
    116
    +        return port_number
    
    88 117
     
    
    89 118
         def add_execution_instance(self, instance, instance_name):
    
    90 119
             """Adds an :obj:`ExecutionInstance` to the service.
    
    ... ... @@ -96,7 +125,7 @@ class BuildGridServer:
    96 125
                 instance_name (str): Instance name.
    
    97 126
             """
    
    98 127
             if self._execution_service is None:
    
    99
    -            self._execution_service = ExecutionService(self._server)
    
    128
    +            self._execution_service = ExecutionService(self.__grpc_server)
    
    100 129
     
    
    101 130
             self._execution_service.add_instance(instance_name, instance)
    
    102 131
     
    
    ... ... @@ -110,7 +139,7 @@ class BuildGridServer:
    110 139
                 instance_name (str): Instance name.
    
    111 140
             """
    
    112 141
             if self._bots_service is None:
    
    113
    -            self._bots_service = BotsService(self._server)
    
    142
    +            self._bots_service = BotsService(self.__grpc_server)
    
    114 143
     
    
    115 144
             self._bots_service.add_instance(instance_name, instance)
    
    116 145
     
    
    ... ... @@ -124,7 +153,7 @@ class BuildGridServer:
    124 153
                 instance_name (str): Instance name.
    
    125 154
             """
    
    126 155
             if self._operations_service is None:
    
    127
    -            self._operations_service = OperationsService(self._server)
    
    156
    +            self._operations_service = OperationsService(self.__grpc_server)
    
    128 157
     
    
    129 158
             self._operations_service.add_instance(instance_name, instance)
    
    130 159
     
    
    ... ... @@ -138,7 +167,7 @@ class BuildGridServer:
    138 167
                 instance_name (str): Instance name.
    
    139 168
             """
    
    140 169
             if self._reference_storage_service is None:
    
    141
    -            self._reference_storage_service = ReferenceStorageService(self._server)
    
    170
    +            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
    
    142 171
     
    
    143 172
             self._reference_storage_service.add_instance(instance_name, instance)
    
    144 173
     
    
    ... ... @@ -152,7 +181,7 @@ class BuildGridServer:
    152 181
                 instance_name (str): Instance name.
    
    153 182
             """
    
    154 183
             if self._action_cache_service is None:
    
    155
    -            self._action_cache_service = ActionCacheService(self._server)
    
    184
    +            self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    156 185
     
    
    157 186
             self._action_cache_service.add_instance(instance_name, instance)
    
    158 187
     
    
    ... ... @@ -166,7 +195,7 @@ class BuildGridServer:
    166 195
                 instance_name (str): Instance name.
    
    167 196
             """
    
    168 197
             if self._cas_service is None:
    
    169
    -            self._cas_service = ContentAddressableStorageService(self._server)
    
    198
    +            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    170 199
     
    
    171 200
             self._cas_service.add_instance(instance_name, instance)
    
    172 201
     
    
    ... ... @@ -180,6 +209,12 @@ class BuildGridServer:
    180 209
                 instance_name (str): Instance name.
    
    181 210
             """
    
    182 211
             if self._bytestream_service is None:
    
    183
    -            self._bytestream_service = ByteStreamService(self._server)
    
    212
    +            self._bytestream_service = ByteStreamService(self.__grpc_server)
    
    184 213
     
    
    185 214
             self._bytestream_service.add_instance(instance_name, instance)
    
    215
    +
    
    216
    +    # --- Public API: Monitoring ---
    
    217
    +
    
    218
    +    @property
    
    219
    +    def is_instrumented(self):
    
    220
    +        return self._is_instrumented

  • tests/cas/test_services.py
    ... ... @@ -137,7 +137,7 @@ def test_bytestream_write(mocked, instance, extra_data):
    137 137
             bytestream_pb2.WriteRequest(data=b'def', write_offset=3, finish_write=True)
    
    138 138
         ]
    
    139 139
     
    
    140
    -    response = servicer.Write(requests, context)
    
    140
    +    response = servicer.Write(iter(requests), context)
    
    141 141
         assert response.committed_size == 6
    
    142 142
         assert len(storage.data) == 1
    
    143 143
         assert (hash_, 6) in storage.data
    
    ... ... @@ -159,7 +159,7 @@ def test_bytestream_write_rejects_wrong_hash(mocked):
    159 159
             bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
    
    160 160
         ]
    
    161 161
     
    
    162
    -    servicer.Write(requests, context)
    
    162
    +    servicer.Write(iter(requests), context)
    
    163 163
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    164 164
     
    
    165 165
         assert len(storage.data) is 0
    

  • tests/server_instance.py
    ... ... @@ -13,19 +13,24 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    -from buildgrid._app.settings import parser
    
    17
    -from buildgrid._app.commands.cmd_server import _create_server_from_config
    
    18
    -from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    19
    -from buildgrid.server.actioncache.service import ActionCacheService
    
    20
    -from buildgrid.server.execution.service import ExecutionService
    
    21
    -from buildgrid.server.operations.service import OperationsService
    
    22
    -from buildgrid.server.bots.service import BotsService
    
    23
    -from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    16
    +import grpc
    
    17
    +
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    20
    +from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    21
    +from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    22
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    24
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    25
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    26
    +from buildgrid._protos.google.longrunning import operations_pb2
    
    27
    +from buildgrid._protos.google.longrunning import operations_pb2_grpc
    
    24 28
     
    
    25 29
     from .utils.cas import run_in_subprocess
    
    30
    +from .utils.server import serve
    
    26 31
     
    
    27 32
     
    
    28
    -config = """
    
    33
    +CONFIGURATION = """
    
    29 34
     server:
    
    30 35
       - !channel
    
    31 36
         port: 50051
    
    ... ... @@ -72,24 +77,102 @@ instances:
    72 77
     
    
    73 78
     def test_create_server():
    
    74 79
         # Actual test function, to be run in a subprocess:
    
    75
    -    def __test_create_server(queue, config_data):
    
    76
    -        settings = parser.get_parser().safe_load(config)
    
    77
    -        server = _create_server_from_config(settings)
    
    80
    +    def __test_create_server(queue, remote):
    
    81
    +        # Open a channel to the remote server:
    
    82
    +        channel = grpc.insecure_channel(remote)
    
    78 83
     
    
    79
    -        server.start()
    
    80
    -        server.stop()
    
    84
    +        try:
    
    85
    +            stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    86
    +            request = remote_execution_pb2.ExecuteRequest(instance_name='main')
    
    87
    +            response = next(stub.Execute(request))
    
    88
    +
    
    89
    +            assert response.DESCRIPTOR is operations_pb2.Operation.DESCRIPTOR
    
    90
    +
    
    91
    +        except grpc.RpcError as e:
    
    92
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    93
    +                queue.put(False)
    
    94
    +        except AssertionError:
    
    95
    +            queue.put(False)
    
    96
    +
    
    97
    +        try:
    
    98
    +            stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    
    99
    +            request = remote_execution_pb2.GetActionResultRequest(instance_name='main')
    
    100
    +            response = stub.GetActionResult(request)
    
    101
    +
    
    102
    +            assert response.DESCRIPTOR is remote_execution_pb2.ActionResult.DESCRIPTOR
    
    103
    +
    
    104
    +        except grpc.RpcError as e:
    
    105
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    106
    +                queue.put(False)
    
    107
    +        except AssertionError:
    
    108
    +            queue.put(False)
    
    109
    +
    
    110
    +        try:
    
    111
    +            stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
    
    112
    +            request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name='main')
    
    113
    +            response = stub.BatchUpdateBlobs(request)
    
    114
    +
    
    115
    +            assert response.DESCRIPTOR is remote_execution_pb2.BatchUpdateBlobsResponse.DESCRIPTOR
    
    116
    +
    
    117
    +        except grpc.RpcError as e:
    
    118
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    119
    +                queue.put(False)
    
    120
    +        except AssertionError:
    
    121
    +            queue.put(False)
    
    122
    +
    
    123
    +        try:
    
    124
    +            stub = buildstream_pb2_grpc.ReferenceStorageStub(channel)
    
    125
    +            request = buildstream_pb2.GetReferenceRequest(instance_name='main')
    
    126
    +            response = stub.GetReference(request)
    
    127
    +
    
    128
    +            assert response.DESCRIPTOR is buildstream_pb2.GetReferenceResponse.DESCRIPTOR
    
    129
    +
    
    130
    +        except grpc.RpcError as e:
    
    131
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    132
    +                queue.put(False)
    
    133
    +        except AssertionError:
    
    134
    +            queue.put(False)
    
    81 135
     
    
    82 136
             try:
    
    83
    -            assert isinstance(server._execution_service, ExecutionService)
    
    84
    -            assert isinstance(server._operations_service, OperationsService)
    
    85
    -            assert isinstance(server._bots_service, BotsService)
    
    86
    -            assert isinstance(server._reference_storage_service, ReferenceStorageService)
    
    87
    -            assert isinstance(server._action_cache_service, ActionCacheService)
    
    88
    -            assert isinstance(server._cas_service, ContentAddressableStorageService)
    
    89
    -            assert isinstance(server._bytestream_service, ByteStreamService)
    
    137
    +            stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    138
    +            request = bytestream_pb2.ReadRequest()
    
    139
    +            response = stub.Read(request)
    
    140
    +
    
    141
    +            assert next(response).DESCRIPTOR is bytestream_pb2.ReadResponse.DESCRIPTOR
    
    142
    +
    
    143
    +        except grpc.RpcError as e:
    
    144
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    145
    +                queue.put(False)
    
    90 146
             except AssertionError:
    
    91 147
                 queue.put(False)
    
    92
    -        else:
    
    93
    -            queue.put(True)
    
    94 148
     
    
    95
    -    assert run_in_subprocess(__test_create_server, config)
    149
    +        try:
    
    150
    +            stub = operations_pb2_grpc.OperationsStub(channel)
    
    151
    +            request = operations_pb2.ListOperationsRequest(name='main')
    
    152
    +            response = stub.ListOperations(request)
    
    153
    +
    
    154
    +            assert response.DESCRIPTOR is operations_pb2.ListOperationsResponse.DESCRIPTOR
    
    155
    +
    
    156
    +        except grpc.RpcError as e:
    
    157
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    158
    +                queue.put(False)
    
    159
    +        except AssertionError:
    
    160
    +            queue.put(False)
    
    161
    +
    
    162
    +        try:
    
    163
    +            stub = bots_pb2_grpc.BotsStub(channel)
    
    164
    +            request = bots_pb2.CreateBotSessionRequest()
    
    165
    +            response = stub.CreateBotSession(request)
    
    166
    +
    
    167
    +            assert response.DESCRIPTOR is bots_pb2.BotSession.DESCRIPTOR
    
    168
    +
    
    169
    +        except grpc.RpcError as e:
    
    170
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    171
    +                queue.put(False)
    
    172
    +        except AssertionError:
    
    173
    +            queue.put(False)
    
    174
    +
    
    175
    +        queue.put(True)
    
    176
    +
    
    177
    +    with serve(CONFIGURATION) as server:
    
    178
    +        assert run_in_subprocess(__test_create_server, server.remote)

  • tests/utils/server.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from contextlib import contextmanager
    
    17
    +import multiprocessing
    
    18
    +import signal
    
    19
    +
    
    20
    +import pytest_cov
    
    21
    +
    
    22
    +from buildgrid._app.settings import parser
    
    23
    +from buildgrid.server.instance import BuildGridServer
    
    24
    +
    
    25
    +
    
    26
    +@contextmanager
    
    27
    +def serve(configuration):
    
    28
    +    server = Server(configuration)
    
    29
    +    try:
    
    30
    +        yield server
    
    31
    +    finally:
    
    32
    +        server.quit()
    
    33
    +
    
    34
    +
    
    35
    +class Server:
    
    36
    +
    
    37
    +    def __init__(self, configuration):
    
    38
    +
    
    39
    +        self.configuration = configuration
    
    40
    +
    
    41
    +        self.__queue = multiprocessing.Queue()
    
    42
    +        self.__process = multiprocessing.Process(
    
    43
    +            target=Server.serve,
    
    44
    +            args=(self.__queue, self.configuration))
    
    45
    +        self.__process.start()
    
    46
    +
    
    47
    +        self.port = self.__queue.get()
    
    48
    +        self.remote = 'localhost:{}'.format(self.port)
    
    49
    +
    
    50
    +    @classmethod
    
    51
    +    def serve(cls, queue, configuration):
    
    52
    +        pytest_cov.embed.cleanup_on_sigterm()
    
    53
    +
    
    54
    +        server = BuildGridServer()
    
    55
    +
    
    56
    +        def __signal_handler(signum, frame):
    
    57
    +            server.stop()
    
    58
    +
    
    59
    +        signal.signal(signal.SIGINT, signal.SIG_IGN)
    
    60
    +        signal.signal(signal.SIGTERM, __signal_handler)
    
    61
    +
    
    62
    +        instances = parser.get_parser().safe_load(configuration)['instances']
    
    63
    +        for instance in instances:
    
    64
    +            instance_name = instance['name']
    
    65
    +            services = instance['services']
    
    66
    +            for service in services:
    
    67
    +                service.register_instance_with_server(instance_name, server)
    
    68
    +
    
    69
    +        port = server.add_port('localhost:0', None)
    
    70
    +
    
    71
    +        queue.put(port)
    
    72
    +
    
    73
    +        server.start()
    
    74
    +
    
    75
    +    def quit(self):
    
    76
    +        if self.__process:
    
    77
    +            self.__process.terminate()
    
    78
    +            self.__process.join()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]