[Notes] [Git][BuildGrid/buildgrid][mablanch/135-monitoring-bus] 2 commits: Handle the server's main event loop internally



Title: GitLab

Martin Blanchard pushed to branch mablanch/135-monitoring-bus at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -20,7 +20,6 @@ Server command
    20 20
     Create a BuildGrid server.
    
    21 21
     """
    
    22 22
     
    
    23
    -import asyncio
    
    24 23
     import logging
    
    25 24
     import sys
    
    26 25
     
    
    ... ... @@ -52,18 +51,14 @@ def start(context, config):
    52 51
             click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
    
    53 52
             sys.exit(-1)
    
    54 53
     
    
    55
    -    loop = asyncio.get_event_loop()
    
    56 54
         try:
    
    57 55
             server.start()
    
    58
    -        loop.run_forever()
    
    59 56
     
    
    60 57
         except KeyboardInterrupt:
    
    61 58
             pass
    
    62 59
     
    
    63 60
         finally:
    
    64
    -        context.logger.info("Stopping server")
    
    65 61
             server.stop()
    
    66
    -        loop.close()
    
    67 62
     
    
    68 63
     
    
    69 64
     def _create_server_from_config(config):
    

  • buildgrid/server/instance.py
    ... ... @@ -13,18 +13,21 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +import asyncio
    
    16 17
     from concurrent import futures
    
    17 18
     import logging
    
    18 19
     import os
    
    20
    +import time
    
    19 21
     
    
    20 22
     import grpc
    
    21 23
     
    
    22
    -from .cas.service import ByteStreamService, ContentAddressableStorageService
    
    23
    -from .actioncache.service import ActionCacheService
    
    24
    -from .execution.service import ExecutionService
    
    25
    -from .operations.service import OperationsService
    
    26
    -from .bots.service import BotsService
    
    27
    -from .referencestorage.service import ReferenceStorageService
    
    24
    +from buildgrid.server.actioncache.service import ActionCacheService
    
    25
    +from buildgrid.server.bots.service import BotsService
    
    26
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    27
    +from buildgrid.server.execution.service import ExecutionService
    
    28
    +from buildgrid.server._monitoring import MonitoringBus
    
    29
    +from buildgrid.server.operations.service import OperationsService
    
    30
    +from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    28 31
     
    
    29 32
     
    
    30 33
     class BuildGridServer:
    
    ... ... @@ -34,7 +37,7 @@ class BuildGridServer:
    34 37
         requisite services.
    
    35 38
         """
    
    36 39
     
    
    37
    -    def __init__(self, max_workers=None):
    
    40
    +    def __init__(self, max_workers=None, monitor=True):
    
    38 41
             """Initializes a new :class:`BuildGridServer` instance.
    
    39 42
     
    
    40 43
             Args:
    
    ... ... @@ -46,9 +49,11 @@ class BuildGridServer:
    46 49
                 # Use max_workers default from Python 3.5+
    
    47 50
                 max_workers = (os.cpu_count() or 1) * 5
    
    48 51
     
    
    49
    -        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    52
    +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    53
    +        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    50 54
     
    
    51
    -        self._server = server
    
    55
    +        self.__main_loop = asyncio.get_event_loop()
    
    56
    +        self.__monitoring_bus = None
    
    52 57
     
    
    53 58
             self._execution_service = None
    
    54 59
             self._bots_service = None
    
    ... ... @@ -58,15 +63,35 @@ class BuildGridServer:
    58 63
             self._cas_service = None
    
    59 64
             self._bytestream_service = None
    
    60 65
     
    
    66
    +        self._is_monitored = monitor
    
    67
    +
    
    68
    +        if self._is_monitored:
    
    69
    +            self.__monitoring_bus = MonitoringBus(self.__main_loop)
    
    70
    +
    
    71
    +    # --- Public API ---
    
    72
    +
    
    61 73
         def start(self):
    
    62
    -        """Starts the server.
    
    74
    +        """Starts the BuildGrid server.
    
    63 75
             """
    
    64
    -        self._server.start()
    
    76
    +        self.__grpc_server.start()
    
    77
    +
    
    78
    +        if self._is_monitored:
    
    79
    +            self.__monitoring_bus.start()
    
    80
    +        self.__main_loop.run_forever()
    
    65 81
     
    
    66 82
         def stop(self, grace=0):
    
    67
    -        """Stops the server.
    
    83
    +        """Stops the BuildGrid server.
    
    84
    +
    
    85
    +        Args:
    
    86
    +            grace (int, optional): A duration of time in seconds. Defaults to 0.
    
    68 87
             """
    
    69
    -        self._server.stop(grace)
    
    88
    +        if self._is_monitored:
    
    89
    +            self.__monitoring_bus.stop()
    
    90
    +
    
    91
    +        self.__grpc_server.stop(grace)
    
    92
    +
    
    93
    +        if grace > 0:
    
    94
    +            time.sleep(grace)
    
    70 95
     
    
    71 96
         def add_port(self, address, credentials):
    
    72 97
             """Adds a port to the server.
    
    ... ... @@ -80,11 +105,11 @@ class BuildGridServer:
    80 105
             """
    
    81 106
             if credentials is not None:
    
    82 107
                 self.__logger.info("Adding secure connection on: [%s]", address)
    
    83
    -            self._server.add_secure_port(address, credentials)
    
    108
    +            self.__grpc_server.add_secure_port(address, credentials)
    
    84 109
     
    
    85 110
             else:
    
    86 111
                 self.__logger.info("Adding insecure connection on [%s]", address)
    
    87
    -            self._server.add_insecure_port(address)
    
    112
    +            self.__grpc_server.add_insecure_port(address)
    
    88 113
     
    
    89 114
         def add_execution_instance(self, instance, instance_name):
    
    90 115
             """Adds an :obj:`ExecutionInstance` to the service.
    
    ... ... @@ -96,7 +121,7 @@ class BuildGridServer:
    96 121
                 instance_name (str): Instance name.
    
    97 122
             """
    
    98 123
             if self._execution_service is None:
    
    99
    -            self._execution_service = ExecutionService(self._server)
    
    124
    +            self._execution_service = ExecutionService(self.__grpc_server)
    
    100 125
     
    
    101 126
             self._execution_service.add_instance(instance_name, instance)
    
    102 127
     
    
    ... ... @@ -110,7 +135,7 @@ class BuildGridServer:
    110 135
                 instance_name (str): Instance name.
    
    111 136
             """
    
    112 137
             if self._bots_service is None:
    
    113
    -            self._bots_service = BotsService(self._server)
    
    138
    +            self._bots_service = BotsService(self.__grpc_server)
    
    114 139
     
    
    115 140
             self._bots_service.add_instance(instance_name, instance)
    
    116 141
     
    
    ... ... @@ -124,7 +149,7 @@ class BuildGridServer:
    124 149
                 instance_name (str): Instance name.
    
    125 150
             """
    
    126 151
             if self._operations_service is None:
    
    127
    -            self._operations_service = OperationsService(self._server)
    
    152
    +            self._operations_service = OperationsService(self.__grpc_server)
    
    128 153
     
    
    129 154
             self._operations_service.add_instance(instance_name, instance)
    
    130 155
     
    
    ... ... @@ -138,7 +163,7 @@ class BuildGridServer:
    138 163
                 instance_name (str): Instance name.
    
    139 164
             """
    
    140 165
             if self._reference_storage_service is None:
    
    141
    -            self._reference_storage_service = ReferenceStorageService(self._server)
    
    166
    +            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
    
    142 167
     
    
    143 168
             self._reference_storage_service.add_instance(instance_name, instance)
    
    144 169
     
    
    ... ... @@ -152,7 +177,7 @@ class BuildGridServer:
    152 177
                 instance_name (str): Instance name.
    
    153 178
             """
    
    154 179
             if self._action_cache_service is None:
    
    155
    -            self._action_cache_service = ActionCacheService(self._server)
    
    180
    +            self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    156 181
     
    
    157 182
             self._action_cache_service.add_instance(instance_name, instance)
    
    158 183
     
    
    ... ... @@ -166,7 +191,7 @@ class BuildGridServer:
    166 191
                 instance_name (str): Instance name.
    
    167 192
             """
    
    168 193
             if self._cas_service is None:
    
    169
    -            self._cas_service = ContentAddressableStorageService(self._server)
    
    194
    +            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    170 195
     
    
    171 196
             self._cas_service.add_instance(instance_name, instance)
    
    172 197
     
    
    ... ... @@ -180,6 +205,12 @@ class BuildGridServer:
    180 205
                 instance_name (str): Instance name.
    
    181 206
             """
    
    182 207
             if self._bytestream_service is None:
    
    183
    -            self._bytestream_service = ByteStreamService(self._server)
    
    208
    +            self._bytestream_service = ByteStreamService(self.__grpc_server)
    
    184 209
     
    
    185 210
             self._bytestream_service.add_instance(instance_name, instance)
    
    211
    +
    
    212
    +    # --- Public API: Monitoring ---
    
    213
    +
    
    214
    +    @property
    
    215
    +    def is_monitored(self):
    
    216
    +        return self._is_monitored



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]