Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
-
49586d88
by Martin Blanchard at 2018-11-30T17:19:40Z
-
5673b009
by Martin Blanchard at 2018-11-30T17:20:19Z
-
b763ec5f
by Martin Blanchard at 2018-11-30T17:20:19Z
-
76459e0a
by Martin Blanchard at 2018-11-30T17:20:19Z
-
94bf76a7
by Martin Blanchard at 2018-11-30T17:20:19Z
-
8f5d71c5
by Martin Blanchard at 2018-11-30T17:20:19Z
-
d0a84116
by Raoul Hidalgo Charman at 2018-12-05T10:00:46Z
-
0d241b27
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
-
a0ada9f4
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
-
2f86fb10
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
17 changed files:
- .gitlab-ci.yml
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/bot/bot.py
- buildgrid/bot/interface.py
- buildgrid/bot/session.py
- buildgrid/bot/tenantmanager.py
- buildgrid/server/_monitoring.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/instance.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- tests/integration/bot_session.py
- tests/integration/bots_service.py
- tests/utils/bots_interface.py
Changes:
| ... | ... | @@ -2,7 +2,7 @@ |
| 2 | 2 |
image: python:3.5-stretch
|
| 3 | 3 |
|
| 4 | 4 |
variables:
|
| 5 |
- BGD: bgd --verbose
|
|
| 5 |
+ BGD: bgd
|
|
| 6 | 6 |
|
| 7 | 7 |
stages:
|
| 8 | 8 |
- test
|
| ... | ... | @@ -23,10 +23,12 @@ will be attempted to be imported. |
| 23 | 23 |
|
| 24 | 24 |
import logging
|
| 25 | 25 |
import os
|
| 26 |
+import sys
|
|
| 26 | 27 |
|
| 27 | 28 |
import click
|
| 28 | 29 |
import grpc
|
| 29 | 30 |
|
| 31 |
+from buildgrid.settings import LOG_RECORD_FORMAT
|
|
| 30 | 32 |
from buildgrid.utils import read_file
|
| 31 | 33 |
|
| 32 | 34 |
CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
|
| ... | ... | @@ -138,28 +140,71 @@ class BuildGridCLI(click.MultiCommand): |
| 138 | 140 |
return mod.cli
|
| 139 | 141 |
|
| 140 | 142 |
|
| 143 |
+class DebugFilter(logging.Filter):
|
|
| 144 |
+ |
|
| 145 |
+ def __init__(self, debug_domains, name=''):
|
|
| 146 |
+ super().__init__(name=name)
|
|
| 147 |
+ self.__domains_tree = {}
|
|
| 148 |
+ |
|
| 149 |
+ for domain in debug_domains.split(':'):
|
|
| 150 |
+ domains_tree = self.__domains_tree
|
|
| 151 |
+ for label in domain.split('.'):
|
|
| 152 |
+ if all(key not in domains_tree for key in [label, '*']):
|
|
| 153 |
+ domains_tree[label] = {}
|
|
| 154 |
+ domains_tree = domains_tree[label]
|
|
| 155 |
+ |
|
| 156 |
+ def filter(self, record):
|
|
| 157 |
+ domains_tree, last_match = self.__domains_tree, None
|
|
| 158 |
+ for label in record.name.split('.'):
|
|
| 159 |
+ if all(key not in domains_tree for key in [label, '*']):
|
|
| 160 |
+ return False
|
|
| 161 |
+ last_match = label if label in domains_tree else '*'
|
|
| 162 |
+ domains_tree = domains_tree[last_match]
|
|
| 163 |
+ if domains_tree and '*' not in domains_tree:
|
|
| 164 |
+ return False
|
|
| 165 |
+ return True
|
|
| 166 |
+ |
|
| 167 |
+ |
|
| 168 |
+def setup_logging(verbosity=0, debug_mode=False):
|
|
| 169 |
+ """Deals with loggers verbosity"""
|
|
| 170 |
+ asyncio_logger = logging.getLogger('asyncio')
|
|
| 171 |
+ root_logger = logging.getLogger()
|
|
| 172 |
+ |
|
| 173 |
+ log_handler = logging.StreamHandler(stream=sys.stdout)
|
|
| 174 |
+ for log_filter in root_logger.filters:
|
|
| 175 |
+ log_handler.addFilter(log_filter)
|
|
| 176 |
+ |
|
| 177 |
+ logging.basicConfig(format=LOG_RECORD_FORMAT, handlers=[log_handler])
|
|
| 178 |
+ |
|
| 179 |
+ if verbosity == 1:
|
|
| 180 |
+ root_logger.setLevel(logging.WARNING)
|
|
| 181 |
+ elif verbosity == 2:
|
|
| 182 |
+ root_logger.setLevel(logging.INFO)
|
|
| 183 |
+ elif verbosity >= 3:
|
|
| 184 |
+ root_logger.setLevel(logging.DEBUG)
|
|
| 185 |
+ else:
|
|
| 186 |
+ root_logger.setLevel(logging.ERROR)
|
|
| 187 |
+ |
|
| 188 |
+ if not debug_mode:
|
|
| 189 |
+ asyncio_logger.setLevel(logging.CRITICAL)
|
|
| 190 |
+ else:
|
|
| 191 |
+ asyncio_logger.setLevel(logging.DEBUG)
|
|
| 192 |
+ root_logger.setLevel(logging.DEBUG)
|
|
| 193 |
+ |
|
| 194 |
+ |
|
| 141 | 195 |
@click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
|
| 142 |
-@click.option('-v', '--verbose', count=True,
|
|
| 143 |
- help='Increase log verbosity level.')
|
|
| 144 | 196 |
@pass_context
|
| 145 |
-def cli(context, verbose):
|
|
| 197 |
+def cli(context):
|
|
| 146 | 198 |
"""BuildGrid App"""
|
| 147 |
- logger = logging.getLogger()
|
|
| 199 |
+ root_logger = logging.getLogger()
|
|
| 148 | 200 |
|
| 149 | 201 |
# Clean-up root logger for any pre-configuration:
|
| 150 |
- for log_handler in logger.handlers[:]:
|
|
| 151 |
- logger.removeHandler(log_handler)
|
|
| 152 |
- for log_filter in logger.filters[:]:
|
|
| 153 |
- logger.removeFilter(log_filter)
|
|
| 154 |
- |
|
| 155 |
- logging.basicConfig(
|
|
| 156 |
- format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
|
|
| 157 |
- |
|
| 158 |
- if verbose == 1:
|
|
| 159 |
- logger.setLevel(logging.WARNING)
|
|
| 160 |
- elif verbose == 2:
|
|
| 161 |
- logger.setLevel(logging.INFO)
|
|
| 162 |
- elif verbose >= 3:
|
|
| 163 |
- logger.setLevel(logging.DEBUG)
|
|
| 164 |
- else:
|
|
| 165 |
- logger.setLevel(logging.ERROR)
|
|
| 202 |
+ for log_handler in root_logger.handlers[:]:
|
|
| 203 |
+ root_logger.removeHandler(log_handler)
|
|
| 204 |
+ for log_filter in root_logger.filters[:]:
|
|
| 205 |
+ root_logger.removeFilter(log_filter)
|
|
| 206 |
+ |
|
| 207 |
+ # Filter debug messages using BGD_MESSAGE_DEBUG value:
|
|
| 208 |
+ debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
|
|
| 209 |
+ if debug_domains:
|
|
| 210 |
+ root_logger.addFilter(DebugFilter(debug_domains))
|
| ... | ... | @@ -32,9 +32,8 @@ from buildgrid.bot.hardware.interface import HardwareInterface |
| 32 | 32 |
from buildgrid.bot.hardware.device import Device
|
| 33 | 33 |
from buildgrid.bot.hardware.worker import Worker
|
| 34 | 34 |
|
| 35 |
- |
|
| 36 | 35 |
from ..bots import buildbox, dummy, host
|
| 37 |
-from ..cli import pass_context
|
|
| 36 |
+from ..cli import pass_context, setup_logging
|
|
| 38 | 37 |
|
| 39 | 38 |
|
| 40 | 39 |
@click.group(name='bot', short_help="Create and register bot clients.")
|
| ... | ... | @@ -54,19 +53,21 @@ from ..cli import pass_context |
| 54 | 53 |
help="Public CAS client certificate for TLS (PEM-encoded)")
|
| 55 | 54 |
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
| 56 | 55 |
help="Public CAS server certificate for TLS (PEM-encoded)")
|
| 57 |
-@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
|
|
| 56 |
+@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
|
|
| 58 | 57 |
help="Time period for bot updates to the server in seconds.")
|
| 59 | 58 |
@click.option('--parent', type=click.STRING, default='main', show_default=True,
|
| 60 | 59 |
help="Targeted farm resource.")
|
| 60 |
+@click.option('-v', '--verbose', count=True,
|
|
| 61 |
+ help='Increase log verbosity level.')
|
|
| 61 | 62 |
@pass_context
|
| 62 | 63 |
def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
|
| 63 |
- remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
|
|
| 64 |
+ remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
|
|
| 65 |
+ setup_logging(verbosity=verbose)
|
|
| 64 | 66 |
# Setup the remote execution server channel:
|
| 65 | 67 |
url = urlparse(remote)
|
| 66 | 68 |
|
| 67 | 69 |
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
| 68 | 70 |
context.remote_url = remote
|
| 69 |
- context.update_period = update_period
|
|
| 70 | 71 |
context.parent = parent
|
| 71 | 72 |
|
| 72 | 73 |
if url.scheme == 'http':
|
| ... | ... | @@ -124,7 +125,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
| 124 | 125 |
|
| 125 | 126 |
click.echo("Starting for remote=[{}]".format(context.remote))
|
| 126 | 127 |
|
| 127 |
- bot_interface = interface.BotInterface(context.channel)
|
|
| 128 |
+ bot_interface = interface.BotInterface(context.channel, update_period)
|
|
| 128 | 129 |
worker = Worker()
|
| 129 | 130 |
worker.add_device(Device())
|
| 130 | 131 |
hardware_interface = HardwareInterface(worker)
|
| ... | ... | @@ -142,7 +143,7 @@ def run_dummy(context): |
| 142 | 143 |
try:
|
| 143 | 144 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
| 144 | 145 |
dummy.work_dummy, context)
|
| 145 |
- b = bot.Bot(bot_session, context.update_period)
|
|
| 146 |
+ b = bot.Bot(bot_session)
|
|
| 146 | 147 |
b.session()
|
| 147 | 148 |
except KeyboardInterrupt:
|
| 148 | 149 |
pass
|
| ... | ... | @@ -158,7 +159,7 @@ def run_host_tools(context): |
| 158 | 159 |
try:
|
| 159 | 160 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
| 160 | 161 |
host.work_host_tools, context)
|
| 161 |
- b = bot.Bot(bot_session, context.update_period)
|
|
| 162 |
+ b = bot.Bot(bot_session)
|
|
| 162 | 163 |
b.session()
|
| 163 | 164 |
except KeyboardInterrupt:
|
| 164 | 165 |
pass
|
| ... | ... | @@ -180,7 +181,7 @@ def run_buildbox(context, local_cas, fuse_dir): |
| 180 | 181 |
try:
|
| 181 | 182 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
| 182 | 183 |
buildbox.work_buildbox, context)
|
| 183 |
- b = bot.Bot(bot_session, context.update_period)
|
|
| 184 |
+ b = bot.Bot(bot_session)
|
|
| 184 | 185 |
b.session()
|
| 185 | 186 |
except KeyboardInterrupt:
|
| 186 | 187 |
pass
|
| ... | ... | @@ -26,7 +26,7 @@ import click |
| 26 | 26 |
|
| 27 | 27 |
from buildgrid.server.instance import BuildGridServer
|
| 28 | 28 |
|
| 29 |
-from ..cli import pass_context
|
|
| 29 |
+from ..cli import pass_context, setup_logging
|
|
| 30 | 30 |
from ..settings import parser
|
| 31 | 31 |
|
| 32 | 32 |
|
| ... | ... | @@ -37,9 +37,14 @@ def cli(context): |
| 37 | 37 |
|
| 38 | 38 |
|
| 39 | 39 |
@cli.command('start', short_help="Setup a new server instance.")
|
| 40 |
-@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
|
|
| 40 |
+@click.argument('CONFIG',
|
|
| 41 |
+ type=click.Path(file_okay=True, dir_okay=False, writable=False))
|
|
| 42 |
+@click.option('-v', '--verbose', count=True,
|
|
| 43 |
+ help='Increase log verbosity level.')
|
|
| 41 | 44 |
@pass_context
|
| 42 |
-def start(context, config):
|
|
| 45 |
+def start(context, config, verbose):
|
|
| 46 |
+ setup_logging(verbosity=verbose)
|
|
| 47 |
+ |
|
| 43 | 48 |
with open(config) as f:
|
| 44 | 49 |
settings = parser.get_parser().safe_load(f)
|
| 45 | 50 |
|
| ... | ... | @@ -20,14 +20,12 @@ import logging |
| 20 | 20 |
class Bot:
|
| 21 | 21 |
"""Creates a local BotSession."""
|
| 22 | 22 |
|
| 23 |
- def __init__(self, bot_session, update_period=1):
|
|
| 23 |
+ def __init__(self, bot_session):
|
|
| 24 | 24 |
"""
|
| 25 | 25 |
"""
|
| 26 | 26 |
self.__logger = logging.getLogger(__name__)
|
| 27 | 27 |
|
| 28 | 28 |
self.__bot_session = bot_session
|
| 29 |
- self.__update_period = update_period
|
|
| 30 |
- |
|
| 31 | 29 |
self.__loop = None
|
| 32 | 30 |
|
| 33 | 31 |
def session(self):
|
| ... | ... | @@ -37,7 +35,7 @@ class Bot: |
| 37 | 35 |
self.__bot_session.create_bot_session()
|
| 38 | 36 |
|
| 39 | 37 |
try:
|
| 40 |
- task = asyncio.ensure_future(self.__update_bot_session())
|
|
| 38 |
+ task = asyncio.ensure_future(self.__bot_session.run())
|
|
| 41 | 39 |
self.__loop.run_until_complete(task)
|
| 42 | 40 |
|
| 43 | 41 |
except KeyboardInterrupt:
|
| ... | ... | @@ -46,16 +44,6 @@ class Bot: |
| 46 | 44 |
self.__kill_everyone()
|
| 47 | 45 |
self.__logger.info("Bot shutdown.")
|
| 48 | 46 |
|
| 49 |
- async def __update_bot_session(self):
|
|
| 50 |
- """Calls the server periodically to inform the server the client has not died."""
|
|
| 51 |
- try:
|
|
| 52 |
- while True:
|
|
| 53 |
- self.__bot_session.update_bot_session()
|
|
| 54 |
- await asyncio.sleep(self.__update_period)
|
|
| 55 |
- |
|
| 56 |
- except asyncio.CancelledError:
|
|
| 57 |
- pass
|
|
| 58 |
- |
|
| 59 | 47 |
def __kill_everyone(self):
|
| 60 | 48 |
"""Cancels and waits for them to stop."""
|
| 61 | 49 |
self.__logger.info("Cancelling remaining tasks...")
|
| ... | ... | @@ -24,6 +24,7 @@ import logging |
| 24 | 24 |
import grpc
|
| 25 | 25 |
|
| 26 | 26 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
|
| 27 |
+from buildgrid.settings import NETWORK_TIMEOUT
|
|
| 27 | 28 |
|
| 28 | 29 |
|
| 29 | 30 |
class BotInterface:
|
| ... | ... | @@ -31,28 +32,32 @@ class BotInterface: |
| 31 | 32 |
Interface handles calls to the server.
|
| 32 | 33 |
"""
|
| 33 | 34 |
|
| 34 |
- def __init__(self, channel):
|
|
| 35 |
+ def __init__(self, channel, interval):
|
|
| 35 | 36 |
self.__logger = logging.getLogger(__name__)
|
| 36 | 37 |
|
| 37 | 38 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
| 39 |
+ self.__interval = interval
|
|
| 40 |
+ |
|
| 41 |
+ @property
|
|
| 42 |
+ def interval(self):
|
|
| 43 |
+ return self.__interval
|
|
| 38 | 44 |
|
| 39 | 45 |
def create_bot_session(self, parent, bot_session):
|
| 46 |
+ """ Creates a bot session returning a grpc StatusCode if it failed """
|
|
| 40 | 47 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
| 41 | 48 |
bot_session=bot_session)
|
| 42 |
- try:
|
|
| 43 |
- return self._stub.CreateBotSession(request)
|
|
| 44 |
- |
|
| 45 |
- except grpc.RpcError as e:
|
|
| 46 |
- self.__logger.error(e)
|
|
| 47 |
- raise
|
|
| 49 |
+ return self._bot_call(self._stub.CreateBotSession, request)
|
|
| 48 | 50 |
|
| 49 | 51 |
def update_bot_session(self, bot_session, update_mask=None):
|
| 52 |
+ """ Updates a bot session returning a grpc StatusCode if it failed """
|
|
| 50 | 53 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
| 51 | 54 |
bot_session=bot_session,
|
| 52 | 55 |
update_mask=update_mask)
|
| 53 |
- try:
|
|
| 54 |
- return self._stub.UpdateBotSession(request)
|
|
| 56 |
+ return self._bot_call(self._stub.UpdateBotSession, request)
|
|
| 55 | 57 |
|
| 58 |
+ def _bot_call(self, call, request):
|
|
| 59 |
+ try:
|
|
| 60 |
+ return call(request, timeout=self.interval + NETWORK_TIMEOUT)
|
|
| 56 | 61 |
except grpc.RpcError as e:
|
| 57 |
- self.__logger.error(e)
|
|
| 58 |
- raise
|
|
| 62 |
+ self.__logger.error(e.code())
|
|
| 63 |
+ return e.code()
|
| ... | ... | @@ -19,9 +19,12 @@ Bot Session |
| 19 | 19 |
|
| 20 | 20 |
Allows connections
|
| 21 | 21 |
"""
|
| 22 |
+import asyncio
|
|
| 22 | 23 |
import logging
|
| 23 | 24 |
import platform
|
| 24 | 25 |
|
| 26 |
+from grpc import StatusCode
|
|
| 27 |
+ |
|
| 25 | 28 |
from buildgrid._enums import BotStatus, LeaseState
|
| 26 | 29 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
| 27 | 30 |
from buildgrid._protos.google.rpc import code_pb2
|
| ... | ... | @@ -47,6 +50,8 @@ class BotSession: |
| 47 | 50 |
self._status = BotStatus.OK.value
|
| 48 | 51 |
self._tenant_manager = TenantManager()
|
| 49 | 52 |
|
| 53 |
+ self.connected = False
|
|
| 54 |
+ |
|
| 50 | 55 |
self.__parent = parent
|
| 51 | 56 |
self.__bot_id = '{}.{}'.format(parent, platform.node())
|
| 52 | 57 |
self.__name = None
|
| ... | ... | @@ -58,10 +63,33 @@ class BotSession: |
| 58 | 63 |
def bot_id(self):
|
| 59 | 64 |
return self.__bot_id
|
| 60 | 65 |
|
| 66 |
+ async def run(self):
|
|
| 67 |
+ """ Run a bot session
|
|
| 68 |
+ |
|
| 69 |
+ This connects and reconnects via create bot session and waits on update
|
|
| 70 |
+ bot session calls.
|
|
| 71 |
+ """
|
|
| 72 |
+ self.__logger.debug("Starting bot session")
|
|
| 73 |
+ interval = self._bots_interface.interval
|
|
| 74 |
+ while True:
|
|
| 75 |
+ if not self.connected:
|
|
| 76 |
+ self.create_bot_session()
|
|
| 77 |
+ else:
|
|
| 78 |
+ self.update_bot_session()
|
|
| 79 |
+ |
|
| 80 |
+ if not self.connected:
|
|
| 81 |
+ await asyncio.sleep(interval)
|
|
| 82 |
+ else:
|
|
| 83 |
+ await self._tenant_manager.wait_on_tenants(interval)
|
|
| 84 |
+ |
|
| 61 | 85 |
def create_bot_session(self):
|
| 62 | 86 |
self.__logger.debug("Creating bot session")
|
| 63 | 87 |
|
| 64 | 88 |
session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
|
| 89 |
+ if session in list(StatusCode):
|
|
| 90 |
+ self.connected = False
|
|
| 91 |
+ return
|
|
| 92 |
+ self.connected = True
|
|
| 65 | 93 |
self.__name = session.name
|
| 66 | 94 |
|
| 67 | 95 |
self.__logger.info("Created bot session with name: [%s]", self.__name)
|
| ... | ... | @@ -73,6 +101,13 @@ class BotSession: |
| 73 | 101 |
self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
|
| 74 | 102 |
|
| 75 | 103 |
session = self._bots_interface.update_bot_session(self.get_pb2())
|
| 104 |
+ if session == StatusCode.DEADLINE_EXCEEDED:
|
|
| 105 |
+ # try to continue to do update session if it passed the timeout
|
|
| 106 |
+ return
|
|
| 107 |
+ elif session in StatusCode:
|
|
| 108 |
+ self.connected = False
|
|
| 109 |
+ return
|
|
| 110 |
+ self.connected = True
|
|
| 76 | 111 |
server_ids = []
|
| 77 | 112 |
|
| 78 | 113 |
for lease in session.leases:
|
| ... | ... | @@ -150,6 +150,13 @@ class TenantManager: |
| 150 | 150 |
"""
|
| 151 | 151 |
return self._tenants[lease_id].tenant_completed
|
| 152 | 152 |
|
| 153 |
+ async def wait_on_tenants(self, timeout):
|
|
| 154 |
+ if self._tasks:
|
|
| 155 |
+ tasks = self._tasks.values()
|
|
| 156 |
+ await asyncio.wait(tasks,
|
|
| 157 |
+ timeout=timeout,
|
|
| 158 |
+ return_when=asyncio.FIRST_COMPLETED)
|
|
| 159 |
+ |
|
| 153 | 160 |
def _update_lease_result(self, lease_id, result):
|
| 154 | 161 |
"""Updates the lease with the result."""
|
| 155 | 162 |
self._tenants[lease_id].update_lease_result(result)
|
| ... | ... | @@ -156,9 +156,11 @@ class MonitoringBus: |
| 156 | 156 |
output_writers.append(output_file)
|
| 157 | 157 |
|
| 158 | 158 |
while True:
|
| 159 |
- if await __streaming_worker(iter(output_file)):
|
|
| 159 |
+ if await __streaming_worker([output_file]):
|
|
| 160 | 160 |
self.__sequence_number += 1
|
| 161 | 161 |
|
| 162 |
+ output_file.flush()
|
|
| 163 |
+ |
|
| 162 | 164 |
else:
|
| 163 | 165 |
output_writers.append(sys.stdout.buffer)
|
| 164 | 166 |
|
| ... | ... | @@ -24,6 +24,7 @@ import logging |
| 24 | 24 |
import uuid
|
| 25 | 25 |
|
| 26 | 26 |
from buildgrid._exceptions import InvalidArgumentError
|
| 27 |
+from buildgrid.settings import NETWORK_TIMEOUT
|
|
| 27 | 28 |
|
| 28 | 29 |
from ..job import LeaseState
|
| 29 | 30 |
|
| ... | ... | @@ -75,7 +76,7 @@ class BotsInterface: |
| 75 | 76 |
self._request_leases(bot_session)
|
| 76 | 77 |
return bot_session
|
| 77 | 78 |
|
| 78 |
- def update_bot_session(self, name, bot_session):
|
|
| 79 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
| 79 | 80 |
""" Client updates the server. Any changes in state to the Lease should be
|
| 80 | 81 |
registered server side. Assigns available leases with work.
|
| 81 | 82 |
"""
|
| ... | ... | @@ -93,14 +94,15 @@ class BotsInterface: |
| 93 | 94 |
pass
|
| 94 | 95 |
lease.Clear()
|
| 95 | 96 |
|
| 96 |
- self._request_leases(bot_session)
|
|
| 97 |
+ self._request_leases(bot_session, deadline)
|
|
| 97 | 98 |
return bot_session
|
| 98 | 99 |
|
| 99 |
- def _request_leases(self, bot_session):
|
|
| 100 |
+ def _request_leases(self, bot_session, deadline=None):
|
|
| 100 | 101 |
# TODO: Send worker capabilities to the scheduler!
|
| 101 | 102 |
# Only send one lease at a time currently.
|
| 102 | 103 |
if not bot_session.leases:
|
| 103 |
- leases = self._scheduler.request_job_leases({})
|
|
| 104 |
+ leases = self._scheduler.request_job_leases(
|
|
| 105 |
+ {}, timeout=deadline - NETWORK_TIMEOUT if deadline else None)
|
|
| 104 | 106 |
if leases:
|
| 105 | 107 |
for lease in leases:
|
| 106 | 108 |
self._assigned_leases[bot_session.name].add(lease.id)
|
| ... | ... | @@ -138,8 +138,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
| 138 | 138 |
instance_name = '/'.join(names[:-1])
|
| 139 | 139 |
|
| 140 | 140 |
instance = self._get_instance(instance_name)
|
| 141 |
- bot_session = instance.update_bot_session(request.name,
|
|
| 142 |
- request.bot_session)
|
|
| 141 |
+ bot_session = instance.update_bot_session(
|
|
| 142 |
+ request.name,
|
|
| 143 |
+ request.bot_session,
|
|
| 144 |
+ deadline=context.time_remaining())
|
|
| 143 | 145 |
|
| 144 | 146 |
if self._is_instrumented:
|
| 145 | 147 |
self.__bots[bot_id].GetCurrentTime()
|
| ... | ... | @@ -15,26 +15,29 @@ |
| 15 | 15 |
|
| 16 | 16 |
import asyncio
|
| 17 | 17 |
from concurrent import futures
|
| 18 |
-from datetime import timedelta
|
|
| 18 |
+from datetime import datetime, timedelta
|
|
| 19 | 19 |
import logging
|
| 20 |
+import logging.handlers
|
|
| 20 | 21 |
import os
|
| 21 | 22 |
import signal
|
| 23 |
+import sys
|
|
| 22 | 24 |
import time
|
| 23 | 25 |
|
| 24 | 26 |
import grpc
|
| 27 |
+import janus
|
|
| 25 | 28 |
|
| 26 |
-from buildgrid._enums import BotStatus, MetricRecordDomain, MetricRecordType
|
|
| 29 |
+from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
|
|
| 27 | 30 |
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
|
| 28 | 31 |
from buildgrid.server.actioncache.service import ActionCacheService
|
| 29 | 32 |
from buildgrid.server.bots.service import BotsService
|
| 33 |
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
| 34 |
+from buildgrid.server.capabilities.service import CapabilitiesService
|
|
| 30 | 35 |
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
|
| 31 | 36 |
from buildgrid.server.execution.service import ExecutionService
|
| 32 | 37 |
from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
|
| 33 | 38 |
from buildgrid.server.operations.service import OperationsService
|
| 34 | 39 |
from buildgrid.server.referencestorage.service import ReferenceStorageService
|
| 35 |
-from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
| 36 |
-from buildgrid.server.capabilities.service import CapabilitiesService
|
|
| 37 |
-from buildgrid.settings import MONITORING_PERIOD
|
|
| 40 |
+from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
|
|
| 38 | 41 |
|
| 39 | 42 |
|
| 40 | 43 |
class BuildGridServer:
|
| ... | ... | @@ -60,9 +63,16 @@ class BuildGridServer: |
| 60 | 63 |
self.__grpc_server = grpc.server(self.__grpc_executor)
|
| 61 | 64 |
|
| 62 | 65 |
self.__main_loop = asyncio.get_event_loop()
|
| 66 |
+ |
|
| 63 | 67 |
self.__monitoring_bus = None
|
| 64 | 68 |
|
| 69 |
+ self.__logging_queue = janus.Queue(loop=self.__main_loop)
|
|
| 70 |
+ self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
|
|
| 71 |
+ self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
|
|
| 72 |
+ self.__print_log_records = True
|
|
| 73 |
+ |
|
| 65 | 74 |
self.__state_monitoring_task = None
|
| 75 |
+ self.__logging_task = None
|
|
| 66 | 76 |
|
| 67 | 77 |
# We always want a capabilities service
|
| 68 | 78 |
self._capabilities_service = CapabilitiesService(self.__grpc_server)
|
| ... | ... | @@ -85,6 +95,17 @@ class BuildGridServer: |
| 85 | 95 |
self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
|
| 86 | 96 |
serialisation_format=MonitoringOutputFormat.JSON)
|
| 87 | 97 |
|
| 98 |
+ # Setup the main logging handler:
|
|
| 99 |
+ root_logger = logging.getLogger()
|
|
| 100 |
+ |
|
| 101 |
+ for log_filter in root_logger.filters[:]:
|
|
| 102 |
+ self.__logging_handler.addFilter(log_filter)
|
|
| 103 |
+ root_logger.removeFilter(log_filter)
|
|
| 104 |
+ |
|
| 105 |
+ for log_handler in root_logger.handlers[:]:
|
|
| 106 |
+ root_logger.removeHandler(log_handler)
|
|
| 107 |
+ root_logger.addHandler(self.__logging_handler)
|
|
| 108 |
+ |
|
| 88 | 109 |
# --- Public API ---
|
| 89 | 110 |
|
| 90 | 111 |
def start(self):
|
| ... | ... | @@ -98,6 +119,9 @@ class BuildGridServer: |
| 98 | 119 |
self._state_monitoring_worker(period=MONITORING_PERIOD),
|
| 99 | 120 |
loop=self.__main_loop)
|
| 100 | 121 |
|
| 122 |
+ self.__logging_task = asyncio.ensure_future(
|
|
| 123 |
+ self._logging_worker(), loop=self.__main_loop)
|
|
| 124 |
+ |
|
| 101 | 125 |
self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
|
| 102 | 126 |
|
| 103 | 127 |
self.__main_loop.run_forever()
|
| ... | ... | @@ -110,6 +134,9 @@ class BuildGridServer: |
| 110 | 134 |
|
| 111 | 135 |
self.__monitoring_bus.stop()
|
| 112 | 136 |
|
| 137 |
+ if self.__logging_task is not None:
|
|
| 138 |
+ self.__logging_task.cancel()
|
|
| 139 |
+ |
|
| 113 | 140 |
self.__main_loop.stop()
|
| 114 | 141 |
|
| 115 | 142 |
self.__grpc_server.stop(None)
|
| ... | ... | @@ -278,6 +305,53 @@ class BuildGridServer: |
| 278 | 305 |
execution_instance)
|
| 279 | 306 |
self._capabilities_service.add_instance(instance_name, capabilities_instance)
|
| 280 | 307 |
|
| 308 |
+ async def _logging_worker(self):
|
|
| 309 |
+ """Publishes log records to the monitoring bus."""
|
|
| 310 |
+ async def __logging_worker():
|
|
| 311 |
+ log_record = await self.__logging_queue.async_q.get()
|
|
| 312 |
+ |
|
| 313 |
+ # Print log records to stdout, if required:
|
|
| 314 |
+ if self.__print_log_records:
|
|
| 315 |
+ record = self.__logging_formatter.format(log_record)
|
|
| 316 |
+ |
|
| 317 |
+ # TODO: Investigate if async write would be worth here.
|
|
| 318 |
+ sys.stdout.write('{}\n'.format(record))
|
|
| 319 |
+ sys.stdout.flush()
|
|
| 320 |
+ |
|
| 321 |
+ # Emit a log record if server is instrumented:
|
|
| 322 |
+ if self._is_instrumented:
|
|
| 323 |
+ log_record_level = LogRecordLevel(int(log_record.levelno / 10))
|
|
| 324 |
+ log_record_creation_time = datetime.fromtimestamp(log_record.created)
|
|
| 325 |
+ # logging.LogRecord.extra must be a str to str dict:
|
|
| 326 |
+ if 'extra' in log_record.__dict__ and log_record.extra:
|
|
| 327 |
+ log_record_metadata = log_record.extra
|
|
| 328 |
+ else:
|
|
| 329 |
+ log_record_metadata = None
|
|
| 330 |
+ record = self._forge_log_record(
|
|
| 331 |
+ log_record.name, log_record_level, log_record.message,
|
|
| 332 |
+ log_record_creation_time, metadata=log_record_metadata)
|
|
| 333 |
+ |
|
| 334 |
+ await self.__monitoring_bus.send_record(record)
|
|
| 335 |
+ |
|
| 336 |
+ try:
|
|
| 337 |
+ while True:
|
|
| 338 |
+ await __logging_worker()
|
|
| 339 |
+ |
|
| 340 |
+ except asyncio.CancelledError:
|
|
| 341 |
+ pass
|
|
| 342 |
+ |
|
| 343 |
+ def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
|
|
| 344 |
+ log_record = monitoring_pb2.LogRecord()
|
|
| 345 |
+ |
|
| 346 |
+ log_record.creation_timestamp.FromDatetime(creation_time)
|
|
| 347 |
+ log_record.domain = domain
|
|
| 348 |
+ log_record.level = level.value
|
|
| 349 |
+ log_record.message = message
|
|
| 350 |
+ if metadata is not None:
|
|
| 351 |
+ log_record.metadata.update(metadata)
|
|
| 352 |
+ |
|
| 353 |
+ return log_record
|
|
| 354 |
+ |
|
| 281 | 355 |
async def _state_monitoring_worker(self, period=1.0):
|
| 282 | 356 |
"""Periodically publishes state metrics to the monitoring bus."""
|
| 283 | 357 |
async def __state_monitoring_worker():
|
| ... | ... | @@ -19,12 +19,13 @@ Scheduler |
| 19 | 19 |
Schedules jobs.
|
| 20 | 20 |
"""
|
| 21 | 21 |
|
| 22 |
-from collections import deque
|
|
| 23 | 22 |
from datetime import timedelta
|
| 24 | 23 |
import logging
|
| 24 |
+from queue import Queue, Empty
|
|
| 25 | 25 |
|
| 26 | 26 |
from buildgrid._enums import LeaseState, OperationStage
|
| 27 | 27 |
from buildgrid._exceptions import NotFoundError
|
| 28 |
+from buildgrid.settings import MAX_JOB_BLOCK_TIME
|
|
| 28 | 29 |
|
| 29 | 30 |
|
| 30 | 31 |
class Scheduler:
|
| ... | ... | @@ -41,7 +42,7 @@ class Scheduler: |
| 41 | 42 |
|
| 42 | 43 |
self._action_cache = action_cache
|
| 43 | 44 |
self.jobs = {}
|
| 44 |
- self.queue = deque()
|
|
| 45 |
+ self.queue = Queue()
|
|
| 45 | 46 |
|
| 46 | 47 |
self._is_instrumented = monitor
|
| 47 | 48 |
|
| ... | ... | @@ -93,7 +94,7 @@ class Scheduler: |
| 93 | 94 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
| 94 | 95 |
except NotFoundError:
|
| 95 | 96 |
operation_stage = OperationStage.QUEUED
|
| 96 |
- self.queue.append(job)
|
|
| 97 |
+ self.queue.put(job)
|
|
| 97 | 98 |
|
| 98 | 99 |
else:
|
| 99 | 100 |
job.set_cached_result(action_result)
|
| ... | ... | @@ -104,7 +105,7 @@ class Scheduler: |
| 104 | 105 |
|
| 105 | 106 |
else:
|
| 106 | 107 |
operation_stage = OperationStage.QUEUED
|
| 107 |
- self.queue.append(job)
|
|
| 108 |
+ self.queue.put(job)
|
|
| 108 | 109 |
|
| 109 | 110 |
self._update_job_operation_stage(job.name, operation_stage)
|
| 110 | 111 |
|
| ... | ... | @@ -120,25 +121,35 @@ class Scheduler: |
| 120 | 121 |
else:
|
| 121 | 122 |
operation_stage = OperationStage.QUEUED
|
| 122 | 123 |
job.update_lease_state(LeaseState.PENDING)
|
| 123 |
- self.queue.append(job)
|
|
| 124 |
+ self.queue.put(job)
|
|
| 124 | 125 |
|
| 125 | 126 |
self._update_job_operation_stage(job_name, operation_stage)
|
| 126 | 127 |
|
| 127 | 128 |
def list_jobs(self):
|
| 128 | 129 |
return self.jobs.values()
|
| 129 | 130 |
|
| 130 |
- def request_job_leases(self, worker_capabilities):
|
|
| 131 |
+ def request_job_leases(self, worker_capabilities, timeout=None):
|
|
| 131 | 132 |
"""Generates a list of the highest priority leases to be run.
|
| 132 | 133 |
|
| 133 | 134 |
Args:
|
| 134 | 135 |
worker_capabilities (dict): a set of key-value pairs decribing the
|
| 135 | 136 |
worker properties, configuration and state at the time of the
|
| 136 | 137 |
request.
|
| 138 |
+ timeout (int): time to block waiting on job queue, caps if longer
|
|
| 139 |
+ than MAX_JOB_BLOCK_TIME
|
|
| 137 | 140 |
"""
|
| 138 |
- if not self.queue:
|
|
| 141 |
+ if not timeout and self.queue.empty():
|
|
| 139 | 142 |
return []
|
| 140 | 143 |
|
| 141 |
- job = self.queue.popleft()
|
|
| 144 |
+ # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
|
|
| 145 |
+ if timeout:
|
|
| 146 |
+ timeout = timeout if timeout < MAX_JOB_BLOCK_TIME else MAX_JOB_BLOCK_TIME
|
|
| 147 |
+ try:
|
|
| 148 |
+ job = (self.queue.get(block=True, timeout=timeout)
|
|
| 149 |
+ if timeout
|
|
| 150 |
+ else self.queue.get(block=False))
|
|
| 151 |
+ except Empty:
|
|
| 152 |
+ return []
|
|
| 142 | 153 |
|
| 143 | 154 |
lease = job.lease
|
| 144 | 155 |
|
| ... | ... | @@ -30,3 +30,15 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024 |
| 30 | 30 |
|
| 31 | 31 |
# Maximum number of elements per gRPC request:
|
| 32 | 32 |
MAX_REQUEST_COUNT = 500
|
| 33 |
+ |
|
| 34 |
+# String format for log records:
|
|
| 35 |
+LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
|
|
| 36 |
+# The different log record attributes are documented here:
|
|
| 37 |
+# https://docs.python.org/3/library/logging.html#logrecord-attributes
|
|
| 38 |
+ |
|
| 39 |
+# time in seconds to pad timeouts
|
|
| 40 |
+NETWORK_TIMEOUT = 5
|
|
| 41 |
+ |
|
| 42 |
+# Hard limit for waiting on jobs, avoids grpc timeouts not being set defaulting
|
|
| 43 |
+# the interval to the max int64 value
|
|
| 44 |
+MAX_JOB_BLOCK_TIME = 300
|
| ... | ... | @@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess |
| 30 | 30 |
from ..utils.bots_interface import serve_bots_interface
|
| 31 | 31 |
|
| 32 | 32 |
|
| 33 |
+TIMEOUT = 5
|
|
| 33 | 34 |
INSTANCES = ['', 'instance']
|
| 34 | 35 |
|
| 35 | 36 |
|
| ... | ... | @@ -48,7 +49,7 @@ class ServerInterface: |
| 48 | 49 |
bot_session = bots_pb2.BotSession()
|
| 49 | 50 |
bot_session.ParseFromString(string_bot_session)
|
| 50 | 51 |
|
| 51 |
- interface = BotInterface(grpc.insecure_channel(remote))
|
|
| 52 |
+ interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
|
|
| 52 | 53 |
|
| 53 | 54 |
result = interface.create_bot_session(parent, bot_session)
|
| 54 | 55 |
queue.put(result.SerializeToString())
|
| ... | ... | @@ -67,7 +68,7 @@ class ServerInterface: |
| 67 | 68 |
bot_session = bots_pb2.BotSession()
|
| 68 | 69 |
bot_session.ParseFromString(string_bot_session)
|
| 69 | 70 |
|
| 70 |
- interface = BotInterface(grpc.insecure_channel(remote))
|
|
| 71 |
+ interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
|
|
| 71 | 72 |
|
| 72 | 73 |
result = interface.update_bot_session(bot_session, update_mask)
|
| 73 | 74 |
queue.put(result.SerializeToString())
|
| ... | ... | @@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server) |
| 38 | 38 |
# GRPC context
|
| 39 | 39 |
@pytest.fixture
|
| 40 | 40 |
def context():
|
| 41 |
- yield mock.MagicMock(spec=_Context)
|
|
| 41 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
| 42 |
+ context_mock.time_remaining.return_value = None
|
|
| 43 |
+ yield context_mock
|
|
| 42 | 44 |
|
| 43 | 45 |
|
| 44 | 46 |
@pytest.fixture
|
| ... | ... | @@ -123,7 +123,7 @@ class BotsInterface: |
| 123 | 123 |
self.__bot_session_queue.put(bot_session.SerializeToString())
|
| 124 | 124 |
return bot_session
|
| 125 | 125 |
|
| 126 |
- def update_bot_session(self, name, bot_session):
|
|
| 126 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
| 127 | 127 |
for lease in bot_session.leases:
|
| 128 | 128 |
state = LeaseState(lease.state)
|
| 129 | 129 |
if state == LeaseState.COMPLETED:
|
