... |
... |
@@ -15,15 +15,18 @@ |
15
|
15
|
|
16
|
16
|
import asyncio
|
17
|
17
|
from concurrent import futures
|
18
|
|
-from datetime import timedelta
|
|
18
|
+from datetime import datetime, timedelta
|
19
|
19
|
import logging
|
|
20
|
+import logging.handlers
|
20
|
21
|
import os
|
21
|
22
|
import signal
|
|
23
|
+import sys
|
22
|
24
|
import time
|
23
|
25
|
|
24
|
26
|
import grpc
|
|
27
|
+import janus
|
25
|
28
|
|
26
|
|
-from buildgrid._enums import MetricRecordDomain, MetricRecordType
|
|
29
|
+from buildgrid._enums import LogRecordLevel, MetricRecordDomain, MetricRecordType
|
27
|
30
|
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
|
28
|
31
|
from buildgrid.server.actioncache.service import ActionCacheService
|
29
|
32
|
from buildgrid.server.bots.service import BotsService
|
... |
... |
@@ -32,7 +35,7 @@ from buildgrid.server.execution.service import ExecutionService |
32
|
35
|
from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
|
33
|
36
|
from buildgrid.server.operations.service import OperationsService
|
34
|
37
|
from buildgrid.server.referencestorage.service import ReferenceStorageService
|
35
|
|
-from buildgrid.settings import MONITORING_PERIOD
|
|
38
|
+from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
|
36
|
39
|
|
37
|
40
|
|
38
|
41
|
class BuildGridServer:
|
... |
... |
@@ -58,9 +61,16 @@ class BuildGridServer: |
58
|
61
|
self.__grpc_server = grpc.server(self.__grpc_executor)
|
59
|
62
|
|
60
|
63
|
self.__main_loop = asyncio.get_event_loop()
|
|
64
|
+
|
61
|
65
|
self.__monitoring_bus = None
|
62
|
66
|
|
|
67
|
+ self.__logging_queue = janus.Queue(loop=self.__main_loop)
|
|
68
|
+ self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
|
|
69
|
+ self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
|
|
70
|
+ self.__print_log_records = True
|
|
71
|
+
|
63
|
72
|
self.__state_monitoring_task = None
|
|
73
|
+ self.__logging_task = None
|
64
|
74
|
|
65
|
75
|
self._execution_service = None
|
66
|
76
|
self._bots_service = None
|
... |
... |
@@ -80,6 +90,17 @@ class BuildGridServer: |
80
|
90
|
self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
|
81
|
91
|
serialisation_format=MonitoringOutputFormat.JSON)
|
82
|
92
|
|
|
93
|
+ # Setup the main logging handler:
|
|
94
|
+ root_logger = logging.getLogger()
|
|
95
|
+
|
|
96
|
+ for log_filter in root_logger.filters[:]:
|
|
97
|
+ self.__logging_handler.addFilter(log_filter)
|
|
98
|
+ root_logger.removeFilter(log_filter)
|
|
99
|
+
|
|
100
|
+ for log_handler in root_logger.handlers[:]:
|
|
101
|
+ root_logger.removeHandler(log_handler)
|
|
102
|
+ root_logger.addHandler(self.__logging_handler)
|
|
103
|
+
|
83
|
104
|
# --- Public API ---
|
84
|
105
|
|
85
|
106
|
def start(self):
|
... |
... |
@@ -93,6 +114,9 @@ class BuildGridServer: |
93
|
114
|
self._state_monitoring_worker(period=MONITORING_PERIOD),
|
94
|
115
|
loop=self.__main_loop)
|
95
|
116
|
|
|
117
|
+ self.__logging_task = asyncio.ensure_future(
|
|
118
|
+ self._logging_worker(), loop=self.__main_loop)
|
|
119
|
+
|
96
|
120
|
self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
|
97
|
121
|
|
98
|
122
|
self.__main_loop.run_forever()
|
... |
... |
@@ -105,6 +129,9 @@ class BuildGridServer: |
105
|
129
|
|
106
|
130
|
self.__monitoring_bus.stop()
|
107
|
131
|
|
|
132
|
+ if self.__logging_task is not None:
|
|
133
|
+ self.__logging_task.cancel()
|
|
134
|
+
|
108
|
135
|
self.__main_loop.stop()
|
109
|
136
|
|
110
|
137
|
self.__grpc_server.stop(None)
|
... |
... |
@@ -245,6 +272,53 @@ class BuildGridServer: |
245
|
272
|
|
246
|
273
|
# --- Private API ---
|
247
|
274
|
|
|
275
|
async def _logging_worker(self):
    """Consume queued log records, printing and publishing each of them.

    Records are read from ``self.__logging_queue`` (filled by the
    ``QueueHandler`` installed on the root logger) until the task is
    cancelled. For each record:

    - if ``self.__print_log_records`` is set, it is formatted with
      ``self.__logging_formatter`` and written to stdout;
    - if the server is instrumented, it is converted into a
      ``monitoring_pb2.LogRecord`` and sent on the monitoring bus.

    An exception raised while handling a single record is reported on
    stderr and the worker moves on to the next record. Without this, one
    bad record (e.g. a level value ``LogRecordLevel`` rejects, or a failed
    bus send) would end the task: nothing would drain the queue any more
    and every later log message would be silently lost.
    """
    async def __handle_log_record(log_record):
        # Print log records to stdout, if required:
        if self.__print_log_records:
            record = self.__logging_formatter.format(log_record)

            # TODO: Investigate if async write would be worth here.
            sys.stdout.write('{}\n'.format(record))
            sys.stdout.flush()

        # Emit a log record if server is instrumented:
        if self._is_instrumented:
            log_record_level = LogRecordLevel(int(log_record.levelno / 10))
            # Timestamp.FromDatetime() interprets naive datetimes as UTC, so
            # build a naive *UTC* datetime; datetime.fromtimestamp() would
            # yield local time and shift the timestamp by the host's offset.
            log_record_creation_time = datetime.utcfromtimestamp(log_record.created)
            # logging.LogRecord.extra must be a str to str dict:
            log_record_metadata = log_record.__dict__.get('extra') or None
            record = self._forge_log_record(
                log_record.name, log_record_level, log_record.getMessage(),
                log_record_creation_time, metadata=log_record_metadata)

            await self.__monitoring_bus.send_record(record)

    try:
        while True:
            # Kept outside the per-record guard: if get() raised immediately,
            # retrying it here could loop without ever yielding to the loop.
            log_record = await self.__logging_queue.async_q.get()
            try:
                await __handle_log_record(log_record)
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError subclasses Exception;
                # let cancellation reach the outer handler.
                raise
            except Exception as e:  # pylint: disable=broad-except
                sys.stderr.write('Failed to process log record: {!r}\n'.format(e))
                sys.stderr.flush()

    except asyncio.CancelledError:
        pass
|
|
309
|
+
|
|
310
|
def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
    """Build a ``monitoring_pb2.LogRecord`` from its individual fields.

    Args:
        domain: stored as the record's ``domain`` (the logging worker
            passes the originating logger's name).
        level: a ``LogRecordLevel`` member; its ``.value`` is stored as
            the record's ``level``.
        message: the log message text.
        creation_time (datetime): written into ``creation_timestamp`` via
            ``Timestamp.FromDatetime``.
        metadata (dict, optional): merged into the record's ``metadata``
            map when given.

    Returns:
        The populated ``monitoring_pb2.LogRecord``.
    """
    forged = monitoring_pb2.LogRecord()
    forged.creation_timestamp.FromDatetime(creation_time)

    forged.domain = domain
    forged.level = level.value
    forged.message = message

    if metadata is None:
        return forged

    forged.metadata.update(metadata)
    return forged
|
|
321
|
+
|
248
|
322
|
async def _state_monitoring_worker(self, period=1.0):
|
249
|
323
|
"""Periodically publishes state metrics to the monitoring bus."""
|
250
|
324
|
async def __state_monitoring_worker():
|