[Notes] [Git][BuildGrid/buildgrid][mablanch/132-gather-state-metrics] server/instance.py: Run a monitoring task periodically



Title: GitLab

Martin Blanchard pushed to branch mablanch/132-gather-state-metrics at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/server/instance.py
    ... ... @@ -15,12 +15,15 @@
    15 15
     
    
    16 16
     import asyncio
    
    17 17
     from concurrent import futures
    
    18
    +from datetime import timedelta
    
    18 19
     import logging
    
    19 20
     import os
    
    20 21
     import time
    
    21 22
     
    
    22 23
     import grpc
    
    23 24
     
    
    25
    +from buildgrid._enums import MetricRecordDomain, MetricRecordType
    
    26
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    24 27
     from buildgrid.server.actioncache.service import ActionCacheService
    
    25 28
     from buildgrid.server.bots.service import BotsService
    
    26 29
     from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    ... ... @@ -28,6 +31,7 @@ from buildgrid.server.execution.service import ExecutionService
    28 31
     from buildgrid.server._monitoring import MonitoringBus
    
    29 32
     from buildgrid.server.operations.service import OperationsService
    
    30 33
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    34
    +from buildgrid.settings import MONITORING_PERIOD
    
    31 35
     
    
    32 36
     
    
    33 37
     class BuildGridServer:
    
    ... ... @@ -55,6 +59,8 @@ class BuildGridServer:
    55 59
             self.__main_loop = asyncio.get_event_loop()
    
    56 60
             self.__monitoring_bus = None
    
    57 61
     
    
    62
    +        self.__monitoring_task = None
    
    63
    +
    
    58 64
             self._execution_service = None
    
    59 65
             self._bots_service = None
    
    60 66
             self._operations_service = None
    
    ... ... @@ -64,6 +70,7 @@ class BuildGridServer:
    64 70
             self._bytestream_service = None
    
    65 71
     
    
    66 72
             self._is_monitored = monitor
    
    73
    +        self._instances = set()
    
    67 74
     
    
    68 75
             if self._is_monitored:
    
    69 76
                 self.__monitoring_bus = MonitoringBus(self.__main_loop)
    
    ... ... @@ -77,6 +84,10 @@ class BuildGridServer:
    77 84
     
    
    78 85
             if self._is_monitored:
    
    79 86
                 self.__monitoring_bus.start()
    
    87
    +
    
    88
    +            self.__monitoring_task = asyncio.ensure_future(
    
    89
    +                self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)
    
    90
    +
    
    80 91
             self.__main_loop.run_forever()
    
    81 92
     
    
    82 93
         def stop(self, grace=0):
    
    ... ... @@ -85,6 +96,9 @@ class BuildGridServer:
    85 96
             Args:
    
    86 97
                 grace (int, optional): A duration of time in seconds. Defaults to 0.
    
    87 98
             """
    
    99
    +        if self.__monitoring_task is not None:
    
    100
    +            self.__monitoring_task.cancel()
    
    101
    +
    
    88 102
             if self._is_monitored:
    
    89 103
                 self.__monitoring_bus.stop()
    
    90 104
     
    
    ... ... @@ -122,9 +136,10 @@ class BuildGridServer:
    122 136
             """
    
    123 137
             if self._execution_service is None:
    
    124 138
                 self._execution_service = ExecutionService(self.__grpc_server)
    
    125
    -
    
    126 139
             self._execution_service.add_instance(instance_name, instance)
    
    127 140
     
    
    141
    +        self._instances.add(instance_name)
    
    142
    +
    
    128 143
         def add_bots_interface(self, instance, instance_name):
    
    129 144
             """Adds a :obj:`BotsInterface` to the service.
    
    130 145
     
    
    ... ... @@ -136,9 +151,10 @@ class BuildGridServer:
    136 151
             """
    
    137 152
             if self._bots_service is None:
    
    138 153
                 self._bots_service = BotsService(self.__grpc_server)
    
    139
    -
    
    140 154
             self._bots_service.add_instance(instance_name, instance)
    
    141 155
     
    
    156
    +        self._instances.add(instance_name)
    
    157
    +
    
    142 158
         def add_operations_instance(self, instance, instance_name):
    
    143 159
             """Adds an :obj:`OperationsInstance` to the service.
    
    144 160
     
    
    ... ... @@ -150,7 +166,6 @@ class BuildGridServer:
    150 166
             """
    
    151 167
             if self._operations_service is None:
    
    152 168
                 self._operations_service = OperationsService(self.__grpc_server)
    
    153
    -
    
    154 169
             self._operations_service.add_instance(instance_name, instance)
    
    155 170
     
    
    156 171
         def add_reference_storage_instance(self, instance, instance_name):
    
    ... ... @@ -164,7 +179,6 @@ class BuildGridServer:
    164 179
             """
    
    165 180
             if self._reference_storage_service is None:
    
    166 181
                 self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
    
    167
    -
    
    168 182
             self._reference_storage_service.add_instance(instance_name, instance)
    
    169 183
     
    
    170 184
         def add_action_cache_instance(self, instance, instance_name):
    
    ... ... @@ -178,7 +192,6 @@ class BuildGridServer:
    178 192
             """
    
    179 193
             if self._action_cache_service is None:
    
    180 194
                 self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    181
    -
    
    182 195
             self._action_cache_service.add_instance(instance_name, instance)
    
    183 196
     
    
    184 197
         def add_cas_instance(self, instance, instance_name):
    
    ... ... @@ -192,7 +205,6 @@ class BuildGridServer:
    192 205
             """
    
    193 206
             if self._cas_service is None:
    
    194 207
                 self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    195
    -
    
    196 208
             self._cas_service.add_instance(instance_name, instance)
    
    197 209
     
    
    198 210
         def add_bytestream_instance(self, instance, instance_name):
    
    ... ... @@ -206,7 +218,6 @@ class BuildGridServer:
    206 218
             """
    
    207 219
             if self._bytestream_service is None:
    
    208 220
                 self._bytestream_service = ByteStreamService(self.__grpc_server)
    
    209
    -
    
    210 221
             self._bytestream_service.add_instance(instance_name, instance)
    
    211 222
     
    
    212 223
         # --- Public API: Monitoring ---
    
    ... ... @@ -214,3 +225,147 @@ class BuildGridServer:
    214 225
         @property
    
    215 226
         def is_monitored(self):
    
    216 227
             return self._is_monitored
    
    228
    +
    
    229
    +    # --- Private API ---
    
    230
    +
    
    231
    +    async def _monitoring_worker(self, period=1.0):
    
    232
    +        """Periodically publishes state metrics to the monitoring bus."""
    
    233
    +        async def __monitoring_worker():
    
    234
    +            # Emit total clients count record:
    
    235
    +            _, record = self._query_n_clients()
    
    236
    +            await self.__monitoring_bus.publish_record(record)
    
    237
    +
    
    238
    +            # Emit total bots count record:
    
    239
    +            _, record = self._query_n_bots()
    
    240
    +            await self.__monitoring_bus.publish_record(record)
    
    241
    +
    
    242
    +            queue_times = []
    
    243
    +            # Emit records by instance:
    
    244
    +            for instance_name in self._instances:
    
    245
    +                # Emit instance clients count record:
    
    246
    +                _, record = self._query_n_clients_for_instance(instance_name)
    
    247
    +                await self.__monitoring_bus.publish_record(record)
    
    248
    +
    
    249
    +                # Emit instance bots count record:
    
    250
    +                _, record = self._query_n_bots_for_instance(instance_name)
    
    251
    +                await self.__monitoring_bus.publish_record(record)
    
    252
    +
    
    253
    +                # Emit instance average queue time record:
    
    254
    +                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
    
    255
    +                await self.__monitoring_bus.publish_record(record)
    
    256
    +                if queue_time:
    
    257
    +                    queue_times.append(queue_time)
    
    258
    +
    
    259
    +            # Emit overall average queue time record:
    
    260
    +            record = monitoring_pb2.MetricRecord()
    
    261
    +            if len(queue_times) > 0:
    
    262
    +                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
    
    263
    +            else:
    
    264
    +                am_queue_time = timedelta()
    
    265
    +
    
    266
    +            record.creation_timestamp.GetCurrentTime()
    
    267
    +            record.domain = MetricRecordDomain.STATE.value
    
    268
    +            record.type = MetricRecordType.TIMER.value
    
    269
    +            record.name = 'average-queue-time'
    
    270
    +            record.duration.FromTimedelta(am_queue_time)
    
    271
    +
    
    272
    +            await self.__monitoring_bus.publish_record(record)
    
    273
    +
    
    274
    +            print('---')
    
    275
    +            n_clients = self._execution_service.query_n_clients()
    
    276
    +            n_bots = self._bots_service.query_n_bots()
    
    277
    +            print('Totals: n_clients={}, n_bots={}, am_queue_time={}'
    
    278
    +                  .format(n_clients, n_bots, am_queue_time))
    
    279
    +            print('Per instances:')
    
    280
    +            for instance_name in self._instances:
    
    281
    +                n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    282
    +                n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    283
    +                am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()
    
    284
    +                instance_name = instance_name or 'empty'
    
    285
    +                print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
    
    286
    +                      .format(instance_name, n_clients, n_bots, am_queue_time))
    
    287
    +            print('---')
    
    288
    +
    
    289
    +        try:
    
    290
    +            while True:
    
    291
    +                start = time.time()
    
    292
    +                await __monitoring_worker()
    
    293
    +
    
    294
    +                end = time.time()
    
    295
    +                await asyncio.sleep(period - (end - start))
    
    296
    +
    
    297
    +        except asyncio.CancelledError:
    
    298
    +            pass
    
    299
    +        except BaseException as e:
    
    300
    +             print(f'__monitoring_worker: {e}')
    
    301
    +
    
    302
    +    # --- Private API: Monitoring ---
    
    303
    +
    
    304
    +    def _query_n_clients(self):
    
    305
    +        """Queries the number of clients connected."""
    
    306
    +        record = monitoring_pb2.MetricRecord()
    
    307
    +        n_clients = self._execution_service.query_n_clients()
    
    308
    +
    
    309
    +        record.creation_timestamp.GetCurrentTime()
    
    310
    +        record.domain = MetricRecordDomain.STATE.value
    
    311
    +        record.type = MetricRecordType.COUNTER.value
    
    312
    +        record.name = 'clients-count'
    
    313
    +        record.count = n_clients
    
    314
    +
    
    315
    +        return n_clients, record
    
    316
    +
    
    317
    +    def _query_n_clients_for_instance(self, instance_name):
    
    318
    +        """Queries the number of clients connected for a given instance."""
    
    319
    +        record = monitoring_pb2.MetricRecord()
    
    320
    +        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    321
    +
    
    322
    +        record.creation_timestamp.GetCurrentTime()
    
    323
    +        record.domain = MetricRecordDomain.STATE.value
    
    324
    +        record.type = MetricRecordType.COUNTER.value
    
    325
    +        record.name = 'clients-count'
    
    326
    +        record.count = n_clients
    
    327
    +        record.extra['instance-name'] = instance_name or 'void'
    
    328
    +
    
    329
    +        return n_clients, record
    
    330
    +
    
    331
    +    def _query_n_bots(self):
    
    332
    +        """Queries the number of bots connected."""
    
    333
    +        record = monitoring_pb2.MetricRecord()
    
    334
    +        n_bots = self._bots_service.query_n_bots()
    
    335
    +
    
    336
    +        record.creation_timestamp.GetCurrentTime()
    
    337
    +        record.domain = MetricRecordDomain.STATE.value
    
    338
    +        record.type = MetricRecordType.COUNTER.value
    
    339
    +        record.name = 'bots-count'
    
    340
    +        record.count = n_bots
    
    341
    +
    
    342
    +        return n_bots, record
    
    343
    +
    
    344
    +    def _query_n_bots_for_instance(self, instance_name):
    
    345
    +        """Queries the number of bots connected for a given instance."""
    
    346
    +        record = monitoring_pb2.MetricRecord()
    
    347
    +        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    348
    +
    
    349
    +        record.creation_timestamp.GetCurrentTime()
    
    350
    +        record.domain = MetricRecordDomain.STATE.value
    
    351
    +        record.type = MetricRecordType.COUNTER.value
    
    352
    +        record.name = 'bots-count'
    
    353
    +        record.count = n_bots
    
    354
    +        record.extra['instance-name'] = instance_name or 'void'
    
    355
    +
    
    356
    +        return n_bots, record
    
    357
    +
    
    358
    +    def _query_am_queue_time_for_instance(self, instance_name):
    
    359
    +        """Queries the average job's queue time for a given instance."""
    
    360
    +        record = monitoring_pb2.MetricRecord()
    
    361
    +        instance_scheduler = self._execution_service.get_scheduler(instance_name)
    
    362
    +        am_queue_time = instance_scheduler.query_am_queue_time()
    
    363
    +
    
    364
    +        record.creation_timestamp.GetCurrentTime()
    
    365
    +        record.domain = MetricRecordDomain.STATE.value
    
    366
    +        record.type = MetricRecordType.TIMER.value
    
    367
    +        record.name = 'average-queue-time'
    
    368
    +        record.duration.FromTimedelta(am_queue_time)
    
    369
    +        record.extra['instance-name'] = instance_name or 'void'
    
    370
    +
    
    371
    +        return am_queue_time, record



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]