[Notes] [Git][BuildGrid/buildgrid][mablanch/23-new-logging] server/instance.py: Setup a logging task at startup



Title: GitLab

Martin Blanchard pushed to branch mablanch/23-new-logging at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/server/instance.py
    ... ... @@ -15,15 +15,18 @@
    15 15
     
    
    16 16
     import asyncio
    
    17 17
     from concurrent import futures
    
    18
    -from datetime import timedelta
    
    18
    +from datetime import datetime, timedelta
    
    19 19
     import logging
    
    20
    +import logging.handlers
    
    20 21
     import os
    
    21 22
     import signal
    
    23
    +import sys
    
    22 24
     import time
    
    23 25
     
    
    24 26
     import grpc
    
    27
    +import janus
    
    25 28
     
    
    26
    -from buildgrid._enums import MetricRecordDomain, MetricRecordType
    
    29
    +from buildgrid._enums import LogRecordLevel, MetricRecordDomain, MetricRecordType
    
    27 30
     from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    28 31
     from buildgrid.server.actioncache.service import ActionCacheService
    
    29 32
     from buildgrid.server.bots.service import BotsService
    
    ... ... @@ -32,7 +35,7 @@ from buildgrid.server.execution.service import ExecutionService
    32 35
     from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    33 36
     from buildgrid.server.operations.service import OperationsService
    
    34 37
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    35
    -from buildgrid.settings import MONITORING_PERIOD
    
    38
    +from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
    
    36 39
     
    
    37 40
     
    
    38 41
     class BuildGridServer:
    
    ... ... @@ -58,9 +61,16 @@ class BuildGridServer:
    58 61
             self.__grpc_server = grpc.server(self.__grpc_executor)
    
    59 62
     
    
    60 63
             self.__main_loop = asyncio.get_event_loop()
    
    64
    +
    
    61 65
             self.__monitoring_bus = None
    
    62 66
     
    
    67
    +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
    
    68
    +        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
    
    69
    +        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
    
    70
    +        self.__print_log_records = True
    
    71
    +
    
    63 72
             self.__state_monitoring_task = None
    
    73
    +        self.__logging_task = None
    
    64 74
     
    
    65 75
             self._execution_service = None
    
    66 76
             self._bots_service = None
    
    ... ... @@ -80,6 +90,17 @@ class BuildGridServer:
    80 90
                     self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    81 91
                     serialisation_format=MonitoringOutputFormat.JSON)
    
    82 92
     
    
    93
    +        # Setup the main logging handler:
    
    94
    +        root_logger = logging.getLogger()
    
    95
    +
    
    96
    +        for log_filter in root_logger.filters[:]:
    
    97
    +            self.__logging_handler.addFilter(log_filter)
    
    98
    +            root_logger.removeFilter(log_filter)
    
    99
    +
    
    100
    +        for log_handler in root_logger.handlers[:]:
    
    101
    +            root_logger.removeHandler(log_handler)
    
    102
    +        root_logger.addHandler(self.__logging_handler)
    
    103
    +
    
    83 104
         # --- Public API ---
    
    84 105
     
    
    85 106
         def start(self):
    
    ... ... @@ -93,6 +114,9 @@ class BuildGridServer:
    93 114
                     self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    94 115
                     loop=self.__main_loop)
    
    95 116
     
    
    117
    +        self.__logging_task = asyncio.ensure_future(
    
    118
    +            self._logging_worker(), loop=self.__main_loop)
    
    119
    +
    
    96 120
             self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
    
    97 121
     
    
    98 122
             self.__main_loop.run_forever()
    
    ... ... @@ -105,6 +129,9 @@ class BuildGridServer:
    105 129
     
    
    106 130
                 self.__monitoring_bus.stop()
    
    107 131
     
    
    132
    +        if self.__logging_task is not None:
    
    133
    +            self.__logging_task.cancel()
    
    134
    +
    
    108 135
             self.__main_loop.stop()
    
    109 136
     
    
    110 137
             self.__grpc_server.stop(None)
    
    ... ... @@ -245,6 +272,53 @@ class BuildGridServer:
    245 272
     
    
    246 273
         # --- Private API ---
    
    247 274
     
    
    275
    +    async def _logging_worker(self):
    
    276
    +        """Publishes log records to the monitoring bus."""
    
    277
    +        async def __logging_worker():
    
    278
    +            log_record = await self.__logging_queue.async_q.get()
    
    279
    +
    
    280
    +            # Print log records to stdout, if required:
    
    281
    +            if self.__print_log_records:
    
    282
    +                record = self.__logging_formatter.format(log_record)
    
    283
    +
    
    284
    +                # TODO: Investigate if async write would be worth here.
    
    285
    +                sys.stdout.write('{}\n'.format(record))
    
    286
    +                sys.stdout.flush()
    
    287
    +
    
    288
    +            # Emit a log record if server is instrumented:
    
    289
    +            if self._is_instrumented:
    
    290
    +                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
    
    291
    +                log_record_creation_time = datetime.fromtimestamp(log_record.created)
    
    292
    +                # logging.LogRecord.extra must be a str to str dict:
    
    293
    +                if 'extra' in log_record.__dict__ and log_record.extra:
    
    294
    +                    log_record_metadata = log_record.extra
    
    295
    +                else:
    
    296
    +                    log_record_metadata = None
    
    297
    +                record = self._forge_log_record(
    
    298
    +                    log_record.name, log_record_level, log_record.message,
    
    299
    +                    log_record_creation_time, metadata=log_record_metadata)
    
    300
    +
    
    301
    +                await self.__monitoring_bus.send_record(record)
    
    302
    +
    
    303
    +        try:
    
    304
    +            while True:
    
    305
    +                await __logging_worker()
    
    306
    +
    
    307
    +        except asyncio.CancelledError:
    
    308
    +            pass
    
    309
    +
    
    310
    +    def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
    
    311
    +        log_record = monitoring_pb2.LogRecord()
    
    312
    +
    
    313
    +        log_record.creation_timestamp.FromDatetime(creation_time)
    
    314
    +        log_record.domain = domain
    
    315
    +        log_record.level = level.value
    
    316
    +        log_record.message = message
    
    317
    +        if metadata is not None:
    
    318
    +            log_record.metadata.update(metadata)
    
    319
    +
    
    320
    +        return log_record
    
    321
    +
    
    248 322
         async def _state_monitoring_worker(self, period=1.0):
    
    249 323
             """Periodically publishes state metrics to the monitoring bus."""
    
    250 324
             async def __state_monitoring_worker():
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]