[Notes] [Git][BuildGrid/buildgrid][mablanch/23-new-logging] 15 commits: monitoring.py: New monitoring bus class



Title: GitLab

Martin Blanchard pushed to branch mablanch/23-new-logging at BuildGrid / buildgrid

Commits:

15 changed files:

Changes:

  • .pylintrc
    ... ... @@ -460,6 +460,7 @@ known-third-party=boto3,
    460 460
                       enchant,
    
    461 461
                       google,
    
    462 462
                       grpc,
    
    463
    +                  janus,
    
    463 464
                       moto,
    
    464 465
                       yaml
    
    465 466
     
    
    ... ... @@ -523,4 +524,4 @@ valid-metaclass-classmethod-first-arg=mcs
    523 524
     
    
    524 525
     # Exceptions that will emit a warning when being caught. Defaults to
    
    525 526
     # "Exception"
    
    526
    -overgeneral-exceptions=Exception
    527
    +overgeneral-exceptions=Exception
    \ No newline at end of file

  • buildgrid/_app/cli.py
    ... ... @@ -27,6 +27,7 @@ import os
    27 27
     import click
    
    28 28
     import grpc
    
    29 29
     
    
    30
    +from buildgrid.settings import LOG_RECORD_FORMAT
    
    30 31
     from buildgrid.utils import read_file
    
    31 32
     
    
    32 33
     CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
    
    ... ... @@ -138,11 +139,38 @@ class BuildGridCLI(click.MultiCommand):
    138 139
             return mod.cli
    
    139 140
     
    
    140 141
     
    
    142
    +class DebugFilter(logging.Filter):
    
    143
    +
    
    144
    +    def __init__(self, debug_domains, name=''):
    
    145
    +        super().__init__(name=name)
    
    146
    +        self.__domains_tree = {}
    
    147
    +
    
    148
    +        for domain in debug_domains.split(','):
    
    149
    +            domains_tree = self.__domains_tree
    
    150
    +            for label in domain.split('.'):
    
    151
    +                if label not in domains_tree:
    
    152
    +                    domains_tree[label] = {}
    
    153
    +                domains_tree = domains_tree[label]
    
    154
    +
    
    155
    +    def filter(self, record):
    
    156
    +        domains_tree = self.__domains_tree
    
    157
    +        for label in record.name.split('.'):
    
    158
    +            if '*' in domains_tree:
    
    159
    +                return True
    
    160
    +            if label not in domains_tree:
    
    161
    +                return False
    
    162
    +            domains_tree = domains_tree[label]
    
    163
    +
    
    164
    +        return True
    
    165
    +
    
    166
    +
    
    141 167
     @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
    
    168
    +@click.option('--no-print', is_flag=True, show_default=True,
    
    169
    +              help="Do not print log records to stdout/stderr.")
    
    142 170
     @click.option('-v', '--verbose', count=True,
    
    143 171
                   help='Increase log verbosity level.')
    
    144 172
     @pass_context
    
    145
    -def cli(context, verbose):
    
    173
    +def cli(context, no_print, verbose):
    
    146 174
         """BuildGrid App"""
    
    147 175
         logger = logging.getLogger()
    
    148 176
     
    
    ... ... @@ -152,8 +180,20 @@ def cli(context, verbose):
    152 180
         for log_filter in logger.filters[:]:
    
    153 181
             logger.removeFilter(log_filter)
    
    154 182
     
    
    155
    -    logging.basicConfig(
    
    156
    -        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
    
    183
    +    # Do not register a stream handler if no-print is requested:
    
    184
    +    if not no_print:
    
    185
    +        log_handler = logging.StreamHandler()
    
    186
    +
    
    187
    +        # Filter debug messages using BGD_MESSAGE_DEBUG value:
    
    188
    +        debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
    
    189
    +        if debug_domains:
    
    190
    +            log_handler.addFilter(DebugFilter(debug_domains))
    
    191
    +
    
    192
    +    else:
    
    193
    +        log_handler = logging.NullHandler()
    
    194
    +
    
    195
    +    logging.basicConfig(format=LOG_RECORD_FORMAT,
    
    196
    +                        handlers=[log_handler])
    
    157 197
     
    
    158 198
         if verbose == 1:
    
    159 199
             logger.setLevel(logging.WARNING)
    
    ... ... @@ -161,5 +201,3 @@ def cli(context, verbose):
    161 201
             logger.setLevel(logging.INFO)
    
    162 202
         elif verbose >= 3:
    
    163 203
             logger.setLevel(logging.DEBUG)
    164
    -    else:
    
    165
    -        logger.setLevel(logging.ERROR)

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -20,7 +20,6 @@ Server command
    20 20
     Create a BuildGrid server.
    
    21 21
     """
    
    22 22
     
    
    23
    -import asyncio
    
    24 23
     import logging
    
    25 24
     import sys
    
    26 25
     
    
    ... ... @@ -52,18 +51,14 @@ def start(context, config):
    52 51
             click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
    
    53 52
             sys.exit(-1)
    
    54 53
     
    
    55
    -    loop = asyncio.get_event_loop()
    
    56 54
         try:
    
    57 55
             server.start()
    
    58
    -        loop.run_forever()
    
    59 56
     
    
    60 57
         except KeyboardInterrupt:
    
    61 58
             pass
    
    62 59
     
    
    63 60
         finally:
    
    64
    -        context.logger.info("Stopping server")
    
    65 61
             server.stop()
    
    66
    -        loop.close()
    
    67 62
     
    
    68 63
     
    
    69 64
     def _create_server_from_config(config):
    

  • buildgrid/server/_monitoring.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import asyncio
    
    17
    +
    
    18
    +from google.protobuf import json_format
    
    19
    +
    
    20
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    21
    +
    
    22
    +
    
    23
    +class MonitoringBus:
    
    24
    +
    
    25
    +    def __init__(self, loop):
    
    26
    +        self.__event_loop = loop
    
    27
    +        self.__streaming_task = None
    
    28
    +
    
    29
    +        self.__message_queue = asyncio.Queue(loop=loop)
    
    30
    +        self.__sequence_number = 1
    
    31
    +
    
    32
    +    # --- Public API ---
    
    33
    +
    
    34
    +    def start(self):
    
    35
    +        """Starts the monitoring bus worker task."""
    
    36
    +        self.__streaming_task = asyncio.ensure_future(
    
    37
    +            self._streaming_worker(), loop=self.__event_loop)
    
    38
    +
    
    39
    +    def stop(self):
    
    40
    +        """Cancels the monitoring bus worker task."""
    
    41
    +        if self.__streaming_task is not None:
    
    42
    +            self.__streaming_task.cancel()
    
    43
    +
    
    44
    +    async def send_record(self, record):
    
    45
    +        """Publishes a record onto the bus asynchronously.
    
    46
    +
    
    47
    +        Args:
    
    48
    +            record (Message): The record to publish onto the bus.
    
    49
    +        """
    
    50
    +        await self.__message_queue.put(record)
    
    51
    +
    
    52
    +    def send_record_nowait(self, record):
    
    53
    +        """Publishes a record onto the bus.
    
    54
    +
    
    55
    +        Args:
    
    56
    +            record (Message): The record to publish onto the bus.
    
    57
    +        """
    
    58
    +        self.__message_queue.put_nowait(record)
    
    59
    +
    
    60
    +    # --- Private API ---
    
    61
    +
    
    62
    +    async def _streaming_worker(self):
    
    63
    +        """Handles bus messages streaming work."""
    
    64
    +        async def __streaming_worker():
    
    65
    +            record = await self.__message_queue.get()
    
    66
    +
    
    67
    +            message = monitoring_pb2.BusMessage()
    
    68
    +            message.sequence_number = self.__sequence_number
    
    69
    +
    
    70
    +            if record.DESCRIPTOR is monitoring_pb2.LogRecord.DESCRIPTOR:
    
    71
    +                message.log_record.CopyFrom(record)
    
    72
    +
    
    73
    +            elif record.DESCRIPTOR is monitoring_pb2.MetricRecord.DESCRIPTOR:
    
    74
    +                message.metric_record.CopyFrom(record)
    
    75
    +
    
    76
    +            else:
    
    77
    +                return False
    
    78
    +
    
    79
    +            # print(json_format.MessageToJson(message))
    
    80
    +
    
    81
    +            return True
    
    82
    +
    
    83
    +        try:
    
    84
    +            while True:
    
    85
    +                if await __streaming_worker():
    
    86
    +                    self.__sequence_number += 1
    
    87
    +
    
    88
    +        except asyncio.CancelledError:
    
    89
    +            pass

  • buildgrid/server/bots/instance.py
    ... ... @@ -37,6 +37,10 @@ class BotsInterface:
    37 37
             self._bot_sessions = {}
    
    38 38
             self._scheduler = scheduler
    
    39 39
     
    
    40
    +    @property
    
    41
    +    def scheduler(self):
    
    42
    +        return self._scheduler
    
    43
    +
    
    40 44
         def register_instance_with_server(self, instance_name, server):
    
    41 45
             server.add_bots_interface(self, instance_name)
    
    42 46
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -23,8 +23,9 @@ import logging
    23 23
     
    
    24 24
     import grpc
    
    25 25
     
    
    26
    -from google.protobuf.empty_pb2 import Empty
    
    26
    +from google.protobuf import empty_pb2, timestamp_pb2
    
    27 27
     
    
    28
    +from buildgrid._enums import BotStatus
    
    28 29
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    29 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    30 31
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    ... ... @@ -32,24 +33,87 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp
    32 33
     
    
    33 34
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    34 35
     
    
    35
    -    def __init__(self, server):
    
    36
    +    def __init__(self, server, monitor=True):
    
    36 37
             self.__logger = logging.getLogger(__name__)
    
    37 38
     
    
    39
    +        self.__bots_by_status = None
    
    40
    +        self.__bots_by_instance = None
    
    41
    +        self.__bots = None
    
    42
    +
    
    38 43
             self._instances = {}
    
    39 44
     
    
    40 45
             bots_pb2_grpc.add_BotsServicer_to_server(self, server)
    
    41 46
     
    
    42
    -    def add_instance(self, name, instance):
    
    43
    -        self._instances[name] = instance
    
    47
    +        self._is_monitored = monitor
    
    48
    +
    
    49
    +        if self._is_monitored:
    
    50
    +            self.__bots_by_status = {}
    
    51
    +            self.__bots_by_instance = {}
    
    52
    +            self.__bots = {}
    
    53
    +
    
    54
    +            self.__bots_by_status[BotStatus.OK] = set()
    
    55
    +            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
    
    56
    +            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
    
    57
    +            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
    
    58
    +
    
    59
    +    # --- Public API ---
    
    60
    +
    
    61
    +    def add_instance(self, instance_name, instance):
    
    62
    +        """Registers a new servicer instance.
    
    63
    +
    
    64
    +        Args:
    
    65
    +            instance_name (str): The new instance's name.
    
    66
    +            instance (BotsInterface): The new instance itself.
    
    67
    +        """
    
    68
    +        self._instances[instance_name] = instance
    
    69
    +
    
    70
    +        if self._is_monitored:
    
    71
    +            self.__bots_by_instance[instance_name] = 0
    
    72
    +
    
    73
    +    def get_scheduler(self, instance_name):
    
    74
    +        """Retrieves a reference to the scheduler for an instance.
    
    75
    +
    
    76
    +        Args:
    
    77
    +            instance_name (str): The name of the instance to query.
    
    78
    +
    
    79
    +        Returns:
    
    80
    +            Scheduler: A reference to the scheduler for `instance_name`.
    
    81
    +
    
    82
    +        Raises:
    
    83
    +            InvalidArgumentError: If no instance named `instance_name` exists.
    
    84
    +        """
    
    85
    +        instance = self._get_instance(instance_name)
    
    86
    +
    
    87
    +        return instance.scheduler
    
    88
    +
    
    89
    +    # --- Public API: Servicer ---
    
    44 90
     
    
    45 91
         def CreateBotSession(self, request, context):
    
    92
    +        """Handles CreateBotSessionRequest messages.
    
    93
    +
    
    94
    +        Args:
    
    95
    +            request (CreateBotSessionRequest): The incoming RPC request.
    
    96
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    97
    +        """
    
    46 98
             self.__logger.debug("CreateBotSession request from [%s]", context.peer())
    
    47 99
     
    
    100
    +        instance_name = request.parent
    
    101
    +        bot_status = BotStatus(request.bot_session.status)
    
    102
    +        bot_id = request.bot_session.bot_id
    
    103
    +
    
    48 104
             try:
    
    49
    -            parent = request.parent
    
    50
    -            instance = self._get_instance(request.parent)
    
    51
    -            return instance.create_bot_session(parent,
    
    52
    -                                               request.bot_session)
    
    105
    +            instance = self._get_instance(instance_name)
    
    106
    +            bot_session = instance.create_bot_session(instance_name,
    
    107
    +                                                      request.bot_session)
    
    108
    +            now = timestamp_pb2.Timestamp()
    
    109
    +            now.GetCurrentTime()
    
    110
    +
    
    111
    +            if self._is_monitored:
    
    112
    +                self.__bots[bot_id] = now
    
    113
    +                self.__bots_by_instance[instance_name] += 1
    
    114
    +                self.__bots_by_status[bot_status].add(bot_id)
    
    115
    +
    
    116
    +            return bot_session
    
    53 117
     
    
    54 118
             except InvalidArgumentError as e:
    
    55 119
                 self.__logger.error(e)
    
    ... ... @@ -59,17 +123,36 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    59 123
             return bots_pb2.BotSession()
    
    60 124
     
    
    61 125
         def UpdateBotSession(self, request, context):
    
    126
    +        """Handles UpdateBotSessionRequest messages.
    
    127
    +
    
    128
    +        Args:
    
    129
    +            request (UpdateBotSessionRequest): The incoming RPC request.
    
    130
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    131
    +        """
    
    62 132
             self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
    
    63 133
     
    
    134
    +        names = request.name.split("/")
    
    135
    +        bot_status = BotStatus(request.bot_session.status)
    
    136
    +        bot_id = request.bot_session.bot_id
    
    137
    +
    
    64 138
             try:
    
    65
    -            names = request.name.split("/")
    
    66
    -            # Operation name should be in format:
    
    67
    -            # {instance/name}/{uuid}
    
    68
    -            instance_name = ''.join(names[0:-1])
    
    139
    +            instance_name = '/'.join(names[:-1])
    
    69 140
     
    
    70 141
                 instance = self._get_instance(instance_name)
    
    71
    -            return instance.update_bot_session(request.name,
    
    72
    -                                               request.bot_session)
    
    142
    +            bot_session = instance.update_bot_session(request.name,
    
    143
    +                                                      request.bot_session)
    
    144
    +
    
    145
    +            if self._is_monitored:
    
    146
    +                self.__bots[bot_id].GetCurrentTime()
    
    147
    +                if bot_id not in self.__bots_by_status[bot_status]:
    
    148
    +                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
    
    149
    +                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
    
    150
    +                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
    
    151
    +                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
    
    152
    +
    
    153
    +                    self.__bots_by_status[bot_status].add(bot_id)
    
    154
    +
    
    155
    +            return bot_session
    
    73 156
     
    
    74 157
             except InvalidArgumentError as e:
    
    75 158
                 self.__logger.error(e)
    
    ... ... @@ -89,10 +172,40 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    89 172
             return bots_pb2.BotSession()
    
    90 173
     
    
    91 174
         def PostBotEventTemp(self, request, context):
    
    175
    +        """Handles PostBotEventTempRequest messages.
    
    176
    +
    
    177
    +        Args:
    
    178
    +            request (PostBotEventTempRequest): The incoming RPC request.
    
    179
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    180
    +        """
    
    92 181
             self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
    
    93 182
     
    
    94 183
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    95
    -        return Empty()
    
    184
    +
    
    185
    +        return empty_pb2.Empty()
    
    186
    +
    
    187
    +    # --- Public API: Monitoring ---
    
    188
    +
    
    189
    +    @property
    
    190
    +    def is_monitored(self):
    
    191
    +        return self._is_monitored
    
    192
    +
    
    193
    +    def query_n_bots(self):
    
    194
    +        return len(self.__bots)
    
    195
    +
    
    196
    +    def query_n_bots_for_instance(self, instance_name):
    
    197
    +        try:
    
    198
    +            return self.__bots_by_instance[instance_name]
    
    199
    +        except KeyError:
    
    200
    +            return 0
    
    201
    +
    
    202
    +    def query_n_bots_for_status(self, bot_status):
    
    203
    +        try:
    
    204
    +            return len(self.__bots_by_status[bot_status])
    
    205
    +        except KeyError:
    
    206
    +            return 0
    
    207
    +
    
    208
    +    # --- Private API ---
    
    96 209
     
    
    97 210
         def _get_instance(self, name):
    
    98 211
             try:
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -35,6 +35,10 @@ class ExecutionInstance:
    35 35
             self._storage = storage
    
    36 36
             self._scheduler = scheduler
    
    37 37
     
    
    38
    +    @property
    
    39
    +    def scheduler(self):
    
    40
    +        return self._scheduler
    
    41
    +
    
    38 42
         def register_instance_with_server(self, instance_name, server):
    
    39 43
             server.add_execution_instance(self, instance_name)
    
    40 44
     
    

  • buildgrid/server/execution/service.py
    ... ... @@ -33,30 +33,84 @@ from buildgrid._protos.google.longrunning import operations_pb2
    33 33
     
    
    34 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    35 35
     
    
    36
    -    def __init__(self, server):
    
    36
    +    def __init__(self, server, monitor=True):
    
    37 37
             self.__logger = logging.getLogger(__name__)
    
    38 38
     
    
    39
    +        self.__peers_by_instance = None
    
    40
    +        self.__peers = None
    
    41
    +
    
    39 42
             self._instances = {}
    
    43
    +
    
    40 44
             remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
    
    41 45
     
    
    42
    -    def add_instance(self, name, instance):
    
    43
    -        self._instances[name] = instance
    
    46
    +        self._is_monitored = monitor
    
    47
    +
    
    48
    +        if self._is_monitored:
    
    49
    +            self.__peers_by_instance = {}
    
    50
    +            self.__peers = {}
    
    51
    +
    
    52
    +    # --- Public API ---
    
    53
    +
    
    54
    +    def add_instance(self, instance_name, instance):
    
    55
    +        """Registers a new servicer instance.
    
    56
    +
    
    57
    +        Args:
    
    58
    +            instance_name (str): The new instance's name.
    
    59
    +            instance (ExecutionInstance): The new instance itself.
    
    60
    +        """
    
    61
    +        self._instances[instance_name] = instance
    
    62
    +
    
    63
    +        if self._is_monitored:
    
    64
    +            self.__peers_by_instance[instance_name] = set()
    
    65
    +
    
    66
    +    def get_scheduler(self, instance_name):
    
    67
    +        """Retrieves a reference to the scheduler for an instance.
    
    68
    +
    
    69
    +        Args:
    
    70
    +            instance_name (str): The name of the instance to query.
    
    71
    +
    
    72
    +        Returns:
    
    73
    +            Scheduler: A reference to the scheduler for `instance_name`.
    
    74
    +
    
    75
    +        Raises:
    
    76
    +            InvalidArgumentError: If no instance named `instance_name` exists.
    
    77
    +        """
    
    78
    +        instance = self._get_instance(instance_name)
    
    79
    +
    
    80
    +        return instance.scheduler
    
    81
    +
    
    82
    +    # --- Public API: Servicer ---
    
    44 83
     
    
    45 84
         def Execute(self, request, context):
    
    85
    +        """Handles ExecuteRequest messages.
    
    86
    +
    
    87
    +        Args:
    
    88
    +            request (ExecuteRequest): The incoming RPC request.
    
    89
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    90
    +        """
    
    46 91
             self.__logger.debug("Execute request from [%s]", context.peer())
    
    47 92
     
    
    93
    +        instance_name = request.instance_name
    
    94
    +        message_queue = queue.Queue()
    
    95
    +        peer = context.peer()
    
    96
    +
    
    48 97
             try:
    
    49
    -            message_queue = queue.Queue()
    
    50
    -            instance = self._get_instance(request.instance_name)
    
    98
    +            instance = self._get_instance(instance_name)
    
    51 99
                 operation = instance.execute(request.action_digest,
    
    52 100
                                              request.skip_cache_lookup,
    
    53 101
                                              message_queue)
    
    54 102
     
    
    55
    -            context.add_callback(partial(instance.unregister_message_client,
    
    56
    -                                         operation.name, message_queue))
    
    103
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    104
    +                                         peer, instance_name, operation.name, message_queue))
    
    105
    +
    
    106
    +            if self._is_monitored:
    
    107
    +                if peer not in self.__peers:
    
    108
    +                    self.__peers_by_instance[instance_name].add(peer)
    
    109
    +                    self.__peers[peer] = 1
    
    110
    +                else:
    
    111
    +                    self.__peers[peer] += 1
    
    57 112
     
    
    58
    -            instanced_op_name = "{}/{}".format(request.instance_name,
    
    59
    -                                               operation.name)
    
    113
    +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    60 114
     
    
    61 115
                 self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    62 116
     
    
    ... ... @@ -80,23 +134,37 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    80 134
                 yield operations_pb2.Operation()
    
    81 135
     
    
    82 136
         def WaitExecution(self, request, context):
    
    137
    +        """Handles WaitExecutionRequest messages.
    
    138
    +
    
    139
    +        Args:
    
    140
    +            request (WaitExecutionRequest): The incoming RPC request.
    
    141
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    142
    +        """
    
    83 143
             self.__logger.debug("WaitExecution request from [%s]", context.peer())
    
    84 144
     
    
    85
    -        try:
    
    86
    -            names = request.name.split("/")
    
    145
    +        names = request.name.split('/')
    
    146
    +        instance_name = '/'.join(names[:-1])
    
    147
    +        operation_name = names[-1]
    
    148
    +        message_queue = queue.Queue()
    
    149
    +        peer = context.peer()
    
    87 150
     
    
    88
    -            # Operation name should be in format:
    
    89
    -            # {instance/name}/{operation_id}
    
    90
    -            instance_name = ''.join(names[0:-1])
    
    151
    +        try:
    
    152
    +            if instance_name != request.instance_name:
    
    153
    +                raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
    
    154
    +                                            .format(request.name, instance_name))
    
    91 155
     
    
    92
    -            message_queue = queue.Queue()
    
    93
    -            operation_name = names[-1]
    
    94 156
                 instance = self._get_instance(instance_name)
    
    95 157
     
    
    96 158
                 instance.register_message_client(operation_name, message_queue)
    
    159
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    160
    +                                         peer, instance_name, operation_name, message_queue))
    
    97 161
     
    
    98
    -            context.add_callback(partial(instance.unregister_message_client,
    
    99
    -                                         operation_name, message_queue))
    
    162
    +            if self._is_monitored:
    
    163
    +                if peer not in self.__peers:
    
    164
    +                    self.__peers_by_instance[instance_name].add(peer)
    
    165
    +                    self.__peers[peer] = 1
    
    166
    +                else:
    
    167
    +                    self.__peers[peer] += 1
    
    100 168
     
    
    101 169
                 for operation in instance.stream_operation_updates(message_queue,
    
    102 170
                                                                    operation_name):
    
    ... ... @@ -111,6 +179,35 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    111 179
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    112 180
                 yield operations_pb2.Operation()
    
    113 181
     
    
    182
    +    # --- Public API: Monitoring ---
    
    183
    +
    
    184
    +    @property
    
    185
    +    def is_monitored(self):
    
    186
    +        return self._is_monitored
    
    187
    +
    
    188
    +    def query_n_clients(self):
    
    189
    +        return len(self.__peers)
    
    190
    +
    
    191
    +    def query_n_clients_for_instance(self, instance_name):
    
    192
    +        try:
    
    193
    +            return len(self.__peers_by_instance[instance_name])
    
    194
    +        except KeyError:
    
    195
    +            return 0
    
    196
    +
    
    197
    +    # --- Private API ---
    
    198
    +
    
    199
    +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    200
    +        instance = self._get_instance(instance_name)
    
    201
    +
    
    202
    +        instance.unregister_message_client(job_name, message_queue)
    
    203
    +
    
    204
    +        if self._is_monitored:
    
    205
    +            if self.__peers[peer] > 1:
    
    206
    +                self.__peers[peer] -= 1
    
    207
    +            else:
    
    208
    +                self.__peers_by_instance[instance_name].remove(peer)
    
    209
    +                del self.__peers[peer]
    
    210
    +
    
    114 211
         def _get_instance(self, name):
    
    115 212
             try:
    
    116 213
                 return self._instances[name]
    

  • buildgrid/server/instance.py
    ... ... @@ -13,18 +13,27 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +import asyncio
    
    16 17
     from concurrent import futures
    
    18
    +from datetime import datetime, timedelta
    
    17 19
     import logging
    
    20
    +from logging.handlers import QueueHandler
    
    18 21
     import os
    
    22
    +import time
    
    19 23
     
    
    20 24
     import grpc
    
    25
    +import janus
    
    21 26
     
    
    22
    -from .cas.service import ByteStreamService, ContentAddressableStorageService
    
    23
    -from .actioncache.service import ActionCacheService
    
    24
    -from .execution.service import ExecutionService
    
    25
    -from .operations.service import OperationsService
    
    26
    -from .bots.service import BotsService
    
    27
    -from .referencestorage.service import ReferenceStorageService
    
    27
    +from buildgrid._enums import LogRecordLevel, MetricRecordDomain, MetricRecordType
    
    28
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    29
    +from buildgrid.server.actioncache.service import ActionCacheService
    
    30
    +from buildgrid.server.bots.service import BotsService
    
    31
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    32
    +from buildgrid.server.execution.service import ExecutionService
    
    33
    +from buildgrid.server._monitoring import MonitoringBus
    
    34
    +from buildgrid.server.operations.service import OperationsService
    
    35
    +from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    36
    +from buildgrid.settings import MONITORING_PERIOD
    
    28 37
     
    
    29 38
     
    
    30 39
     class BuildGridServer:
    
    ... ... @@ -34,7 +43,7 @@ class BuildGridServer:
    34 43
         requisite services.
    
    35 44
         """
    
    36 45
     
    
    37
    -    def __init__(self, max_workers=None):
    
    46
    +    def __init__(self, max_workers=None, monitor=True):
    
    38 47
             """Initializes a new :class:`BuildGridServer` instance.
    
    39 48
     
    
    40 49
             Args:
    
    ... ... @@ -46,9 +55,17 @@ class BuildGridServer:
    46 55
                 # Use max_workers default from Python 3.5+
    
    47 56
                 max_workers = (os.cpu_count() or 1) * 5
    
    48 57
     
    
    49
    -        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    58
    +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    59
    +        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    50 60
     
    
    51
    -        self._server = server
    
    61
    +        self.__main_loop = asyncio.get_event_loop()
    
    62
    +
    
    63
    +        self.__monitoring_bus = None
    
    64
    +        self.__logging_queue = None
    
    65
    +        self.__logging_handler = None
    
    66
    +
    
    67
    +        self.__monitoring_task = None
    
    68
    +        self.__logging_task = None
    
    52 69
     
    
    53 70
             self._execution_service = None
    
    54 71
             self._bots_service = None
    
    ... ... @@ -58,15 +75,52 @@ class BuildGridServer:
    58 75
             self._cas_service = None
    
    59 76
             self._bytestream_service = None
    
    60 77
     
    
    78
    +        self._is_monitored = monitor
    
    79
    +        self._instances = set()
    
    80
    +
    
    81
    +        if self._is_monitored:
    
    82
    +            self.__monitoring_bus = MonitoringBus(self.__main_loop)
    
    83
    +            self.__logging_queue = janus.Queue(loop=self.__main_loop)
    
    84
    +            self.__logging_handler = QueueHandler(self.__logging_queue.sync_q)
    
    85
    +
    
    86
    +            logging.getLogger().addHandler(self.__logging_handler)
    
    87
    +
    
    88
    +    # --- Public API ---
    
    89
    +
    
    61 90
         def start(self):
    
    62
    -        """Starts the server.
    
    91
    +        """Starts the BuildGrid server.
    
    63 92
             """
    
    64
    -        self._server.start()
    
    93
    +        self.__grpc_server.start()
    
    94
    +
    
    95
    +        if self._is_monitored:
    
    96
    +            self.__monitoring_bus.start()
    
    97
    +
    
    98
    +            self.__monitoring_task = asyncio.ensure_future(
    
    99
    +                self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)
    
    100
    +
    
    101
    +            self.__logging_task = asyncio.ensure_future(
    
    102
    +                self._logging_worker(), loop=self.__main_loop)
    
    103
    +
    
    104
    +        self.__main_loop.run_forever()
    
    65 105
     
    
    66 106
         def stop(self, grace=0):
    
    67
    -        """Stops the server.
    
    107
    +        """Stops the BuildGrid server.
    
    108
    +
    
    109
    +        Args:
    
    110
    +            grace (int, optional): A duration of time in seconds. Defaults to 0.
    
    68 111
             """
    
    69
    -        self._server.stop(grace)
    
    112
    +        if self.__monitoring_task is not None:
    
    113
    +            self.__monitoring_task.cancel()
    
    114
    +        if self.__logging_task is not None:
    
    115
    +            self.__logging_task.cancel()
    
    116
    +
    
    117
    +        if self._is_monitored:
    
    118
    +            self.__monitoring_bus.stop()
    
    119
    +
    
    120
    +        self.__grpc_server.stop(grace)
    
    121
    +
    
    122
    +        if grace > 0:
    
    123
    +            time.sleep(grace)
    
    70 124
     
    
    71 125
         def add_port(self, address, credentials):
    
    72 126
             """Adds a port to the server.
    
    ... ... @@ -80,11 +134,11 @@ class BuildGridServer:
    80 134
             """
    
    81 135
             if credentials is not None:
    
    82 136
                 self.__logger.info("Adding secure connection on: [%s]", address)
    
    83
    -            self._server.add_secure_port(address, credentials)
    
    137
    +            self.__grpc_server.add_secure_port(address, credentials)
    
    84 138
     
    
    85 139
             else:
    
    86 140
                 self.__logger.info("Adding insecure connection on [%s]", address)
    
    87
    -            self._server.add_insecure_port(address)
    
    141
    +            self.__grpc_server.add_insecure_port(address)
    
    88 142
     
    
    89 143
         def add_execution_instance(self, instance, instance_name):
    
    90 144
             """Adds an :obj:`ExecutionInstance` to the service.
    
    ... ... @@ -96,10 +150,11 @@ class BuildGridServer:
    96 150
                 instance_name (str): Instance name.
    
    97 151
             """
    
    98 152
             if self._execution_service is None:
    
    99
    -            self._execution_service = ExecutionService(self._server)
    
    100
    -
    
    153
    +            self._execution_service = ExecutionService(self.__grpc_server)
    
    101 154
             self._execution_service.add_instance(instance_name, instance)
    
    102 155
     
    
    156
    +        self._instances.add(instance_name)
    
    157
    +
    
    103 158
         def add_bots_interface(self, instance, instance_name):
    
    104 159
             """Adds a :obj:`BotsInterface` to the service.
    
    105 160
     
    
    ... ... @@ -110,10 +165,11 @@ class BuildGridServer:
    110 165
                 instance_name (str): Instance name.
    
    111 166
             """
    
    112 167
             if self._bots_service is None:
    
    113
    -            self._bots_service = BotsService(self._server)
    
    114
    -
    
    168
    +            self._bots_service = BotsService(self.__grpc_server)
    
    115 169
             self._bots_service.add_instance(instance_name, instance)
    
    116 170
     
    
    171
    +        self._instances.add(instance_name)
    
    172
    +
    
    117 173
         def add_operations_instance(self, instance, instance_name):
    
    118 174
             """Adds an :obj:`OperationsInstance` to the service.
    
    119 175
     
    
    ... ... @@ -124,8 +180,7 @@ class BuildGridServer:
    124 180
                 instance_name (str): Instance name.
    
    125 181
             """
    
    126 182
             if self._operations_service is None:
    
    127
    -            self._operations_service = OperationsService(self._server)
    
    128
    -
    
    183
    +            self._operations_service = OperationsService(self.__grpc_server)
    
    129 184
             self._operations_service.add_instance(instance_name, instance)
    
    130 185
     
    
    131 186
         def add_reference_storage_instance(self, instance, instance_name):
    
    ... ... @@ -138,8 +193,7 @@ class BuildGridServer:
    138 193
                 instance_name (str): Instance name.
    
    139 194
             """
    
    140 195
             if self._reference_storage_service is None:
    
    141
    -            self._reference_storage_service = ReferenceStorageService(self._server)
    
    142
    -
    
    196
    +            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
    
    143 197
             self._reference_storage_service.add_instance(instance_name, instance)
    
    144 198
     
    
    145 199
         def add_action_cache_instance(self, instance, instance_name):
    
    ... ... @@ -152,8 +206,7 @@ class BuildGridServer:
    152 206
                 instance_name (str): Instance name.
    
    153 207
             """
    
    154 208
             if self._action_cache_service is None:
    
    155
    -            self._action_cache_service = ActionCacheService(self._server)
    
    156
    -
    
    209
    +            self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    157 210
             self._action_cache_service.add_instance(instance_name, instance)
    
    158 211
     
    
    159 212
         def add_cas_instance(self, instance, instance_name):
    
    ... ... @@ -166,8 +219,7 @@ class BuildGridServer:
    166 219
                 instance_name (str): Instance name.
    
    167 220
             """
    
    168 221
             if self._cas_service is None:
    
    169
    -            self._cas_service = ContentAddressableStorageService(self._server)
    
    170
    -
    
    222
    +            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    171 223
             self._cas_service.add_instance(instance_name, instance)
    
    172 224
     
    
    173 225
         def add_bytestream_instance(self, instance, instance_name):
    
    ... ... @@ -180,6 +232,183 @@ class BuildGridServer:
    180 232
                 instance_name (str): Instance name.
    
    181 233
             """
    
    182 234
             if self._bytestream_service is None:
    
    183
    -            self._bytestream_service = ByteStreamService(self._server)
    
    184
    -
    
    235
    +            self._bytestream_service = ByteStreamService(self.__grpc_server)
    
    185 236
             self._bytestream_service.add_instance(instance_name, instance)
    
    237
    +
    
    238
    +    # --- Public API: Monitoring ---
    
    239
    +
    
    240
    +    @property
    
    241
    +    def is_monitored(self):
    
    242
    +        return self._is_monitored
    
    243
    +
    
    244
    +    # --- Private API ---
    
    245
    +
    
    246
    +    async def _logging_worker(self):
    
    247
    +        """Publishes log records internally."""
    
    248
    +        async def __logging_worker():
    
    249
    +            log_record = await self.__logging_queue.async_q.get()
    
    250
    +
    
    251
    +            # Emit a log record:
    
    252
    +            record = monitoring_pb2.LogRecord()
    
    253
    +            creation_time = datetime.fromtimestamp(log_record.created)
    
    254
    +
    
    255
    +            record.creation_timestamp.FromDatetime(creation_time),
    
    256
    +            record.domain = log_record.name
    
    257
    +            record.level = int(log_record.levelno / 10)
    
    258
    +            record.message = log_record.message
    
    259
    +            # logging.LogRecord.extra must be a str to str dict:
    
    260
    +            if 'extra' in log_record.__dict__ and log_record.extra:
    
    261
    +                record.extra.update(log_record.extra)
    
    262
    +
    
    263
    +            await self.__monitoring_bus.send_record(record)
    
    264
    +
    
    265
    +        try:
    
    266
    +            while True:
    
    267
    +                await __logging_worker()
    
    268
    +
    
    269
    +        except asyncio.CancelledError:
    
    270
    +            pass
    
    271
    +        except BaseException as e:
    
    272
    +             print(f'__logging_worker: {e}')
    
    273
    +
    
    274
    +    async def _monitoring_worker(self, period=1.0):
    
    275
    +        """Periodically publishes state metrics to the monitoring bus."""
    
    276
    +        async def __monitoring_worker():
    
    277
    +            # Emit total clients count record:
    
    278
    +            _, record = self._query_n_clients()
    
    279
    +            await self.__monitoring_bus.send_record(record)
    
    280
    +
    
    281
    +            # Emit total bots count record:
    
    282
    +            _, record = self._query_n_bots()
    
    283
    +            await self.__monitoring_bus.send_record(record)
    
    284
    +
    
    285
    +            queue_times = []
    
    286
    +            # Emits records by instance:
    
    287
    +            for instance_name in self._instances:
    
    288
    +                # Emit instance clients count record:
    
    289
    +                _, record = self._query_n_clients_for_instance(instance_name)
    
    290
    +                await self.__monitoring_bus.send_record(record)
    
    291
    +
    
    292
    +                # Emit instance bots count record:
    
    293
    +                _, record = self._query_n_bots_for_instance(instance_name)
    
    294
    +                await self.__monitoring_bus.send_record(record)
    
    295
    +
    
    296
    +                # Emit instance average queue time record:
    
    297
    +                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
    
    298
    +                await self.__monitoring_bus.send_record(record)
    
    299
    +                if queue_time:
    
    300
    +                    queue_times.append(queue_time)
    
    301
    +
    
    302
    +            # Emit overall average queue time record:
    
    303
    +            record = monitoring_pb2.MetricRecord()
    
    304
    +            if len(queue_times) > 0:
    
    305
    +                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
    
    306
    +            else:
    
    307
    +                am_queue_time = timedelta()
    
    308
    +
    
    309
    +            record.creation_timestamp.GetCurrentTime()
    
    310
    +            record.domain = MetricRecordDomain.STATE.value
    
    311
    +            record.type = MetricRecordType.TIMER.value
    
    312
    +            record.name = 'average-queue-time'
    
    313
    +            record.duration.FromTimedelta(am_queue_time)
    
    314
    +
    
    315
    +            await self.__monitoring_bus.send_record(record)
    
    316
    +
    
    317
    +            print('---')
    
    318
    +            n_clients = self._execution_service.query_n_clients()
    
    319
    +            n_bots = self._bots_service.query_n_bots()
    
    320
    +            print('Totals: n_clients={}, n_bots={}, am_queue_time={}'
    
    321
    +                  .format(n_clients, n_bots, am_queue_time))
    
    322
    +            print('Per instances:')
    
    323
    +            for instance_name in self._instances:
    
    324
    +                n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    325
    +                n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    326
    +                am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()
    
    327
    +                instance_name = instance_name or 'empty'
    
    328
    +                print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
    
    329
    +                      .format(instance_name, n_clients, n_bots, am_queue_time))
    
    330
    +            print('---')
    
    331
    +
    
    332
    +        try:
    
    333
    +            while True:
    
    334
    +                start = time.time()
    
    335
    +                await __monitoring_worker()
    
    336
    +
    
    337
    +                end = time.time()
    
    338
    +                await asyncio.sleep(period - (end - start))
    
    339
    +
    
    340
    +        except asyncio.CancelledError:
    
    341
    +            pass
    
    342
    +        except BaseException as e:
    
    343
    +             print(f'__monitoring_worker: {e}')
    
    344
    +
    
    345
    +    # --- Private API: Monitoring ---
    
    346
    +
    
    347
    +    def _query_n_clients(self):
    
    348
    +        """Queries the number of clients connected."""
    
    349
    +        record = monitoring_pb2.MetricRecord()
    
    350
    +        n_clients = self._execution_service.query_n_clients()
    
    351
    +
    
    352
    +        record.creation_timestamp.GetCurrentTime()
    
    353
    +        record.domain = MetricRecordDomain.STATE.value
    
    354
    +        record.type = MetricRecordType.COUNTER.value
    
    355
    +        record.name = 'clients-count'
    
    356
    +        record.count = n_clients
    
    357
    +
    
    358
    +        return n_clients, record
    
    359
    +
    
    360
    +    def _query_n_clients_for_instance(self, instance_name):
    
    361
    +        """Queries the number of clients connected for a given instance"""
    
    362
    +        record = monitoring_pb2.MetricRecord()
    
    363
    +        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    364
    +
    
    365
    +        record.creation_timestamp.GetCurrentTime()
    
    366
    +        record.domain = MetricRecordDomain.STATE.value
    
    367
    +        record.type = MetricRecordType.COUNTER.value
    
    368
    +        record.name = 'clients-count'
    
    369
    +        record.count = n_clients
    
    370
    +        record.extra['instance-name'] = instance_name or 'void'
    
    371
    +
    
    372
    +        return n_clients, record
    
    373
    +
    
    374
    +    def _query_n_bots(self):
    
    375
    +        """Queries the number of bots connected."""
    
    376
    +        record = monitoring_pb2.MetricRecord()
    
    377
    +        n_bots = self._bots_service.query_n_bots()
    
    378
    +
    
    379
    +        record.creation_timestamp.GetCurrentTime()
    
    380
    +        record.domain = MetricRecordDomain.STATE.value
    
    381
    +        record.type = MetricRecordType.COUNTER.value
    
    382
    +        record.name = 'bots-count'
    
    383
    +        record.count = n_bots
    
    384
    +
    
    385
    +        return n_bots, record
    
    386
    +
    
    387
    +    def _query_n_bots_for_instance(self, instance_name):
    
    388
    +        """Queries the number of bots connected for a given instance."""
    
    389
    +        record = monitoring_pb2.MetricRecord()
    
    390
    +        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    391
    +
    
    392
    +        record.creation_timestamp.GetCurrentTime()
    
    393
    +        record.domain = MetricRecordDomain.STATE.value
    
    394
    +        record.type = MetricRecordType.COUNTER.value
    
    395
    +        record.name = 'bots-count'
    
    396
    +        record.count = n_bots
    
    397
    +        record.extra['instance-name'] = instance_name or 'void'
    
    398
    +
    
    399
    +        return n_bots, record
    
    400
    +
    
    401
    +    def _query_am_queue_time_for_instance(self, instance_name):
    
    402
    +        """Queries the average job's queue time for a given instance."""
    
    403
    +        record = monitoring_pb2.MetricRecord()
    
    404
    +        instance_scheduler = self._execution_service.get_scheduler(instance_name)
    
    405
    +        am_queue_time = instance_scheduler.query_am_queue_time()
    
    406
    +
    
    407
    +        record.creation_timestamp.GetCurrentTime()
    
    408
    +        record.domain = MetricRecordDomain.STATE.value
    
    409
    +        record.type = MetricRecordType.TIMER.value
    
    410
    +        record.name = 'average-queue-time'
    
    411
    +        record.duration.FromTimedelta(am_queue_time)
    
    412
    +        record.extra['instance-name'] = instance_name or 'void'
    
    413
    +
    
    414
    +        return am_queue_time, record

  • buildgrid/server/job.py
    ... ... @@ -13,10 +13,11 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +from datetime import datetime
    
    16 17
     import logging
    
    17 18
     import uuid
    
    18 19
     
    
    19
    -from google.protobuf import timestamp_pb2
    
    20
    +from google.protobuf import duration_pb2, timestamp_pb2
    
    20 21
     
    
    21 22
     from buildgrid._enums import LeaseState, OperationStage
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    ... ... @@ -37,6 +38,7 @@ class Job:
    37 38
             self.__execute_response = None
    
    38 39
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    39 40
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    41
    +        self.__queued_time_duration = duration_pb2.Duration()
    
    40 42
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    41 43
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    42 44
     
    
    ... ... @@ -50,6 +52,8 @@ class Job:
    50 52
             self._operation.done = False
    
    51 53
             self._n_tries = 0
    
    52 54
     
    
    55
    +    # --- Public API ---
    
    56
    +
    
    53 57
         @property
    
    54 58
         def name(self):
    
    55 59
             return self._name
    
    ... ... @@ -179,7 +183,7 @@ class Job:
    179 183
                     result.Unpack(action_result)
    
    180 184
     
    
    181 185
                 action_metadata = action_result.execution_metadata
    
    182
    -            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    186
    +            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
    
    183 187
                 action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    184 188
                 action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
    
    185 189
     
    
    ... ... @@ -204,6 +208,10 @@ class Job:
    204 208
                     self.__queued_timestamp.GetCurrentTime()
    
    205 209
                 self._n_tries += 1
    
    206 210
     
    
    211
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    212
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    213
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    214
    +
    
    207 215
             elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    208 216
                 if self.__execute_response is not None:
    
    209 217
                     self._operation.response.Pack(self.__execute_response)
    
    ... ... @@ -213,3 +221,11 @@ class Job:
    213 221
     
    
    214 222
             for queue in self._operation_update_queues:
    
    215 223
                 queue.put(self._operation)
    
    224
    +
    
    225
    +    # --- Public API: Monitoring ---
    
    226
    +
    
    227
    +    def query_queue_time(self):
    
    228
    +        return self.__queued_time_duration.ToTimedelta()
    
    229
    +
    
    230
    +    def query_n_retries(self):
    
    231
    +        return self._n_tries - 1 if self._n_tries > 0 else 0

  • buildgrid/server/operations/instance.py
    ... ... @@ -32,6 +32,10 @@ class OperationsInstance:
    32 32
     
    
    33 33
             self._scheduler = scheduler
    
    34 34
     
    
    35
    +    @property
    
    36
    +    def scheduler(self):
    
    37
    +        return self._scheduler
    
    38
    +
    
    35 39
         def register_instance_with_server(self, instance_name, server):
    
    36 40
             server.add_operations_instance(self, instance_name)
    
    37 41
     
    

  • buildgrid/server/operations/service.py
    ... ... @@ -38,8 +38,34 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    38 38
     
    
    39 39
             operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
    
    40 40
     
    
    41
    -    def add_instance(self, name, instance):
    
    42
    -        self._instances[name] = instance
    
    41
    +    # --- Public API ---
    
    42
    +
    
    43
    +    def add_instance(self, instance_name, instance):
    
    44
    +        """Registers a new servicer instance.
    
    45
    +
    
    46
    +        Args:
    
    47
    +            instance_name (str): The new instance's name.
    
    48
    +            instance (OperationsInstance): The new instance itself.
    
    49
    +        """
    
    50
    +        self._instances[instance_name] = instance
    
    51
    +
    
    52
    +    def get_scheduler(self, instance_name):
    
    53
    +        """Retrieves a reference to the scheduler for an instance.
    
    54
    +
    
    55
    +        Args:
    
    56
    +            instance_name (str): The name of the instance to query.
    
    57
    +
    
    58
    +        Returns:
    
    59
    +            Scheduler: A reference to the scheduler for `instance_name`.
    
    60
    +
    
    61
    +        Raises:
    
    62
    +            InvalidArgumentError: If no instance named `instance_name` exists.
    
    63
    +        """
    
    64
    +        instance = self._get_instance(instance_name)
    
    65
    +
    
    66
    +        return instance.scheduler
    
    67
    +
    
    68
    +    # --- Public API: Servicer ---
    
    43 69
     
    
    44 70
         def GetOperation(self, request, context):
    
    45 71
             self.__logger.debug("GetOperation request from [%s]", context.peer())
    
    ... ... @@ -132,6 +158,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    132 158
     
    
    133 159
             return Empty()
    
    134 160
     
    
    161
    +    # --- Private API ---
    
    162
    +
    
    135 163
         def _parse_instance_name(self, name):
    
    136 164
             """ If the instance name is not blank, 'name' will have the form
    
    137 165
             {instance_name}/{operation_uuid}. Otherwise, it will just be
    

  • buildgrid/server/scheduler.py
    ... ... @@ -20,24 +20,37 @@ Schedules jobs.
    20 20
     """
    
    21 21
     
    
    22 22
     from collections import deque
    
    23
    +from datetime import timedelta
    
    23 24
     import logging
    
    24 25
     
    
    26
    +from buildgrid._enums import LeaseState, OperationStage
    
    25 27
     from buildgrid._exceptions import NotFoundError
    
    26 28
     
    
    27
    -from .job import OperationStage, LeaseState
    
    28
    -
    
    29 29
     
    
    30 30
     class Scheduler:
    
    31 31
     
    
    32 32
         MAX_N_TRIES = 5
    
    33 33
     
    
    34
    -    def __init__(self, action_cache=None):
    
    34
    +    def __init__(self, action_cache=None, monitor=True):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    +        self.__queue_times_by_priority = None
    
    38
    +        self.__queue_time_average = 0, timedelta()
    
    39
    +        self.__retries_by_error = None
    
    40
    +        self.__retries_count = 0
    
    41
    +
    
    37 42
             self._action_cache = action_cache
    
    38 43
             self.jobs = {}
    
    39 44
             self.queue = deque()
    
    40 45
     
    
    46
    +        self._is_monitored = monitor
    
    47
    +
    
    48
    +        if self._is_monitored:
    
    49
    +            self.__queue_times_by_priority = {}
    
    50
    +            self.__retries_by_error = {}
    
    51
    +
    
    52
    +    # --- Public API ---
    
    53
    +
    
    41 54
         def register_client(self, job_name, queue):
    
    42 55
             self.jobs[job_name].register_client(queue)
    
    43 56
     
    
    ... ... @@ -66,18 +79,22 @@ class Scheduler:
    66 79
                 operation_stage = OperationStage.QUEUED
    
    67 80
                 self.queue.append(job)
    
    68 81
     
    
    69
    -        job.update_operation_stage(operation_stage)
    
    82
    +        self._update_job_operation_stage(job, operation_stage)
    
    70 83
     
    
    71 84
         def retry_job(self, job_name):
    
    72
    -        if job_name in self.jobs:
    
    73
    -            job = self.jobs[job_name]
    
    74
    -            if job.n_tries >= self.MAX_N_TRIES:
    
    75
    -                # TODO: Decide what to do with these jobs
    
    76
    -                job.update_operation_stage(OperationStage.COMPLETED)
    
    77
    -                # TODO: Mark these jobs as done
    
    78
    -            else:
    
    79
    -                job.update_operation_stage(OperationStage.QUEUED)
    
    80
    -                self.queue.appendleft(job)
    
    85
    +        job = self.jobs[job_name]
    
    86
    +
    
    87
    +        operation_stage = None
    
    88
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    89
    +            # TODO: Decide what to do with these jobs
    
    90
    +            operation_stage = OperationStage.COMPLETED
    
    91
    +            # TODO: Mark these jobs as done
    
    92
    +
    
    93
    +        else:
    
    94
    +            operation_stage = OperationStage.QUEUED
    
    95
    +            self.queue.appendleft(job)
    
    96
    +
    
    97
    +        self._update_job_operation_stage(job, operation_stage)
    
    81 98
     
    
    82 99
         def list_jobs(self):
    
    83 100
             return self.jobs.values()
    
    ... ... @@ -112,13 +129,14 @@ class Scheduler:
    112 129
             """
    
    113 130
             job = self.jobs[job_name]
    
    114 131
     
    
    132
    +        operation_stage = None
    
    115 133
             if lease_state == LeaseState.PENDING:
    
    116 134
                 job.update_lease_state(LeaseState.PENDING)
    
    117
    -            job.update_operation_stage(OperationStage.QUEUED)
    
    135
    +            operation_stage = OperationStage.QUEUED
    
    118 136
     
    
    119 137
             elif lease_state == LeaseState.ACTIVE:
    
    120 138
                 job.update_lease_state(LeaseState.ACTIVE)
    
    121
    -            job.update_operation_stage(OperationStage.EXECUTING)
    
    139
    +            operation_stage = OperationStage.EXECUTING
    
    122 140
     
    
    123 141
             elif lease_state == LeaseState.COMPLETED:
    
    124 142
                 job.update_lease_state(LeaseState.COMPLETED,
    
    ... ... @@ -127,7 +145,9 @@ class Scheduler:
    127 145
                 if self._action_cache is not None and not job.do_not_cache:
    
    128 146
                     self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    129 147
     
    
    130
    -            job.update_operation_stage(OperationStage.COMPLETED)
    
    148
    +            operation_stage = OperationStage.COMPLETED
    
    149
    +
    
    150
    +        self._update_job_operation_stage(job, operation_stage)
    
    131 151
     
    
    132 152
         def get_job_lease(self, job_name):
    
    133 153
             """Returns the lease associated to job, if any have been emitted yet."""
    
    ... ... @@ -136,3 +156,60 @@ class Scheduler:
    136 156
         def get_job_operation(self, job_name):
    
    137 157
             """Returns the operation associated to job."""
    
    138 158
             return self.jobs[job_name].operation
    
    159
    +
    
    160
    +    # --- Public API: Monitoring ---
    
    161
    +
    
    162
    +    @property
    
    163
    +    def is_monitored(self):
    
    164
    +        return self._is_monitored
    
    165
    +
    
    166
    +    def query_n_jobs(self):
    
    167
    +        return len(self.jobs)
    
    168
    +
    
    169
    +    def query_n_operations(self):
    
    170
    +        return len(self.jobs)
    
    171
    +
    
    172
    +    def query_n_operations_by_stage(self):
    
    173
    +        return len(self.jobs)
    
    174
    +
    
    175
    +    def query_n_leases(self):
    
    176
    +        return len(self.jobs)
    
    177
    +
    
    178
    +    def query_n_leases_by_state(self):
    
    179
    +        return len(self.jobs)
    
    180
    +
    
    181
    +    def query_n_retries(self):
    
    182
    +        return self.__retries_count
    
    183
    +
    
    184
    +    def query_n_retries_for_error(self, error_type):
    
    185
    +        try:
    
    186
    +            return self.__retries_by_error[error_type]
    
    187
    +        except KeyError:
    
    188
    +            return 0
    
    189
    +
    
    190
    +    def query_am_queue_time(self):
    
    191
    +        return self.__queue_time_average[1]
    
    192
    +
    
    193
    +    def query_am_queue_time_for_priority(self, priority_level):
    
    194
    +        try:
    
    195
    +            return self.__queue_times_by_priority[priority_level]
    
    196
    +        except KeyError:
    
    197
    +            return 0
    
    198
    +
    
    199
    +    # --- Private API ---
    
    200
    +
    
    201
    +    def _update_job_operation_stage(self, job, stage):
    
    202
    +        job.update_operation_stage(stage)
    
    203
    +
    
    204
    +        if self._is_monitored and stage == OperationStage.COMPLETED:
    
    205
    +            average_order, average_time = self.__queue_time_average
    
    206
    +
    
    207
    +            average_order += 1
    
    208
    +            if average_order <= 1:
    
    209
    +                average_time = job.query_queue_time()
    
    210
    +
    
    211
    +            else:
    
    212
    +                queue_time = job.query_queue_time()
    
    213
    +                average_time = average_time + ((queue_time - average_time) / average_order)
    
    214
    +
    
    215
    +            self.__queue_time_average = average_order, average_time

  • buildgrid/settings.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    1 16
     import hashlib
    
    2 17
     
    
    3 18
     
    
    4
    -# The hash function that CAS uses
    
    19
    +# Hash function used for computing digests:
    
    5 20
     HASH = hashlib.sha256
    
    21
    +
    
    22
    +# Length in bytes of a hash string returned by HASH:
    
    6 23
     HASH_LENGTH = HASH().digest_size * 2
    
    24
    +
    
    25
    +# Period, in seconds, for the monitoring cycle:
    
    26
    +MONITORING_PERIOD = 5.0
    
    27
    +
    
    28
    +# String format for log records:
    
    29
    +LOG_RECORD_FORMAT = '%(asctime)s:%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    30
    +# The different log record attributes are documented here:
    
    31
    +# https://docs.python.org/3/library/logging.html#logrecord-attributes

  • setup.py
    ... ... @@ -112,13 +112,15 @@ setup(
    112 112
         license="Apache License, Version 2.0",
    
    113 113
         description="A remote execution service",
    
    114 114
         packages=find_packages(),
    
    115
    +    python_requires='>= 3.5.3',  # janus requirement
    
    115 116
         install_requires=[
    
    116
    -        'protobuf',
    
    117
    -        'grpcio',
    
    118
    -        'Click',
    
    119
    -        'PyYAML',
    
    120 117
             'boto3 < 1.8.0',
    
    121 118
             'botocore < 1.11.0',
    
    119
    +        'click',
    
    120
    +        'grpcio',
    
    121
    +        'janus',
    
    122
    +        'protobuf',
    
    123
    +        'pyyaml',
    
    122 124
         ],
    
    123 125
         entry_points={
    
    124 126
             'console_scripts': [
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]