[Notes] [Git][BuildGrid/buildgrid][mablanch/23-new-logging] 7 commits: Implementation of the getTree method.



Title: GitLab

Martin Blanchard pushed to branch mablanch/23-new-logging at BuildGrid / buildgrid

Commits:

10 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -2,7 +2,7 @@
    2 2
     image: python:3.5-stretch
    
    3 3
     
    
    4 4
     variables:
    
    5
    -  BGD: bgd --verbose
    
    5
    +  BGD: bgd
    
    6 6
     
    
    7 7
     stages:
    
    8 8
       - test
    

  • buildgrid/_app/cli.py
    ... ... @@ -23,10 +23,12 @@ will be attempted to be imported.
    23 23
     
    
    24 24
     import logging
    
    25 25
     import os
    
    26
    +import sys
    
    26 27
     
    
    27 28
     import click
    
    28 29
     import grpc
    
    29 30
     
    
    31
    +from buildgrid.settings import LOG_RECORD_FORMAT
    
    30 32
     from buildgrid.utils import read_file
    
    31 33
     
    
    32 34
     CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
    
    ... ... @@ -138,28 +140,71 @@ class BuildGridCLI(click.MultiCommand):
    138 140
             return mod.cli
    
    139 141
     
    
    140 142
     
    
    143
    +class DebugFilter(logging.Filter):
    
    144
    +
    
    145
    +    def __init__(self, debug_domains, name=''):
    
    146
    +        super().__init__(name=name)
    
    147
    +        self.__domains_tree = {}
    
    148
    +
    
    149
    +        for domain in debug_domains.split(':'):
    
    150
    +            domains_tree = self.__domains_tree
    
    151
    +            for label in domain.split('.'):
    
    152
    +                if all(key not in domains_tree for key in [label, '*']):
    
    153
    +                    domains_tree[label] = {}
    
    154
    +                domains_tree = domains_tree[label]
    
    155
    +
    
    156
    +    def filter(self, record):
    
    157
    +        domains_tree, last_match = self.__domains_tree, None
    
    158
    +        for label in record.name.split('.'):
    
    159
    +            if all(key not in domains_tree for key in [label, '*']):
    
    160
    +                return False
    
    161
    +            last_match = label if label in domains_tree else '*'
    
    162
    +            domains_tree = domains_tree[last_match]
    
    163
    +        if domains_tree and '*' not in domains_tree:
    
    164
    +            return False
    
    165
    +        return True
    
    166
    +
    
    167
    +
    
    168
    +def setup_logging(verbosity=0, debug_mode=False):
    
    169
    +    """Deals with loggers verbosity"""
    
    170
    +    asyncio_logger = logging.getLogger('asyncio')
    
    171
    +    root_logger = logging.getLogger()
    
    172
    +
    
    173
    +    log_handler = logging.StreamHandler(stream=sys.stdout)
    
    174
    +    for log_filter in root_logger.filters:
    
    175
    +        log_handler.addFilter(log_filter)
    
    176
    +
    
    177
    +    logging.basicConfig(format=LOG_RECORD_FORMAT, handlers=[log_handler])
    
    178
    +
    
    179
    +    if verbosity == 1:
    
    180
    +        root_logger.setLevel(logging.WARNING)
    
    181
    +    elif verbosity == 2:
    
    182
    +        root_logger.setLevel(logging.INFO)
    
    183
    +    elif verbosity >= 3:
    
    184
    +        root_logger.setLevel(logging.DEBUG)
    
    185
    +    else:
    
    186
    +        root_logger.setLevel(logging.ERROR)
    
    187
    +
    
    188
    +    if not debug_mode:
    
    189
    +        asyncio_logger.setLevel(logging.CRITICAL)
    
    190
    +    else:
    
    191
    +        asyncio_logger.setLevel(logging.DEBUG)
    
    192
    +        root_logger.setLevel(logging.DEBUG)
    
    193
    +
    
    194
    +
    
    141 195
     @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
    
    142
    -@click.option('-v', '--verbose', count=True,
    
    143
    -              help='Increase log verbosity level.')
    
    144 196
     @pass_context
    
    145
    -def cli(context, verbose):
    
    197
    +def cli(context):
    
    146 198
         """BuildGrid App"""
    
    147
    -    logger = logging.getLogger()
    
    199
    +    root_logger = logging.getLogger()
    
    148 200
     
    
    149 201
         # Clean-up root logger for any pre-configuration:
    
    150
    -    for log_handler in logger.handlers[:]:
    
    151
    -        logger.removeHandler(log_handler)
    
    152
    -    for log_filter in logger.filters[:]:
    
    153
    -        logger.removeFilter(log_filter)
    
    154
    -
    
    155
    -    logging.basicConfig(
    
    156
    -        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
    
    157
    -
    
    158
    -    if verbose == 1:
    
    159
    -        logger.setLevel(logging.WARNING)
    
    160
    -    elif verbose == 2:
    
    161
    -        logger.setLevel(logging.INFO)
    
    162
    -    elif verbose >= 3:
    
    163
    -        logger.setLevel(logging.DEBUG)
    
    164
    -    else:
    
    165
    -        logger.setLevel(logging.ERROR)
    202
    +    for log_handler in root_logger.handlers[:]:
    
    203
    +        root_logger.removeHandler(log_handler)
    
    204
    +    for log_filter in root_logger.filters[:]:
    
    205
    +        root_logger.removeFilter(log_filter)
    
    206
    +
    
    207
    +    # Filter debug messages using BGD_MESSAGE_DEBUG value:
    
    208
    +    debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
    
    209
    +    if debug_domains:
    
    210
    +        root_logger.addFilter(DebugFilter(debug_domains))

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -34,7 +34,7 @@ from buildgrid.bot.hardware.worker import Worker
    34 34
     
    
    35 35
     
    
    36 36
     from ..bots import buildbox, dummy, host
    
    37
    -from ..cli import pass_context
    
    37
    +from ..cli import pass_context, setup_logging
    
    38 38
     
    
    39 39
     
    
    40 40
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    ... ... @@ -58,9 +58,12 @@ from ..cli import pass_context
    58 58
                   help="Time period for bot updates to the server in seconds.")
    
    59 59
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    
    60 60
                   help="Targeted farm resource.")
    
    61
    +@click.option('-v', '--verbose', count=True,
    
    62
    +              help='Increase log verbosity level.')
    
    61 63
     @pass_context
    
    62 64
     def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
    
    63
    -        remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
    
    65
    +        remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
    
    66
    +    setup_logging(verbosity=verbose)
    
    64 67
         # Setup the remote execution server channel:
    
    65 68
         url = urlparse(remote)
    
    66 69
     
    
    ... ... @@ -122,9 +125,8 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    122 125
             context.cas_client_cert = context.client_cert
    
    123 126
             context.cas_server_cert = context.server_cert
    
    124 127
     
    
    125
    -    click.echo("Starting for remote=[{}]".format(context.remote))
    
    126
    -
    
    127 128
         bot_interface = interface.BotInterface(context.channel)
    
    129
    +
    
    128 130
         worker = Worker()
    
    129 131
         worker.add_device(Device())
    
    130 132
         hardware_interface = HardwareInterface(worker)
    

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -26,7 +26,7 @@ import click
    26 26
     
    
    27 27
     from buildgrid.server.instance import BuildGridServer
    
    28 28
     
    
    29
    -from ..cli import pass_context
    
    29
    +from ..cli import pass_context, setup_logging
    
    30 30
     from ..settings import parser
    
    31 31
     
    
    32 32
     
    
    ... ... @@ -37,9 +37,14 @@ def cli(context):
    37 37
     
    
    38 38
     
    
    39 39
     @cli.command('start', short_help="Setup a new server instance.")
    
    40
    -@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
    
    40
    +@click.argument('CONFIG',
    
    41
    +                type=click.Path(file_okay=True, dir_okay=False, writable=False))
    
    42
    +@click.option('-v', '--verbose', count=True,
    
    43
    +              help='Increase log verbosity level.')
    
    41 44
     @pass_context
    
    42
    -def start(context, config):
    
    45
    +def start(context, config, verbose):
    
    46
    +    setup_logging(verbosity=verbose)
    
    47
    +
    
    43 48
         with open(config) as f:
    
    44 49
             settings = parser.get_parser().safe_load(f)
    
    45 50
     
    

  • buildgrid/client/cas.py
    ... ... @@ -23,19 +23,13 @@ from buildgrid._exceptions import NotFoundError
    23 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    24 24
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    25 25
     from buildgrid._protos.google.rpc import code_pb2
    
    26
    -from buildgrid.settings import HASH
    
    26
    +from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    27 27
     from buildgrid.utils import merkle_tree_maker
    
    28 28
     
    
    29 29
     
    
    30 30
     # Maximum size for a queueable file:
    
    31 31
     FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 32
     
    
    33
    -# Maximum size for a single gRPC request:
    
    34
    -MAX_REQUEST_SIZE = 2 * 1024 * 1024
    
    35
    -
    
    36
    -# Maximum number of elements per gRPC request:
    
    37
    -MAX_REQUEST_COUNT = 500
    
    38
    -
    
    39 33
     
    
    40 34
     class _CallCache:
    
    41 35
         """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
    
    ... ... @@ -390,11 +384,10 @@ class Downloader:
    390 384
                     assert digest.hash in directories
    
    391 385
     
    
    392 386
                     directory = directories[digest.hash]
    
    393
    -                self._write_directory(digest.hash, directory_path,
    
    387
    +                self._write_directory(directory, directory_path,
    
    394 388
                                           directories=directories, root_barrier=directory_path)
    
    395 389
     
    
    396 390
                     directory_fetched = True
    
    397
    -
    
    398 391
                 except grpc.RpcError as e:
    
    399 392
                     status_code = e.code()
    
    400 393
                     if status_code == grpc.StatusCode.UNIMPLEMENTED:
    

  • buildgrid/server/_monitoring.py
    ... ... @@ -156,9 +156,11 @@ class MonitoringBus:
    156 156
                     output_writers.append(output_file)
    
    157 157
     
    
    158 158
                     while True:
    
    159
    -                    if await __streaming_worker(iter(output_file)):
    
    159
    +                    if await __streaming_worker([output_file]):
    
    160 160
                             self.__sequence_number += 1
    
    161 161
     
    
    162
    +                        output_file.flush()
    
    163
    +
    
    162 164
                 else:
    
    163 165
                     output_writers.append(sys.stdout.buffer)
    
    164 166
     
    

  • buildgrid/server/cas/instance.py
    ... ... @@ -24,7 +24,7 @@ import logging
    24 24
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27
    -from buildgrid.settings import HASH, HASH_LENGTH
    
    27
    +from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    28 28
     from buildgrid.utils import get_hash_type
    
    29 29
     
    
    30 30
     
    
    ... ... @@ -42,9 +42,7 @@ class ContentAddressableStorageInstance:
    42 42
             return get_hash_type()
    
    43 43
     
    
    44 44
         def max_batch_total_size_bytes(self):
    
    45
    -        # TODO: link with max size
    
    46
    -        # Should be added from settings in MR !119
    
    47
    -        return 2000000
    
    45
    +        return MAX_REQUEST_SIZE
    
    48 46
     
    
    49 47
         def symlink_absolute_path_strategy(self):
    
    50 48
             # Currently this strategy is hardcoded into BuildGrid
    
    ... ... @@ -72,6 +70,41 @@ class ContentAddressableStorageInstance:
    72 70
     
    
    73 71
             return response
    
    74 72
     
    
    73
    +    def get_tree(self, request):
    
    74
    +        storage = self._storage
    
    75
    +
    
    76
    +        response = re_pb2.GetTreeResponse()
    
    77
    +        page_size = request.page_size
    
    78
    +
    
    79
    +        if not request.page_size:
    
    80
    +            request.page_size = MAX_REQUEST_COUNT
    
    81
    +
    
    82
    +        root_digest = request.root_digest
    
    83
    +        page_size = request.page_size
    
    84
    +
    
    85
    +        def __get_tree(node_digest):
    
    86
    +            nonlocal response, page_size, request
    
    87
    +
    
    88
    +            if not page_size:
    
    89
    +                page_size = request.page_size
    
    90
    +                yield response
    
    91
    +                response = re_pb2.GetTreeResponse()
    
    92
    +
    
    93
    +            if response.ByteSize() >= (MAX_REQUEST_SIZE):
    
    94
    +                yield response
    
    95
    +                response = re_pb2.GetTreeResponse()
    
    96
    +
    
    97
    +            directory_from_digest = storage.get_message(node_digest, re_pb2.Directory)
    
    98
    +            page_size -= 1
    
    99
    +            response.directories.extend([directory_from_digest])
    
    100
    +
    
    101
    +            for directory in directory_from_digest.directories:
    
    102
    +                yield from __get_tree(directory.digest)
    
    103
    +
    
    104
    +            yield response
    
    105
    +
    
    106
    +        return __get_tree(root_digest)
    
    107
    +
    
    75 108
     
    
    76 109
     class ByteStreamInstance:
    
    77 110
     
    

  • buildgrid/server/cas/service.py
    ... ... @@ -86,10 +86,16 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    86 86
         def GetTree(self, request, context):
    
    87 87
             self.__logger.debug("GetTree request from [%s]", context.peer())
    
    88 88
     
    
    89
    -        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    90
    -        context.set_details('Method not implemented!')
    
    89
    +        try:
    
    90
    +            instance = self._get_instance(request.instance_name)
    
    91
    +            yield from instance.get_tree(request)
    
    92
    +
    
    93
    +        except InvalidArgumentError as e:
    
    94
    +            self.__logger.error(e)
    
    95
    +            context.set_details(str(e))
    
    96
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    91 97
     
    
    92
    -        return iter([remote_execution_pb2.GetTreeResponse()])
    
    98
    +            yield remote_execution_pb2.GetTreeResponse()
    
    93 99
     
    
    94 100
         def _get_instance(self, instance_name):
    
    95 101
             try:
    

  • buildgrid/server/instance.py
    ... ... @@ -15,26 +15,29 @@
    15 15
     
    
    16 16
     import asyncio
    
    17 17
     from concurrent import futures
    
    18
    -from datetime import timedelta
    
    18
    +from datetime import datetime, timedelta
    
    19 19
     import logging
    
    20
    +import logging.handlers
    
    20 21
     import os
    
    21 22
     import signal
    
    23
    +import sys
    
    22 24
     import time
    
    23 25
     
    
    24 26
     import grpc
    
    27
    +import janus
    
    25 28
     
    
    26
    -from buildgrid._enums import BotStatus, MetricRecordDomain, MetricRecordType
    
    29
    +from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
    
    27 30
     from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    28 31
     from buildgrid.server.actioncache.service import ActionCacheService
    
    29 32
     from buildgrid.server.bots.service import BotsService
    
    33
    +from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    34
    +from buildgrid.server.capabilities.service import CapabilitiesService
    
    30 35
     from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    31 36
     from buildgrid.server.execution.service import ExecutionService
    
    32 37
     from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    33 38
     from buildgrid.server.operations.service import OperationsService
    
    34 39
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    35
    -from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    36
    -from buildgrid.server.capabilities.service import CapabilitiesService
    
    37
    -from buildgrid.settings import MONITORING_PERIOD
    
    40
    +from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
    
    38 41
     
    
    39 42
     
    
    40 43
     class BuildGridServer:
    
    ... ... @@ -60,9 +63,16 @@ class BuildGridServer:
    60 63
             self.__grpc_server = grpc.server(self.__grpc_executor)
    
    61 64
     
    
    62 65
             self.__main_loop = asyncio.get_event_loop()
    
    66
    +
    
    63 67
             self.__monitoring_bus = None
    
    64 68
     
    
    69
    +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
    
    70
    +        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
    
    71
    +        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
    
    72
    +        self.__print_log_records = True
    
    73
    +
    
    65 74
             self.__state_monitoring_task = None
    
    75
    +        self.__logging_task = None
    
    66 76
     
    
    67 77
             # We always want a capabilities service
    
    68 78
             self._capabilities_service = CapabilitiesService(self.__grpc_server)
    
    ... ... @@ -85,6 +95,17 @@ class BuildGridServer:
    85 95
                     self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    86 96
                     serialisation_format=MonitoringOutputFormat.JSON)
    
    87 97
     
    
    98
    +        # Setup the main logging handler:
    
    99
    +        root_logger = logging.getLogger()
    
    100
    +
    
    101
    +        for log_filter in root_logger.filters[:]:
    
    102
    +            self.__logging_handler.addFilter(log_filter)
    
    103
    +            root_logger.removeFilter(log_filter)
    
    104
    +
    
    105
    +        for log_handler in root_logger.handlers[:]:
    
    106
    +            root_logger.removeHandler(log_handler)
    
    107
    +        root_logger.addHandler(self.__logging_handler)
    
    108
    +
    
    88 109
         # --- Public API ---
    
    89 110
     
    
    90 111
         def start(self):
    
    ... ... @@ -98,6 +119,9 @@ class BuildGridServer:
    98 119
                     self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    99 120
                     loop=self.__main_loop)
    
    100 121
     
    
    122
    +        self.__logging_task = asyncio.ensure_future(
    
    123
    +            self._logging_worker(), loop=self.__main_loop)
    
    124
    +
    
    101 125
             self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
    
    102 126
     
    
    103 127
             self.__main_loop.run_forever()
    
    ... ... @@ -110,6 +134,9 @@ class BuildGridServer:
    110 134
     
    
    111 135
                 self.__monitoring_bus.stop()
    
    112 136
     
    
    137
    +        if self.__logging_task is not None:
    
    138
    +            self.__logging_task.cancel()
    
    139
    +
    
    113 140
             self.__main_loop.stop()
    
    114 141
     
    
    115 142
             self.__grpc_server.stop(None)
    
    ... ... @@ -278,6 +305,53 @@ class BuildGridServer:
    278 305
                                                              execution_instance)
    
    279 306
                 self._capabilities_service.add_instance(instance_name, capabilities_instance)
    
    280 307
     
    
    308
    +    async def _logging_worker(self):
    
    309
    +        """Publishes log records to the monitoring bus."""
    
    310
    +        async def __logging_worker():
    
    311
    +            log_record = await self.__logging_queue.async_q.get()
    
    312
    +
    
    313
    +            # Print log records to stdout, if required:
    
    314
    +            if self.__print_log_records:
    
    315
    +                record = self.__logging_formatter.format(log_record)
    
    316
    +
    
    317
    +                # TODO: Investigate if async write would be worth here.
    
    318
    +                sys.stdout.write('{}\n'.format(record))
    
    319
    +                sys.stdout.flush()
    
    320
    +
    
    321
    +            # Emit a log record if server is instrumented:
    
    322
    +            if self._is_instrumented:
    
    323
    +                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
    
    324
    +                log_record_creation_time = datetime.fromtimestamp(log_record.created)
    
    325
    +                # logging.LogRecord.extra must be a str to str dict:
    
    326
    +                if 'extra' in log_record.__dict__ and log_record.extra:
    
    327
    +                    log_record_metadata = log_record.extra
    
    328
    +                else:
    
    329
    +                    log_record_metadata = None
    
    330
    +                record = self._forge_log_record(
    
    331
    +                    log_record.name, log_record_level, log_record.message,
    
    332
    +                    log_record_creation_time, metadata=log_record_metadata)
    
    333
    +
    
    334
    +                await self.__monitoring_bus.send_record(record)
    
    335
    +
    
    336
    +        try:
    
    337
    +            while True:
    
    338
    +                await __logging_worker()
    
    339
    +
    
    340
    +        except asyncio.CancelledError:
    
    341
    +            pass
    
    342
    +
    
    343
    +    def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
    
    344
    +        log_record = monitoring_pb2.LogRecord()
    
    345
    +
    
    346
    +        log_record.creation_timestamp.FromDatetime(creation_time)
    
    347
    +        log_record.domain = domain
    
    348
    +        log_record.level = level.value
    
    349
    +        log_record.message = message
    
    350
    +        if metadata is not None:
    
    351
    +            log_record.metadata.update(metadata)
    
    352
    +
    
    353
    +        return log_record
    
    354
    +
    
    281 355
         async def _state_monitoring_worker(self, period=1.0):
    
    282 356
             """Periodically publishes state metrics to the monitoring bus."""
    
    283 357
             async def __state_monitoring_worker():
    

  • buildgrid/settings.py
    ... ... @@ -24,3 +24,14 @@ HASH_LENGTH = HASH().digest_size * 2
    24 24
     
    
    25 25
     # Period, in seconds, for the monitoring cycle:
    
    26 26
     MONITORING_PERIOD = 5.0
    
    27
    +
    
    28
    +# Maximum size for a single gRPC request:
    
    29
    +MAX_REQUEST_SIZE = 2 * 1024 * 1024
    
    30
    +
    
    31
    +# Maximum number of elements per gRPC request:
    
    32
    +MAX_REQUEST_COUNT = 500
    
    33
    +
    
    34
    +# String format for log records:
    
    35
    +LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    36
    +# The different log record attributes are documented here:
    
    37
    +# https://docs.python.org/3/library/logging.html#logrecord-attributes



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]