Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
- 
49586d88
by Martin Blanchard at 2018-11-30T17:19:40Z
- 
5673b009
by Martin Blanchard at 2018-11-30T17:20:19Z
- 
b763ec5f
by Martin Blanchard at 2018-11-30T17:20:19Z
- 
76459e0a
by Martin Blanchard at 2018-11-30T17:20:19Z
- 
94bf76a7
by Martin Blanchard at 2018-11-30T17:20:19Z
- 
8f5d71c5
by Martin Blanchard at 2018-11-30T17:20:19Z
- 
d0a84116
by Raoul Hidalgo Charman at 2018-12-05T10:00:46Z
- 
0d241b27
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
- 
a0ada9f4
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
- 
2f86fb10
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
17 changed files:
- .gitlab-ci.yml
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/bot/bot.py
- buildgrid/bot/interface.py
- buildgrid/bot/session.py
- buildgrid/bot/tenantmanager.py
- buildgrid/server/_monitoring.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/instance.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- tests/integration/bot_session.py
- tests/integration/bots_service.py
- tests/utils/bots_interface.py
Changes:
| ... | ... | @@ -2,7 +2,7 @@ | 
| 2 | 2 |  image: python:3.5-stretch
 | 
| 3 | 3 |  | 
| 4 | 4 |  variables:
 | 
| 5 | -  BGD: bgd --verbose
 | |
| 5 | +  BGD: bgd
 | |
| 6 | 6 |  | 
| 7 | 7 |  stages:
 | 
| 8 | 8 |    - test
 | 
| ... | ... | @@ -23,10 +23,12 @@ will be attempted to be imported. | 
| 23 | 23 |  | 
| 24 | 24 |  import logging
 | 
| 25 | 25 |  import os
 | 
| 26 | +import sys
 | |
| 26 | 27 |  | 
| 27 | 28 |  import click
 | 
| 28 | 29 |  import grpc
 | 
| 29 | 30 |  | 
| 31 | +from buildgrid.settings import LOG_RECORD_FORMAT
 | |
| 30 | 32 |  from buildgrid.utils import read_file
 | 
| 31 | 33 |  | 
| 32 | 34 |  CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
 | 
| ... | ... | @@ -138,28 +140,71 @@ class BuildGridCLI(click.MultiCommand): | 
| 138 | 140 |          return mod.cli
 | 
| 139 | 141 |  | 
| 140 | 142 |  | 
| 143 | +class DebugFilter(logging.Filter):
 | |
| 144 | + | |
| 145 | +    def __init__(self, debug_domains, name=''):
 | |
| 146 | +        super().__init__(name=name)
 | |
| 147 | +        self.__domains_tree = {}
 | |
| 148 | + | |
| 149 | +        for domain in debug_domains.split(':'):
 | |
| 150 | +            domains_tree = self.__domains_tree
 | |
| 151 | +            for label in domain.split('.'):
 | |
| 152 | +                if all(key not in domains_tree for key in [label, '*']):
 | |
| 153 | +                    domains_tree[label] = {}
 | |
| 154 | +                domains_tree = domains_tree[label]
 | |
| 155 | + | |
| 156 | +    def filter(self, record):
 | |
| 157 | +        domains_tree, last_match = self.__domains_tree, None
 | |
| 158 | +        for label in record.name.split('.'):
 | |
| 159 | +            if all(key not in domains_tree for key in [label, '*']):
 | |
| 160 | +                return False
 | |
| 161 | +            last_match = label if label in domains_tree else '*'
 | |
| 162 | +            domains_tree = domains_tree[last_match]
 | |
| 163 | +        if domains_tree and '*' not in domains_tree:
 | |
| 164 | +            return False
 | |
| 165 | +        return True
 | |
| 166 | + | |
| 167 | + | |
| 168 | +def setup_logging(verbosity=0, debug_mode=False):
 | |
| 169 | +    """Deals with loggers verbosity"""
 | |
| 170 | +    asyncio_logger = logging.getLogger('asyncio')
 | |
| 171 | +    root_logger = logging.getLogger()
 | |
| 172 | + | |
| 173 | +    log_handler = logging.StreamHandler(stream=sys.stdout)
 | |
| 174 | +    for log_filter in root_logger.filters:
 | |
| 175 | +        log_handler.addFilter(log_filter)
 | |
| 176 | + | |
| 177 | +    logging.basicConfig(format=LOG_RECORD_FORMAT, handlers=[log_handler])
 | |
| 178 | + | |
| 179 | +    if verbosity == 1:
 | |
| 180 | +        root_logger.setLevel(logging.WARNING)
 | |
| 181 | +    elif verbosity == 2:
 | |
| 182 | +        root_logger.setLevel(logging.INFO)
 | |
| 183 | +    elif verbosity >= 3:
 | |
| 184 | +        root_logger.setLevel(logging.DEBUG)
 | |
| 185 | +    else:
 | |
| 186 | +        root_logger.setLevel(logging.ERROR)
 | |
| 187 | + | |
| 188 | +    if not debug_mode:
 | |
| 189 | +        asyncio_logger.setLevel(logging.CRITICAL)
 | |
| 190 | +    else:
 | |
| 191 | +        asyncio_logger.setLevel(logging.DEBUG)
 | |
| 192 | +        root_logger.setLevel(logging.DEBUG)
 | |
| 193 | + | |
| 194 | + | |
| 141 | 195 |  @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
 | 
| 142 | -@click.option('-v', '--verbose', count=True,
 | |
| 143 | -              help='Increase log verbosity level.')
 | |
| 144 | 196 |  @pass_context
 | 
| 145 | -def cli(context, verbose):
 | |
| 197 | +def cli(context):
 | |
| 146 | 198 |      """BuildGrid App"""
 | 
| 147 | -    logger = logging.getLogger()
 | |
| 199 | +    root_logger = logging.getLogger()
 | |
| 148 | 200 |  | 
| 149 | 201 |      # Clean-up root logger for any pre-configuration:
 | 
| 150 | -    for log_handler in logger.handlers[:]:
 | |
| 151 | -        logger.removeHandler(log_handler)
 | |
| 152 | -    for log_filter in logger.filters[:]:
 | |
| 153 | -        logger.removeFilter(log_filter)
 | |
| 154 | - | |
| 155 | -    logging.basicConfig(
 | |
| 156 | -        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
 | |
| 157 | - | |
| 158 | -    if verbose == 1:
 | |
| 159 | -        logger.setLevel(logging.WARNING)
 | |
| 160 | -    elif verbose == 2:
 | |
| 161 | -        logger.setLevel(logging.INFO)
 | |
| 162 | -    elif verbose >= 3:
 | |
| 163 | -        logger.setLevel(logging.DEBUG)
 | |
| 164 | -    else:
 | |
| 165 | -        logger.setLevel(logging.ERROR) | |
| 202 | +    for log_handler in root_logger.handlers[:]:
 | |
| 203 | +        root_logger.removeHandler(log_handler)
 | |
| 204 | +    for log_filter in root_logger.filters[:]:
 | |
| 205 | +        root_logger.removeFilter(log_filter)
 | |
| 206 | + | |
| 207 | +    # Filter debug messages using BGD_MESSAGE_DEBUG value:
 | |
| 208 | +    debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
 | |
| 209 | +    if debug_domains:
 | |
| 210 | +        root_logger.addFilter(DebugFilter(debug_domains)) | 
| ... | ... | @@ -32,9 +32,8 @@ from buildgrid.bot.hardware.interface import HardwareInterface | 
| 32 | 32 |  from buildgrid.bot.hardware.device import Device
 | 
| 33 | 33 |  from buildgrid.bot.hardware.worker import Worker
 | 
| 34 | 34 |  | 
| 35 | - | |
| 36 | 35 |  from ..bots import buildbox, dummy, host
 | 
| 37 | -from ..cli import pass_context
 | |
| 36 | +from ..cli import pass_context, setup_logging
 | |
| 38 | 37 |  | 
| 39 | 38 |  | 
| 40 | 39 |  @click.group(name='bot', short_help="Create and register bot clients.")
 | 
| ... | ... | @@ -54,19 +53,21 @@ from ..cli import pass_context | 
| 54 | 53 |                help="Public CAS client certificate for TLS (PEM-encoded)")
 | 
| 55 | 54 |  @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
 | 
| 56 | 55 |                help="Public CAS server certificate for TLS (PEM-encoded)")
 | 
| 57 | -@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
 | |
| 56 | +@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
 | |
| 58 | 57 |                help="Time period for bot updates to the server in seconds.")
 | 
| 59 | 58 |  @click.option('--parent', type=click.STRING, default='main', show_default=True,
 | 
| 60 | 59 |                help="Targeted farm resource.")
 | 
| 60 | +@click.option('-v', '--verbose', count=True,
 | |
| 61 | +              help='Increase log verbosity level.')
 | |
| 61 | 62 |  @pass_context
 | 
| 62 | 63 |  def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
 | 
| 63 | -        remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
 | |
| 64 | +        remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
 | |
| 65 | +    setup_logging(verbosity=verbose)
 | |
| 64 | 66 |      # Setup the remote execution server channel:
 | 
| 65 | 67 |      url = urlparse(remote)
 | 
| 66 | 68 |  | 
| 67 | 69 |      context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | 
| 68 | 70 |      context.remote_url = remote
 | 
| 69 | -    context.update_period = update_period
 | |
| 70 | 71 |      context.parent = parent
 | 
| 71 | 72 |  | 
| 72 | 73 |      if url.scheme == 'http':
 | 
| ... | ... | @@ -124,7 +125,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ | 
| 124 | 125 |  | 
| 125 | 126 |      click.echo("Starting for remote=[{}]".format(context.remote))
 | 
| 126 | 127 |  | 
| 127 | -    bot_interface = interface.BotInterface(context.channel)
 | |
| 128 | +    bot_interface = interface.BotInterface(context.channel, update_period)
 | |
| 128 | 129 |      worker = Worker()
 | 
| 129 | 130 |      worker.add_device(Device())
 | 
| 130 | 131 |      hardware_interface = HardwareInterface(worker)
 | 
| ... | ... | @@ -142,7 +143,7 @@ def run_dummy(context): | 
| 142 | 143 |      try:
 | 
| 143 | 144 |          bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
 | 
| 144 | 145 |                                           dummy.work_dummy, context)
 | 
| 145 | -        b = bot.Bot(bot_session, context.update_period)
 | |
| 146 | +        b = bot.Bot(bot_session)
 | |
| 146 | 147 |          b.session()
 | 
| 147 | 148 |      except KeyboardInterrupt:
 | 
| 148 | 149 |          pass
 | 
| ... | ... | @@ -158,7 +159,7 @@ def run_host_tools(context): | 
| 158 | 159 |      try:
 | 
| 159 | 160 |          bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
 | 
| 160 | 161 |                                           host.work_host_tools, context)
 | 
| 161 | -        b = bot.Bot(bot_session, context.update_period)
 | |
| 162 | +        b = bot.Bot(bot_session)
 | |
| 162 | 163 |          b.session()
 | 
| 163 | 164 |      except KeyboardInterrupt:
 | 
| 164 | 165 |          pass
 | 
| ... | ... | @@ -180,7 +181,7 @@ def run_buildbox(context, local_cas, fuse_dir): | 
| 180 | 181 |      try:
 | 
| 181 | 182 |          bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
 | 
| 182 | 183 |                                           buildbox.work_buildbox, context)
 | 
| 183 | -        b = bot.Bot(bot_session, context.update_period)
 | |
| 184 | +        b = bot.Bot(bot_session)
 | |
| 184 | 185 |          b.session()
 | 
| 185 | 186 |      except KeyboardInterrupt:
 | 
| 186 | 187 |          pass | 
| ... | ... | @@ -26,7 +26,7 @@ import click | 
| 26 | 26 |  | 
| 27 | 27 |  from buildgrid.server.instance import BuildGridServer
 | 
| 28 | 28 |  | 
| 29 | -from ..cli import pass_context
 | |
| 29 | +from ..cli import pass_context, setup_logging
 | |
| 30 | 30 |  from ..settings import parser
 | 
| 31 | 31 |  | 
| 32 | 32 |  | 
| ... | ... | @@ -37,9 +37,14 @@ def cli(context): | 
| 37 | 37 |  | 
| 38 | 38 |  | 
| 39 | 39 |  @cli.command('start', short_help="Setup a new server instance.")
 | 
| 40 | -@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
 | |
| 40 | +@click.argument('CONFIG',
 | |
| 41 | +                type=click.Path(file_okay=True, dir_okay=False, writable=False))
 | |
| 42 | +@click.option('-v', '--verbose', count=True,
 | |
| 43 | +              help='Increase log verbosity level.')
 | |
| 41 | 44 |  @pass_context
 | 
| 42 | -def start(context, config):
 | |
| 45 | +def start(context, config, verbose):
 | |
| 46 | +    setup_logging(verbosity=verbose)
 | |
| 47 | + | |
| 43 | 48 |      with open(config) as f:
 | 
| 44 | 49 |          settings = parser.get_parser().safe_load(f)
 | 
| 45 | 50 |  | 
| ... | ... | @@ -20,14 +20,12 @@ import logging | 
| 20 | 20 |  class Bot:
 | 
| 21 | 21 |      """Creates a local BotSession."""
 | 
| 22 | 22 |  | 
| 23 | -    def __init__(self, bot_session, update_period=1):
 | |
| 23 | +    def __init__(self, bot_session):
 | |
| 24 | 24 |          """
 | 
| 25 | 25 |          """
 | 
| 26 | 26 |          self.__logger = logging.getLogger(__name__)
 | 
| 27 | 27 |  | 
| 28 | 28 |          self.__bot_session = bot_session
 | 
| 29 | -        self.__update_period = update_period
 | |
| 30 | - | |
| 31 | 29 |          self.__loop = None
 | 
| 32 | 30 |  | 
| 33 | 31 |      def session(self):
 | 
| ... | ... | @@ -37,7 +35,7 @@ class Bot: | 
| 37 | 35 |          self.__bot_session.create_bot_session()
 | 
| 38 | 36 |  | 
| 39 | 37 |          try:
 | 
| 40 | -            task = asyncio.ensure_future(self.__update_bot_session())
 | |
| 38 | +            task = asyncio.ensure_future(self.__bot_session.run())
 | |
| 41 | 39 |              self.__loop.run_until_complete(task)
 | 
| 42 | 40 |  | 
| 43 | 41 |          except KeyboardInterrupt:
 | 
| ... | ... | @@ -46,16 +44,6 @@ class Bot: | 
| 46 | 44 |          self.__kill_everyone()
 | 
| 47 | 45 |          self.__logger.info("Bot shutdown.")
 | 
| 48 | 46 |  | 
| 49 | -    async def __update_bot_session(self):
 | |
| 50 | -        """Calls the server periodically to inform the server the client has not died."""
 | |
| 51 | -        try:
 | |
| 52 | -            while True:
 | |
| 53 | -                self.__bot_session.update_bot_session()
 | |
| 54 | -                await asyncio.sleep(self.__update_period)
 | |
| 55 | - | |
| 56 | -        except asyncio.CancelledError:
 | |
| 57 | -            pass
 | |
| 58 | - | |
| 59 | 47 |      def __kill_everyone(self):
 | 
| 60 | 48 |          """Cancels and waits for them to stop."""
 | 
| 61 | 49 |          self.__logger.info("Cancelling remaining tasks...")
 | 
| ... | ... | @@ -24,6 +24,7 @@ import logging | 
| 24 | 24 |  import grpc
 | 
| 25 | 25 |  | 
| 26 | 26 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
 | 
| 27 | +from buildgrid.settings import NETWORK_TIMEOUT
 | |
| 27 | 28 |  | 
| 28 | 29 |  | 
| 29 | 30 |  class BotInterface:
 | 
| ... | ... | @@ -31,28 +32,32 @@ class BotInterface: | 
| 31 | 32 |      Interface handles calls to the server.
 | 
| 32 | 33 |      """
 | 
| 33 | 34 |  | 
| 34 | -    def __init__(self, channel):
 | |
| 35 | +    def __init__(self, channel, interval):
 | |
| 35 | 36 |          self.__logger = logging.getLogger(__name__)
 | 
| 36 | 37 |  | 
| 37 | 38 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 39 | +        self.__interval = interval
 | |
| 40 | + | |
| 41 | +    @property
 | |
| 42 | +    def interval(self):
 | |
| 43 | +        return self.__interval
 | |
| 38 | 44 |  | 
| 39 | 45 |      def create_bot_session(self, parent, bot_session):
 | 
| 46 | +        """ Creates a bot session returning a grpc StatusCode if it failed """
 | |
| 40 | 47 |          request = bots_pb2.CreateBotSessionRequest(parent=parent,
 | 
| 41 | 48 |                                                     bot_session=bot_session)
 | 
| 42 | -        try:
 | |
| 43 | -            return self._stub.CreateBotSession(request)
 | |
| 44 | - | |
| 45 | -        except grpc.RpcError as e:
 | |
| 46 | -            self.__logger.error(e)
 | |
| 47 | -            raise
 | |
| 49 | +        return self._bot_call(self._stub.CreateBotSession, request)
 | |
| 48 | 50 |  | 
| 49 | 51 |      def update_bot_session(self, bot_session, update_mask=None):
 | 
| 52 | +        """ Updates a bot session returning a grpc StatusCode if it failed """
 | |
| 50 | 53 |          request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
 | 
| 51 | 54 |                                                     bot_session=bot_session,
 | 
| 52 | 55 |                                                     update_mask=update_mask)
 | 
| 53 | -        try:
 | |
| 54 | -            return self._stub.UpdateBotSession(request)
 | |
| 56 | +        return self._bot_call(self._stub.UpdateBotSession, request)
 | |
| 55 | 57 |  | 
| 58 | +    def _bot_call(self, call, request):
 | |
| 59 | +        try:
 | |
| 60 | +            return call(request, timeout=self.interval + NETWORK_TIMEOUT)
 | |
| 56 | 61 |          except grpc.RpcError as e:
 | 
| 57 | -            self.__logger.error(e)
 | |
| 58 | -            raise | |
| 62 | +            self.__logger.error(e.code())
 | |
| 63 | +            return e.code() | 
| ... | ... | @@ -19,9 +19,12 @@ Bot Session | 
| 19 | 19 |  | 
| 20 | 20 |  Allows connections
 | 
| 21 | 21 |  """
 | 
| 22 | +import asyncio
 | |
| 22 | 23 |  import logging
 | 
| 23 | 24 |  import platform
 | 
| 24 | 25 |  | 
| 26 | +from grpc import StatusCode
 | |
| 27 | + | |
| 25 | 28 |  from buildgrid._enums import BotStatus, LeaseState
 | 
| 26 | 29 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 27 | 30 |  from buildgrid._protos.google.rpc import code_pb2
 | 
| ... | ... | @@ -47,6 +50,8 @@ class BotSession: | 
| 47 | 50 |          self._status = BotStatus.OK.value
 | 
| 48 | 51 |          self._tenant_manager = TenantManager()
 | 
| 49 | 52 |  | 
| 53 | +        self.connected = False
 | |
| 54 | + | |
| 50 | 55 |          self.__parent = parent
 | 
| 51 | 56 |          self.__bot_id = '{}.{}'.format(parent, platform.node())
 | 
| 52 | 57 |          self.__name = None
 | 
| ... | ... | @@ -58,10 +63,33 @@ class BotSession: | 
| 58 | 63 |      def bot_id(self):
 | 
| 59 | 64 |          return self.__bot_id
 | 
| 60 | 65 |  | 
| 66 | +    async def run(self):
 | |
| 67 | +        """ Run a bot session
 | |
| 68 | + | |
| 69 | +        This connects and reconnects via create bot session and waits on update
 | |
| 70 | +        bot session calls.
 | |
| 71 | +        """
 | |
| 72 | +        self.__logger.debug("Starting bot session")
 | |
| 73 | +        interval = self._bots_interface.interval
 | |
| 74 | +        while True:
 | |
| 75 | +            if not self.connected:
 | |
| 76 | +                self.create_bot_session()
 | |
| 77 | +            else:
 | |
| 78 | +                self.update_bot_session()
 | |
| 79 | + | |
| 80 | +            if not self.connected:
 | |
| 81 | +                await asyncio.sleep(interval)
 | |
| 82 | +            else:
 | |
| 83 | +                await self._tenant_manager.wait_on_tenants(interval)
 | |
| 84 | + | |
| 61 | 85 |      def create_bot_session(self):
 | 
| 62 | 86 |          self.__logger.debug("Creating bot session")
 | 
| 63 | 87 |  | 
| 64 | 88 |          session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
 | 
| 89 | +        if session in list(StatusCode):
 | |
| 90 | +            self.connected = False
 | |
| 91 | +            return
 | |
| 92 | +        self.connected = True
 | |
| 65 | 93 |          self.__name = session.name
 | 
| 66 | 94 |  | 
| 67 | 95 |          self.__logger.info("Created bot session with name: [%s]", self.__name)
 | 
| ... | ... | @@ -73,6 +101,13 @@ class BotSession: | 
| 73 | 101 |          self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
 | 
| 74 | 102 |  | 
| 75 | 103 |          session = self._bots_interface.update_bot_session(self.get_pb2())
 | 
| 104 | +        if session == StatusCode.DEADLINE_EXCEEDED:
 | |
| 105 | +            # try to continue to do update session if it passed the timeout
 | |
| 106 | +            return
 | |
| 107 | +        elif session in StatusCode:
 | |
| 108 | +            self.connected = False
 | |
| 109 | +            return
 | |
| 110 | +        self.connected = True
 | |
| 76 | 111 |          server_ids = []
 | 
| 77 | 112 |  | 
| 78 | 113 |          for lease in session.leases:
 | 
| ... | ... | @@ -150,6 +150,13 @@ class TenantManager: | 
| 150 | 150 |          """
 | 
| 151 | 151 |          return self._tenants[lease_id].tenant_completed
 | 
| 152 | 152 |  | 
| 153 | +    async def wait_on_tenants(self, timeout):
 | |
| 154 | +        if self._tasks:
 | |
| 155 | +            tasks = self._tasks.values()
 | |
| 156 | +            await asyncio.wait(tasks,
 | |
| 157 | +                               timeout=timeout,
 | |
| 158 | +                               return_when=asyncio.FIRST_COMPLETED)
 | |
| 159 | + | |
| 153 | 160 |      def _update_lease_result(self, lease_id, result):
 | 
| 154 | 161 |          """Updates the lease with the result."""
 | 
| 155 | 162 |          self._tenants[lease_id].update_lease_result(result)
 | 
| ... | ... | @@ -156,9 +156,11 @@ class MonitoringBus: | 
| 156 | 156 |                  output_writers.append(output_file)
 | 
| 157 | 157 |  | 
| 158 | 158 |                  while True:
 | 
| 159 | -                    if await __streaming_worker(iter(output_file)):
 | |
| 159 | +                    if await __streaming_worker([output_file]):
 | |
| 160 | 160 |                          self.__sequence_number += 1
 | 
| 161 | 161 |  | 
| 162 | +                        output_file.flush()
 | |
| 163 | + | |
| 162 | 164 |              else:
 | 
| 163 | 165 |                  output_writers.append(sys.stdout.buffer)
 | 
| 164 | 166 |  | 
| ... | ... | @@ -24,6 +24,7 @@ import logging | 
| 24 | 24 |  import uuid
 | 
| 25 | 25 |  | 
| 26 | 26 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 27 | +from buildgrid.settings import NETWORK_TIMEOUT
 | |
| 27 | 28 |  | 
| 28 | 29 |  from ..job import LeaseState
 | 
| 29 | 30 |  | 
| ... | ... | @@ -75,7 +76,7 @@ class BotsInterface: | 
| 75 | 76 |          self._request_leases(bot_session)
 | 
| 76 | 77 |          return bot_session
 | 
| 77 | 78 |  | 
| 78 | -    def update_bot_session(self, name, bot_session):
 | |
| 79 | +    def update_bot_session(self, name, bot_session, deadline=None):
 | |
| 79 | 80 |          """ Client updates the server. Any changes in state to the Lease should be
 | 
| 80 | 81 |          registered server side. Assigns available leases with work.
 | 
| 81 | 82 |          """
 | 
| ... | ... | @@ -93,14 +94,15 @@ class BotsInterface: | 
| 93 | 94 |                      pass
 | 
| 94 | 95 |                  lease.Clear()
 | 
| 95 | 96 |  | 
| 96 | -        self._request_leases(bot_session)
 | |
| 97 | +        self._request_leases(bot_session, deadline)
 | |
| 97 | 98 |          return bot_session
 | 
| 98 | 99 |  | 
| 99 | -    def _request_leases(self, bot_session):
 | |
| 100 | +    def _request_leases(self, bot_session, deadline=None):
 | |
| 100 | 101 |          # TODO: Send worker capabilities to the scheduler!
 | 
| 101 | 102 |          # Only send one lease at a time currently.
 | 
| 102 | 103 |          if not bot_session.leases:
 | 
| 103 | -            leases = self._scheduler.request_job_leases({})
 | |
| 104 | +            leases = self._scheduler.request_job_leases(
 | |
| 105 | +                {}, timeout=deadline - NETWORK_TIMEOUT if deadline else None)
 | |
| 104 | 106 |              if leases:
 | 
| 105 | 107 |                  for lease in leases:
 | 
| 106 | 108 |                      self._assigned_leases[bot_session.name].add(lease.id)
 | 
| ... | ... | @@ -138,8 +138,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 138 | 138 |              instance_name = '/'.join(names[:-1])
 | 
| 139 | 139 |  | 
| 140 | 140 |              instance = self._get_instance(instance_name)
 | 
| 141 | -            bot_session = instance.update_bot_session(request.name,
 | |
| 142 | -                                                      request.bot_session)
 | |
| 141 | +            bot_session = instance.update_bot_session(
 | |
| 142 | +                request.name,
 | |
| 143 | +                request.bot_session,
 | |
| 144 | +                deadline=context.time_remaining())
 | |
| 143 | 145 |  | 
| 144 | 146 |              if self._is_instrumented:
 | 
| 145 | 147 |                  self.__bots[bot_id].GetCurrentTime()
 | 
| ... | ... | @@ -15,26 +15,29 @@ | 
| 15 | 15 |  | 
| 16 | 16 |  import asyncio
 | 
| 17 | 17 |  from concurrent import futures
 | 
| 18 | -from datetime import timedelta
 | |
| 18 | +from datetime import datetime, timedelta
 | |
| 19 | 19 |  import logging
 | 
| 20 | +import logging.handlers
 | |
| 20 | 21 |  import os
 | 
| 21 | 22 |  import signal
 | 
| 23 | +import sys
 | |
| 22 | 24 |  import time
 | 
| 23 | 25 |  | 
| 24 | 26 |  import grpc
 | 
| 27 | +import janus
 | |
| 25 | 28 |  | 
| 26 | -from buildgrid._enums import BotStatus, MetricRecordDomain, MetricRecordType
 | |
| 29 | +from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
 | |
| 27 | 30 |  from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 | 
| 28 | 31 |  from buildgrid.server.actioncache.service import ActionCacheService
 | 
| 29 | 32 |  from buildgrid.server.bots.service import BotsService
 | 
| 33 | +from buildgrid.server.capabilities.instance import CapabilitiesInstance
 | |
| 34 | +from buildgrid.server.capabilities.service import CapabilitiesService
 | |
| 30 | 35 |  from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 | 
| 31 | 36 |  from buildgrid.server.execution.service import ExecutionService
 | 
| 32 | 37 |  from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
 | 
| 33 | 38 |  from buildgrid.server.operations.service import OperationsService
 | 
| 34 | 39 |  from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 35 | -from buildgrid.server.capabilities.instance import CapabilitiesInstance
 | |
| 36 | -from buildgrid.server.capabilities.service import CapabilitiesService
 | |
| 37 | -from buildgrid.settings import MONITORING_PERIOD
 | |
| 40 | +from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
 | |
| 38 | 41 |  | 
| 39 | 42 |  | 
| 40 | 43 |  class BuildGridServer:
 | 
| ... | ... | @@ -60,9 +63,16 @@ class BuildGridServer: | 
| 60 | 63 |          self.__grpc_server = grpc.server(self.__grpc_executor)
 | 
| 61 | 64 |  | 
| 62 | 65 |          self.__main_loop = asyncio.get_event_loop()
 | 
| 66 | + | |
| 63 | 67 |          self.__monitoring_bus = None
 | 
| 64 | 68 |  | 
| 69 | +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
 | |
| 70 | +        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
 | |
| 71 | +        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
 | |
| 72 | +        self.__print_log_records = True
 | |
| 73 | + | |
| 65 | 74 |          self.__state_monitoring_task = None
 | 
| 75 | +        self.__logging_task = None
 | |
| 66 | 76 |  | 
| 67 | 77 |          # We always want a capabilities service
 | 
| 68 | 78 |          self._capabilities_service = CapabilitiesService(self.__grpc_server)
 | 
| ... | ... | @@ -85,6 +95,17 @@ class BuildGridServer: | 
| 85 | 95 |                  self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
 | 
| 86 | 96 |                  serialisation_format=MonitoringOutputFormat.JSON)
 | 
| 87 | 97 |  | 
| 98 | +        # Setup the main logging handler:
 | |
| 99 | +        root_logger = logging.getLogger()
 | |
| 100 | + | |
| 101 | +        for log_filter in root_logger.filters[:]:
 | |
| 102 | +            self.__logging_handler.addFilter(log_filter)
 | |
| 103 | +            root_logger.removeFilter(log_filter)
 | |
| 104 | + | |
| 105 | +        for log_handler in root_logger.handlers[:]:
 | |
| 106 | +            root_logger.removeHandler(log_handler)
 | |
| 107 | +        root_logger.addHandler(self.__logging_handler)
 | |
| 108 | + | |
| 88 | 109 |      # --- Public API ---
 | 
| 89 | 110 |  | 
| 90 | 111 |      def start(self):
 | 
| ... | ... | @@ -98,6 +119,9 @@ class BuildGridServer: | 
| 98 | 119 |                  self._state_monitoring_worker(period=MONITORING_PERIOD),
 | 
| 99 | 120 |                  loop=self.__main_loop)
 | 
| 100 | 121 |  | 
| 122 | +        self.__logging_task = asyncio.ensure_future(
 | |
| 123 | +            self._logging_worker(), loop=self.__main_loop)
 | |
| 124 | + | |
| 101 | 125 |          self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
 | 
| 102 | 126 |  | 
| 103 | 127 |          self.__main_loop.run_forever()
 | 
| ... | ... | @@ -110,6 +134,9 @@ class BuildGridServer: | 
| 110 | 134 |  | 
| 111 | 135 |              self.__monitoring_bus.stop()
 | 
| 112 | 136 |  | 
| 137 | +        if self.__logging_task is not None:
 | |
| 138 | +            self.__logging_task.cancel()
 | |
| 139 | + | |
| 113 | 140 |          self.__main_loop.stop()
 | 
| 114 | 141 |  | 
| 115 | 142 |          self.__grpc_server.stop(None)
 | 
| ... | ... | @@ -278,6 +305,53 @@ class BuildGridServer: | 
| 278 | 305 |                                                           execution_instance)
 | 
| 279 | 306 |              self._capabilities_service.add_instance(instance_name, capabilities_instance)
 | 
| 280 | 307 |  | 
| 308 | +    async def _logging_worker(self):
 | |
| 309 | +        """Publishes log records to the monitoring bus."""
 | |
| 310 | +        async def __logging_worker():
 | |
| 311 | +            log_record = await self.__logging_queue.async_q.get()
 | |
| 312 | + | |
| 313 | +            # Print log records to stdout, if required:
 | |
| 314 | +            if self.__print_log_records:
 | |
| 315 | +                record = self.__logging_formatter.format(log_record)
 | |
| 316 | + | |
| 317 | +                # TODO: Investigate if async write would be worth here.
 | |
| 318 | +                sys.stdout.write('{}\n'.format(record))
 | |
| 319 | +                sys.stdout.flush()
 | |
| 320 | + | |
| 321 | +            # Emit a log record if server is instrumented:
 | |
| 322 | +            if self._is_instrumented:
 | |
| 323 | +                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
 | |
| 324 | +                log_record_creation_time = datetime.fromtimestamp(log_record.created)
 | |
| 325 | +                # logging.LogRecord.extra must be a str to str dict:
 | |
| 326 | +                if 'extra' in log_record.__dict__ and log_record.extra:
 | |
| 327 | +                    log_record_metadata = log_record.extra
 | |
| 328 | +                else:
 | |
| 329 | +                    log_record_metadata = None
 | |
| 330 | +                record = self._forge_log_record(
 | |
| 331 | +                    log_record.name, log_record_level, log_record.message,
 | |
| 332 | +                    log_record_creation_time, metadata=log_record_metadata)
 | |
| 333 | + | |
| 334 | +                await self.__monitoring_bus.send_record(record)
 | |
| 335 | + | |
| 336 | +        try:
 | |
| 337 | +            while True:
 | |
| 338 | +                await __logging_worker()
 | |
| 339 | + | |
| 340 | +        except asyncio.CancelledError:
 | |
| 341 | +            pass
 | |
| 342 | + | |
| 343 | +    def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
 | |
| 344 | +        log_record = monitoring_pb2.LogRecord()
 | |
| 345 | + | |
| 346 | +        log_record.creation_timestamp.FromDatetime(creation_time)
 | |
| 347 | +        log_record.domain = domain
 | |
| 348 | +        log_record.level = level.value
 | |
| 349 | +        log_record.message = message
 | |
| 350 | +        if metadata is not None:
 | |
| 351 | +            log_record.metadata.update(metadata)
 | |
| 352 | + | |
| 353 | +        return log_record
 | |
| 354 | + | |
| 281 | 355 |      async def _state_monitoring_worker(self, period=1.0):
 | 
| 282 | 356 |          """Periodically publishes state metrics to the monitoring bus."""
 | 
| 283 | 357 |          async def __state_monitoring_worker():
 | 
| ... | ... | @@ -19,12 +19,13 @@ Scheduler | 
| 19 | 19 |  Schedules jobs.
 | 
| 20 | 20 |  """
 | 
| 21 | 21 |  | 
| 22 | -from collections import deque
 | |
| 23 | 22 |  from datetime import timedelta
 | 
| 24 | 23 |  import logging
 | 
| 24 | +from queue import Queue, Empty
 | |
| 25 | 25 |  | 
| 26 | 26 |  from buildgrid._enums import LeaseState, OperationStage
 | 
| 27 | 27 |  from buildgrid._exceptions import NotFoundError
 | 
| 28 | +from buildgrid.settings import MAX_JOB_BLOCK_TIME
 | |
| 28 | 29 |  | 
| 29 | 30 |  | 
| 30 | 31 |  class Scheduler:
 | 
| ... | ... | @@ -41,7 +42,7 @@ class Scheduler: | 
| 41 | 42 |  | 
| 42 | 43 |          self._action_cache = action_cache
 | 
| 43 | 44 |          self.jobs = {}
 | 
| 44 | -        self.queue = deque()
 | |
| 45 | +        self.queue = Queue()
 | |
| 45 | 46 |  | 
| 46 | 47 |          self._is_instrumented = monitor
 | 
| 47 | 48 |  | 
| ... | ... | @@ -93,7 +94,7 @@ class Scheduler: | 
| 93 | 94 |                  action_result = self._action_cache.get_action_result(job.action_digest)
 | 
| 94 | 95 |              except NotFoundError:
 | 
| 95 | 96 |                  operation_stage = OperationStage.QUEUED
 | 
| 96 | -                self.queue.append(job)
 | |
| 97 | +                self.queue.put(job)
 | |
| 97 | 98 |  | 
| 98 | 99 |              else:
 | 
| 99 | 100 |                  job.set_cached_result(action_result)
 | 
| ... | ... | @@ -104,7 +105,7 @@ class Scheduler: | 
| 104 | 105 |  | 
| 105 | 106 |          else:
 | 
| 106 | 107 |              operation_stage = OperationStage.QUEUED
 | 
| 107 | -            self.queue.append(job)
 | |
| 108 | +            self.queue.put(job)
 | |
| 108 | 109 |  | 
| 109 | 110 |          self._update_job_operation_stage(job.name, operation_stage)
 | 
| 110 | 111 |  | 
| ... | ... | @@ -120,25 +121,35 @@ class Scheduler: | 
| 120 | 121 |          else:
 | 
| 121 | 122 |              operation_stage = OperationStage.QUEUED
 | 
| 122 | 123 |              job.update_lease_state(LeaseState.PENDING)
 | 
| 123 | -            self.queue.append(job)
 | |
| 124 | +            self.queue.put(job)
 | |
| 124 | 125 |  | 
| 125 | 126 |          self._update_job_operation_stage(job_name, operation_stage)
 | 
| 126 | 127 |  | 
| 127 | 128 |      def list_jobs(self):
 | 
| 128 | 129 |          return self.jobs.values()
 | 
| 129 | 130 |  | 
| 130 | -    def request_job_leases(self, worker_capabilities):
 | |
| 131 | +    def request_job_leases(self, worker_capabilities, timeout=None):
 | |
| 131 | 132 |          """Generates a list of the highest priority leases to be run.
 | 
| 132 | 133 |  | 
| 133 | 134 |          Args:
 | 
| 134 | 135 |              worker_capabilities (dict): a set of key-value pairs describing the
 | 
| 135 | 136 |                  worker properties, configuration and state at the time of the
 | 
| 136 | 137 |                  request.
 | 
| 138 | +            timeout (int): seconds to block waiting on the job queue, capped
 | |
| 139 | +                at MAX_JOB_BLOCK_TIME
 | |
| 137 | 140 |          """
 | 
| 138 | -        if not self.queue:
 | |
| 141 | +        if not timeout and self.queue.empty():
 | |
| 139 | 142 |              return []
 | 
| 140 | 143 |  | 
| 141 | -        job = self.queue.popleft()
 | |
| 144 | +        # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
 | |
| 145 | +        if timeout:
 | |
| 146 | +            timeout = timeout if timeout < MAX_JOB_BLOCK_TIME else MAX_JOB_BLOCK_TIME
 | |
| 147 | +        try:
 | |
| 148 | +            job = (self.queue.get(block=True, timeout=timeout)
 | |
| 149 | +                   if timeout
 | |
| 150 | +                   else self.queue.get(block=False))
 | |
| 151 | +        except Empty:
 | |
| 152 | +            return []
 | |
| 142 | 153 |  | 
| 143 | 154 |          lease = job.lease
 | 
| 144 | 155 |  | 
| ... | ... | @@ -30,3 +30,15 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024 | 
| 30 | 30 |  | 
| 31 | 31 |  # Maximum number of elements per gRPC request:
 | 
| 32 | 32 |  MAX_REQUEST_COUNT = 500
 | 
| 33 | + | |
| 34 | +# String format for log records:
 | |
| 35 | +LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
 | |
| 36 | +# The different log record attributes are documented here:
 | |
| 37 | +# https://docs.python.org/3/library/logging.html#logrecord-attributes
 | |
| 38 | + | |
| 39 | +# time in seconds to pad timeouts
 | |
| 40 | +NETWORK_TIMEOUT = 5
 | |
| 41 | + | |
| 42 | +# Hard limit for waiting on jobs; without it, an unset gRPC timeout would
 | |
| 43 | +# default the wait interval to the max int64 value
 | |
| 44 | +MAX_JOB_BLOCK_TIME = 300 | 
| ... | ... | @@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess | 
| 30 | 30 |  from ..utils.bots_interface import serve_bots_interface
 | 
| 31 | 31 |  | 
| 32 | 32 |  | 
| 33 | +TIMEOUT = 5
 | |
| 33 | 34 |  INSTANCES = ['', 'instance']
 | 
| 34 | 35 |  | 
| 35 | 36 |  | 
| ... | ... | @@ -48,7 +49,7 @@ class ServerInterface: | 
| 48 | 49 |              bot_session = bots_pb2.BotSession()
 | 
| 49 | 50 |              bot_session.ParseFromString(string_bot_session)
 | 
| 50 | 51 |  | 
| 51 | -            interface = BotInterface(grpc.insecure_channel(remote))
 | |
| 52 | +            interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
 | |
| 52 | 53 |  | 
| 53 | 54 |              result = interface.create_bot_session(parent, bot_session)
 | 
| 54 | 55 |              queue.put(result.SerializeToString())
 | 
| ... | ... | @@ -67,7 +68,7 @@ class ServerInterface: | 
| 67 | 68 |              bot_session = bots_pb2.BotSession()
 | 
| 68 | 69 |              bot_session.ParseFromString(string_bot_session)
 | 
| 69 | 70 |  | 
| 70 | -            interface = BotInterface(grpc.insecure_channel(remote))
 | |
| 71 | +            interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
 | |
| 71 | 72 |  | 
| 72 | 73 |              result = interface.update_bot_session(bot_session, update_mask)
 | 
| 73 | 74 |              queue.put(result.SerializeToString())
 | 
| ... | ... | @@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server) | 
| 38 | 38 |  # GRPC context
 | 
| 39 | 39 |  @pytest.fixture
 | 
| 40 | 40 |  def context():
 | 
| 41 | -    yield mock.MagicMock(spec=_Context)
 | |
| 41 | +    context_mock = mock.MagicMock(spec=_Context)
 | |
| 42 | +    context_mock.time_remaining.return_value = None
 | |
| 43 | +    yield context_mock
 | |
| 42 | 44 |  | 
| 43 | 45 |  | 
| 44 | 46 |  @pytest.fixture
 | 
| ... | ... | @@ -123,7 +123,7 @@ class BotsInterface: | 
| 123 | 123 |          self.__bot_session_queue.put(bot_session.SerializeToString())
 | 
| 124 | 124 |          return bot_session
 | 
| 125 | 125 |  | 
| 126 | -    def update_bot_session(self, name, bot_session):
 | |
| 126 | +    def update_bot_session(self, name, bot_session, deadline=None):
 | |
| 127 | 127 |          for lease in bot_session.leases:
 | 
| 128 | 128 |              state = LeaseState(lease.state)
 | 
| 129 | 129 |              if state == LeaseState.COMPLETED:
 | 
