Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
-
49586d88
by Martin Blanchard at 2018-11-30T17:19:40Z
-
5673b009
by Martin Blanchard at 2018-11-30T17:20:19Z
-
b763ec5f
by Martin Blanchard at 2018-11-30T17:20:19Z
-
76459e0a
by Martin Blanchard at 2018-11-30T17:20:19Z
-
94bf76a7
by Martin Blanchard at 2018-11-30T17:20:19Z
-
8f5d71c5
by Martin Blanchard at 2018-11-30T17:20:19Z
-
d0a84116
by Raoul Hidalgo Charman at 2018-12-05T10:00:46Z
-
0d241b27
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
-
a0ada9f4
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
-
2f86fb10
by Raoul Hidalgo Charman at 2018-12-05T10:02:34Z
17 changed files:
- .gitlab-ci.yml
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/bot/bot.py
- buildgrid/bot/interface.py
- buildgrid/bot/session.py
- buildgrid/bot/tenantmanager.py
- buildgrid/server/_monitoring.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/instance.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- tests/integration/bot_session.py
- tests/integration/bots_service.py
- tests/utils/bots_interface.py
Changes:
... | ... | @@ -2,7 +2,7 @@ |
2 | 2 |
image: python:3.5-stretch
|
3 | 3 |
|
4 | 4 |
variables:
|
5 |
- BGD: bgd --verbose
|
|
5 |
+ BGD: bgd
|
|
6 | 6 |
|
7 | 7 |
stages:
|
8 | 8 |
- test
|
... | ... | @@ -23,10 +23,12 @@ will be attempted to be imported. |
23 | 23 |
|
24 | 24 |
import logging
|
25 | 25 |
import os
|
26 |
+import sys
|
|
26 | 27 |
|
27 | 28 |
import click
|
28 | 29 |
import grpc
|
29 | 30 |
|
31 |
+from buildgrid.settings import LOG_RECORD_FORMAT
|
|
30 | 32 |
from buildgrid.utils import read_file
|
31 | 33 |
|
32 | 34 |
CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
|
... | ... | @@ -138,28 +140,71 @@ class BuildGridCLI(click.MultiCommand): |
138 | 140 |
return mod.cli
|
139 | 141 |
|
140 | 142 |
|
143 |
class DebugFilter(logging.Filter):
    """Logging filter letting through only records from selected domains.

    Domains are given as a ``:``-separated list of dotted logger names, for
    example ``'buildgrid.server.*:buildgrid.bot.session'``. Each ``*`` label
    matches exactly one label of a record's logger name, and a domain ending
    with ``.*`` also matches its parent name (``buildgrid.server`` above).
    Domains may overlap or share prefixes in any order.

    Args:
        debug_domains (str): ``:``-separated list of domain patterns.
        name (str): passed on to :class:`logging.Filter`'s constructor.
    """

    # Marks tree nodes where a domain pattern ends. It is not a string, so it
    # can never collide with a label taken from a logger name.
    _DOMAIN_END = object()

    def __init__(self, debug_domains, name=''):
        super().__init__(name=name)
        self.__domains_tree = {}

        for domain in debug_domains.split(':'):
            node = self.__domains_tree
            for label in domain.split('.'):
                # Explicit labels and '*' each get their own sub-tree: merging
                # one into the other used to raise KeyError or drop patterns.
                node = node.setdefault(label, {})
            node[self._DOMAIN_END] = True

    def filter(self, record):
        """Return True if ``record.name`` matches one of the debug domains."""
        return self.__matches(self.__domains_tree, record.name.split('.'))

    def __matches(self, node, labels):
        """Return True if ``labels`` match a domain pattern stored under ``node``."""
        if not labels:
            # A domain ending here, or ending with a trailing '*' right after.
            wildcard = node.get('*', {})
            return self._DOMAIN_END in node or self._DOMAIN_END in wildcard

        label, remaining = labels[0], labels[1:]
        # Try the explicit label first, then backtrack through the wildcard so
        # that 'a.*.c' still matches 'a.b.c' when 'a.b.d' is also listed.
        if label in node and self.__matches(node[label], remaining):
            return True
        return '*' in node and self.__matches(node['*'], remaining)
|
|
166 |
+ |
|
167 |
+ |
|
168 |
def setup_logging(verbosity=0, debug_mode=False):
    """Configure the root logger's stdout output, record format and level.

    Args:
        verbosity (int): 0 (or less) sets the ERROR level, 1 WARNING,
            2 INFO and 3 or more DEBUG.
        debug_mode (bool): if True, forces the root logger to DEBUG and lets
            ``asyncio`` debug messages through; otherwise the ``asyncio``
            logger is restricted to CRITICAL messages.
    """
    root_logger = logging.getLogger()
    asyncio_logger = logging.getLogger('asyncio')

    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Filters attached to a logger are not consulted for records propagated
    # from descendant loggers, so mirror the root logger's filters (e.g. a
    # DebugFilter) on the handler to have them apply to every record.
    for root_filter in root_logger.filters:
        stdout_handler.addFilter(root_filter)

    logging.basicConfig(format=LOG_RECORD_FORMAT, handlers=[stdout_handler])

    if verbosity >= 3:
        root_level = logging.DEBUG
    elif verbosity == 2:
        root_level = logging.INFO
    elif verbosity == 1:
        root_level = logging.WARNING
    else:
        root_level = logging.ERROR

    if debug_mode:
        asyncio_logger.setLevel(logging.DEBUG)
        root_level = logging.DEBUG
    else:
        asyncio_logger.setLevel(logging.CRITICAL)

    root_logger.setLevel(root_level)
|
|
193 |
+ |
|
194 |
+ |
|
141 | 195 |
@click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
|
142 |
-@click.option('-v', '--verbose', count=True,
|
|
143 |
- help='Increase log verbosity level.')
|
|
144 | 196 |
@pass_context
|
145 |
-def cli(context, verbose):
|
|
197 |
+def cli(context):
|
|
146 | 198 |
"""BuildGrid App"""
|
147 |
- logger = logging.getLogger()
|
|
199 |
+ root_logger = logging.getLogger()
|
|
148 | 200 |
|
149 | 201 |
# Clean-up root logger for any pre-configuration:
|
150 |
- for log_handler in logger.handlers[:]:
|
|
151 |
- logger.removeHandler(log_handler)
|
|
152 |
- for log_filter in logger.filters[:]:
|
|
153 |
- logger.removeFilter(log_filter)
|
|
154 |
- |
|
155 |
- logging.basicConfig(
|
|
156 |
- format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
|
|
157 |
- |
|
158 |
- if verbose == 1:
|
|
159 |
- logger.setLevel(logging.WARNING)
|
|
160 |
- elif verbose == 2:
|
|
161 |
- logger.setLevel(logging.INFO)
|
|
162 |
- elif verbose >= 3:
|
|
163 |
- logger.setLevel(logging.DEBUG)
|
|
164 |
- else:
|
|
165 |
- logger.setLevel(logging.ERROR)
|
|
202 |
+ for log_handler in root_logger.handlers[:]:
|
|
203 |
+ root_logger.removeHandler(log_handler)
|
|
204 |
+ for log_filter in root_logger.filters[:]:
|
|
205 |
+ root_logger.removeFilter(log_filter)
|
|
206 |
+ |
|
207 |
+ # Filter debug messages using BGD_MESSAGE_DEBUG value:
|
|
208 |
+ debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
|
|
209 |
+ if debug_domains:
|
|
210 |
+ root_logger.addFilter(DebugFilter(debug_domains))
|
... | ... | @@ -32,9 +32,8 @@ from buildgrid.bot.hardware.interface import HardwareInterface |
32 | 32 |
from buildgrid.bot.hardware.device import Device
|
33 | 33 |
from buildgrid.bot.hardware.worker import Worker
|
34 | 34 |
|
35 |
- |
|
36 | 35 |
from ..bots import buildbox, dummy, host
|
37 |
-from ..cli import pass_context
|
|
36 |
+from ..cli import pass_context, setup_logging
|
|
38 | 37 |
|
39 | 38 |
|
40 | 39 |
@click.group(name='bot', short_help="Create and register bot clients.")
|
... | ... | @@ -54,19 +53,21 @@ from ..cli import pass_context |
54 | 53 |
help="Public CAS client certificate for TLS (PEM-encoded)")
|
55 | 54 |
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
56 | 55 |
help="Public CAS server certificate for TLS (PEM-encoded)")
|
57 |
-@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
|
|
56 |
+@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
|
|
58 | 57 |
help="Time period for bot updates to the server in seconds.")
|
59 | 58 |
@click.option('--parent', type=click.STRING, default='main', show_default=True,
|
60 | 59 |
help="Targeted farm resource.")
|
60 |
+@click.option('-v', '--verbose', count=True,
|
|
61 |
+ help='Increase log verbosity level.')
|
|
61 | 62 |
@pass_context
|
62 | 63 |
def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
|
63 |
- remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
|
|
64 |
+ remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
|
|
65 |
+ setup_logging(verbosity=verbose)
|
|
64 | 66 |
# Setup the remote execution server channel:
|
65 | 67 |
url = urlparse(remote)
|
66 | 68 |
|
67 | 69 |
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
68 | 70 |
context.remote_url = remote
|
69 |
- context.update_period = update_period
|
|
70 | 71 |
context.parent = parent
|
71 | 72 |
|
72 | 73 |
if url.scheme == 'http':
|
... | ... | @@ -124,7 +125,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
124 | 125 |
|
125 | 126 |
click.echo("Starting for remote=[{}]".format(context.remote))
|
126 | 127 |
|
127 |
- bot_interface = interface.BotInterface(context.channel)
|
|
128 |
+ bot_interface = interface.BotInterface(context.channel, update_period)
|
|
128 | 129 |
worker = Worker()
|
129 | 130 |
worker.add_device(Device())
|
130 | 131 |
hardware_interface = HardwareInterface(worker)
|
... | ... | @@ -142,7 +143,7 @@ def run_dummy(context): |
142 | 143 |
try:
|
143 | 144 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
144 | 145 |
dummy.work_dummy, context)
|
145 |
- b = bot.Bot(bot_session, context.update_period)
|
|
146 |
+ b = bot.Bot(bot_session)
|
|
146 | 147 |
b.session()
|
147 | 148 |
except KeyboardInterrupt:
|
148 | 149 |
pass
|
... | ... | @@ -158,7 +159,7 @@ def run_host_tools(context): |
158 | 159 |
try:
|
159 | 160 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
160 | 161 |
host.work_host_tools, context)
|
161 |
- b = bot.Bot(bot_session, context.update_period)
|
|
162 |
+ b = bot.Bot(bot_session)
|
|
162 | 163 |
b.session()
|
163 | 164 |
except KeyboardInterrupt:
|
164 | 165 |
pass
|
... | ... | @@ -180,7 +181,7 @@ def run_buildbox(context, local_cas, fuse_dir): |
180 | 181 |
try:
|
181 | 182 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
182 | 183 |
buildbox.work_buildbox, context)
|
183 |
- b = bot.Bot(bot_session, context.update_period)
|
|
184 |
+ b = bot.Bot(bot_session)
|
|
184 | 185 |
b.session()
|
185 | 186 |
except KeyboardInterrupt:
|
186 | 187 |
pass
|
... | ... | @@ -26,7 +26,7 @@ import click |
26 | 26 |
|
27 | 27 |
from buildgrid.server.instance import BuildGridServer
|
28 | 28 |
|
29 |
-from ..cli import pass_context
|
|
29 |
+from ..cli import pass_context, setup_logging
|
|
30 | 30 |
from ..settings import parser
|
31 | 31 |
|
32 | 32 |
|
... | ... | @@ -37,9 +37,14 @@ def cli(context): |
37 | 37 |
|
38 | 38 |
|
39 | 39 |
@cli.command('start', short_help="Setup a new server instance.")
|
40 |
-@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
|
|
40 |
+@click.argument('CONFIG',
|
|
41 |
+ type=click.Path(file_okay=True, dir_okay=False, writable=False))
|
|
42 |
+@click.option('-v', '--verbose', count=True,
|
|
43 |
+ help='Increase log verbosity level.')
|
|
41 | 44 |
@pass_context
|
42 |
-def start(context, config):
|
|
45 |
+def start(context, config, verbose):
|
|
46 |
+ setup_logging(verbosity=verbose)
|
|
47 |
+ |
|
43 | 48 |
with open(config) as f:
|
44 | 49 |
settings = parser.get_parser().safe_load(f)
|
45 | 50 |
|
... | ... | @@ -20,14 +20,12 @@ import logging |
20 | 20 |
class Bot:
|
21 | 21 |
"""Creates a local BotSession."""
|
22 | 22 |
|
23 |
- def __init__(self, bot_session, update_period=1):
|
|
23 |
+ def __init__(self, bot_session):
|
|
24 | 24 |
"""
|
25 | 25 |
"""
|
26 | 26 |
self.__logger = logging.getLogger(__name__)
|
27 | 27 |
|
28 | 28 |
self.__bot_session = bot_session
|
29 |
- self.__update_period = update_period
|
|
30 |
- |
|
31 | 29 |
self.__loop = None
|
32 | 30 |
|
33 | 31 |
def session(self):
|
... | ... | @@ -37,7 +35,7 @@ class Bot: |
37 | 35 |
self.__bot_session.create_bot_session()
|
38 | 36 |
|
39 | 37 |
try:
|
40 |
- task = asyncio.ensure_future(self.__update_bot_session())
|
|
38 |
+ task = asyncio.ensure_future(self.__bot_session.run())
|
|
41 | 39 |
self.__loop.run_until_complete(task)
|
42 | 40 |
|
43 | 41 |
except KeyboardInterrupt:
|
... | ... | @@ -46,16 +44,6 @@ class Bot: |
46 | 44 |
self.__kill_everyone()
|
47 | 45 |
self.__logger.info("Bot shutdown.")
|
48 | 46 |
|
49 |
- async def __update_bot_session(self):
|
|
50 |
- """Calls the server periodically to inform the server the client has not died."""
|
|
51 |
- try:
|
|
52 |
- while True:
|
|
53 |
- self.__bot_session.update_bot_session()
|
|
54 |
- await asyncio.sleep(self.__update_period)
|
|
55 |
- |
|
56 |
- except asyncio.CancelledError:
|
|
57 |
- pass
|
|
58 |
- |
|
59 | 47 |
def __kill_everyone(self):
|
60 | 48 |
"""Cancels and waits for them to stop."""
|
61 | 49 |
self.__logger.info("Cancelling remaining tasks...")
|
... | ... | @@ -24,6 +24,7 @@ import logging |
24 | 24 |
import grpc
|
25 | 25 |
|
26 | 26 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
|
27 |
+from buildgrid.settings import NETWORK_TIMEOUT
|
|
27 | 28 |
|
28 | 29 |
|
29 | 30 |
class BotInterface:
|
... | ... | @@ -31,28 +32,32 @@ class BotInterface: |
31 | 32 |
Interface handles calls to the server.
|
32 | 33 |
"""
|
33 | 34 |
|
34 |
- def __init__(self, channel):
|
|
35 |
+ def __init__(self, channel, interval):
|
|
35 | 36 |
self.__logger = logging.getLogger(__name__)
|
36 | 37 |
|
37 | 38 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
39 |
+ self.__interval = interval
|
|
40 |
+ |
|
41 |
+ @property
|
|
42 |
+ def interval(self):
|
|
43 |
+ return self.__interval
|
|
38 | 44 |
|
39 | 45 |
def create_bot_session(self, parent, bot_session):
|
46 |
+ """ Creates a bot session returning a grpc StatusCode if it failed """
|
|
40 | 47 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
41 | 48 |
bot_session=bot_session)
|
42 |
- try:
|
|
43 |
- return self._stub.CreateBotSession(request)
|
|
44 |
- |
|
45 |
- except grpc.RpcError as e:
|
|
46 |
- self.__logger.error(e)
|
|
47 |
- raise
|
|
49 |
+ return self._bot_call(self._stub.CreateBotSession, request)
|
|
48 | 50 |
|
49 | 51 |
def update_bot_session(self, bot_session, update_mask=None):
|
52 |
+ """ Updates a bot session returning a grpc StatusCode if it failed """
|
|
50 | 53 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
51 | 54 |
bot_session=bot_session,
|
52 | 55 |
update_mask=update_mask)
|
53 |
- try:
|
|
54 |
- return self._stub.UpdateBotSession(request)
|
|
56 |
+ return self._bot_call(self._stub.UpdateBotSession, request)
|
|
55 | 57 |
|
58 |
+ def _bot_call(self, call, request):
|
|
59 |
+ try:
|
|
60 |
+ return call(request, timeout=self.interval + NETWORK_TIMEOUT)
|
|
56 | 61 |
except grpc.RpcError as e:
|
57 |
- self.__logger.error(e)
|
|
58 |
- raise
|
|
62 |
+ self.__logger.error(e.code())
|
|
63 |
+ return e.code()
|
... | ... | @@ -19,9 +19,12 @@ Bot Session |
19 | 19 |
|
20 | 20 |
Allows connections
|
21 | 21 |
"""
|
22 |
+import asyncio
|
|
22 | 23 |
import logging
|
23 | 24 |
import platform
|
24 | 25 |
|
26 |
+from grpc import StatusCode
|
|
27 |
+ |
|
25 | 28 |
from buildgrid._enums import BotStatus, LeaseState
|
26 | 29 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
27 | 30 |
from buildgrid._protos.google.rpc import code_pb2
|
... | ... | @@ -47,6 +50,8 @@ class BotSession: |
47 | 50 |
self._status = BotStatus.OK.value
|
48 | 51 |
self._tenant_manager = TenantManager()
|
49 | 52 |
|
53 |
+ self.connected = False
|
|
54 |
+ |
|
50 | 55 |
self.__parent = parent
|
51 | 56 |
self.__bot_id = '{}.{}'.format(parent, platform.node())
|
52 | 57 |
self.__name = None
|
... | ... | @@ -58,10 +63,33 @@ class BotSession: |
58 | 63 |
def bot_id(self):
|
59 | 64 |
return self.__bot_id
|
60 | 65 |
|
66 |
+ async def run(self):
|
|
67 |
+ """ Run a bot session
|
|
68 |
+ |
|
69 |
+ This connects and reconnects via create bot session and waits on update
|
|
70 |
+ bot session calls.
|
|
71 |
+ """
|
|
72 |
+ self.__logger.debug("Starting bot session")
|
|
73 |
+ interval = self._bots_interface.interval
|
|
74 |
+ while True:
|
|
75 |
+ if not self.connected:
|
|
76 |
+ self.create_bot_session()
|
|
77 |
+ else:
|
|
78 |
+ self.update_bot_session()
|
|
79 |
+ |
|
80 |
+ if not self.connected:
|
|
81 |
+ await asyncio.sleep(interval)
|
|
82 |
+ else:
|
|
83 |
+ await self._tenant_manager.wait_on_tenants(interval)
|
|
84 |
+ |
|
61 | 85 |
def create_bot_session(self):
|
62 | 86 |
self.__logger.debug("Creating bot session")
|
63 | 87 |
|
64 | 88 |
session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
|
89 |
+ if session in list(StatusCode):
|
|
90 |
+ self.connected = False
|
|
91 |
+ return
|
|
92 |
+ self.connected = True
|
|
65 | 93 |
self.__name = session.name
|
66 | 94 |
|
67 | 95 |
self.__logger.info("Created bot session with name: [%s]", self.__name)
|
... | ... | @@ -73,6 +101,13 @@ class BotSession: |
73 | 101 |
self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
|
74 | 102 |
|
75 | 103 |
session = self._bots_interface.update_bot_session(self.get_pb2())
|
104 |
+ if session == StatusCode.DEADLINE_EXCEEDED:
|
|
105 |
+ # try to continue to do update session if it passed the timeout
|
|
106 |
+ return
|
|
107 |
+ elif session in StatusCode:
|
|
108 |
+ self.connected = False
|
|
109 |
+ return
|
|
110 |
+ self.connected = True
|
|
76 | 111 |
server_ids = []
|
77 | 112 |
|
78 | 113 |
for lease in session.leases:
|
... | ... | @@ -150,6 +150,13 @@ class TenantManager: |
150 | 150 |
"""
|
151 | 151 |
return self._tenants[lease_id].tenant_completed
|
152 | 152 |
|
153 |
+ async def wait_on_tenants(self, timeout):
|
|
154 |
+ if self._tasks:
|
|
155 |
+ tasks = self._tasks.values()
|
|
156 |
+ await asyncio.wait(tasks,
|
|
157 |
+ timeout=timeout,
|
|
158 |
+ return_when=asyncio.FIRST_COMPLETED)
|
|
159 |
+ |
|
153 | 160 |
def _update_lease_result(self, lease_id, result):
|
154 | 161 |
"""Updates the lease with the result."""
|
155 | 162 |
self._tenants[lease_id].update_lease_result(result)
|
... | ... | @@ -156,9 +156,11 @@ class MonitoringBus: |
156 | 156 |
output_writers.append(output_file)
|
157 | 157 |
|
158 | 158 |
while True:
|
159 |
- if await __streaming_worker(iter(output_file)):
|
|
159 |
+ if await __streaming_worker([output_file]):
|
|
160 | 160 |
self.__sequence_number += 1
|
161 | 161 |
|
162 |
+ output_file.flush()
|
|
163 |
+ |
|
162 | 164 |
else:
|
163 | 165 |
output_writers.append(sys.stdout.buffer)
|
164 | 166 |
|
... | ... | @@ -24,6 +24,7 @@ import logging |
24 | 24 |
import uuid
|
25 | 25 |
|
26 | 26 |
from buildgrid._exceptions import InvalidArgumentError
|
27 |
+from buildgrid.settings import NETWORK_TIMEOUT
|
|
27 | 28 |
|
28 | 29 |
from ..job import LeaseState
|
29 | 30 |
|
... | ... | @@ -75,7 +76,7 @@ class BotsInterface: |
75 | 76 |
self._request_leases(bot_session)
|
76 | 77 |
return bot_session
|
77 | 78 |
|
78 |
- def update_bot_session(self, name, bot_session):
|
|
79 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
79 | 80 |
""" Client updates the server. Any changes in state to the Lease should be
|
80 | 81 |
registered server side. Assigns available leases with work.
|
81 | 82 |
"""
|
... | ... | @@ -93,14 +94,15 @@ class BotsInterface: |
93 | 94 |
pass
|
94 | 95 |
lease.Clear()
|
95 | 96 |
|
96 |
- self._request_leases(bot_session)
|
|
97 |
+ self._request_leases(bot_session, deadline)
|
|
97 | 98 |
return bot_session
|
98 | 99 |
|
99 |
- def _request_leases(self, bot_session):
|
|
100 |
+ def _request_leases(self, bot_session, deadline=None):
|
|
100 | 101 |
# TODO: Send worker capabilities to the scheduler!
|
101 | 102 |
# Only send one lease at a time currently.
|
102 | 103 |
if not bot_session.leases:
|
103 |
- leases = self._scheduler.request_job_leases({})
|
|
104 |
+ leases = self._scheduler.request_job_leases(
|
|
105 |
+ {}, timeout=deadline - NETWORK_TIMEOUT if deadline else None)
|
|
104 | 106 |
if leases:
|
105 | 107 |
for lease in leases:
|
106 | 108 |
self._assigned_leases[bot_session.name].add(lease.id)
|
... | ... | @@ -138,8 +138,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
138 | 138 |
instance_name = '/'.join(names[:-1])
|
139 | 139 |
|
140 | 140 |
instance = self._get_instance(instance_name)
|
141 |
- bot_session = instance.update_bot_session(request.name,
|
|
142 |
- request.bot_session)
|
|
141 |
+ bot_session = instance.update_bot_session(
|
|
142 |
+ request.name,
|
|
143 |
+ request.bot_session,
|
|
144 |
+ deadline=context.time_remaining())
|
|
143 | 145 |
|
144 | 146 |
if self._is_instrumented:
|
145 | 147 |
self.__bots[bot_id].GetCurrentTime()
|
... | ... | @@ -15,26 +15,29 @@ |
15 | 15 |
|
16 | 16 |
import asyncio
|
17 | 17 |
from concurrent import futures
|
18 |
-from datetime import timedelta
|
|
18 |
+from datetime import datetime, timedelta
|
|
19 | 19 |
import logging
|
20 |
+import logging.handlers
|
|
20 | 21 |
import os
|
21 | 22 |
import signal
|
23 |
+import sys
|
|
22 | 24 |
import time
|
23 | 25 |
|
24 | 26 |
import grpc
|
27 |
+import janus
|
|
25 | 28 |
|
26 |
-from buildgrid._enums import BotStatus, MetricRecordDomain, MetricRecordType
|
|
29 |
+from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
|
|
27 | 30 |
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
|
28 | 31 |
from buildgrid.server.actioncache.service import ActionCacheService
|
29 | 32 |
from buildgrid.server.bots.service import BotsService
|
33 |
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
34 |
+from buildgrid.server.capabilities.service import CapabilitiesService
|
|
30 | 35 |
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
|
31 | 36 |
from buildgrid.server.execution.service import ExecutionService
|
32 | 37 |
from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
|
33 | 38 |
from buildgrid.server.operations.service import OperationsService
|
34 | 39 |
from buildgrid.server.referencestorage.service import ReferenceStorageService
|
35 |
-from buildgrid.server.capabilities.instance import CapabilitiesInstance
|
|
36 |
-from buildgrid.server.capabilities.service import CapabilitiesService
|
|
37 |
-from buildgrid.settings import MONITORING_PERIOD
|
|
40 |
+from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
|
|
38 | 41 |
|
39 | 42 |
|
40 | 43 |
class BuildGridServer:
|
... | ... | @@ -60,9 +63,16 @@ class BuildGridServer: |
60 | 63 |
self.__grpc_server = grpc.server(self.__grpc_executor)
|
61 | 64 |
|
62 | 65 |
self.__main_loop = asyncio.get_event_loop()
|
66 |
+ |
|
63 | 67 |
self.__monitoring_bus = None
|
64 | 68 |
|
69 |
+ self.__logging_queue = janus.Queue(loop=self.__main_loop)
|
|
70 |
+ self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
|
|
71 |
+ self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
|
|
72 |
+ self.__print_log_records = True
|
|
73 |
+ |
|
65 | 74 |
self.__state_monitoring_task = None
|
75 |
+ self.__logging_task = None
|
|
66 | 76 |
|
67 | 77 |
# We always want a capabilities service
|
68 | 78 |
self._capabilities_service = CapabilitiesService(self.__grpc_server)
|
... | ... | @@ -85,6 +95,17 @@ class BuildGridServer: |
85 | 95 |
self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
|
86 | 96 |
serialisation_format=MonitoringOutputFormat.JSON)
|
87 | 97 |
|
98 |
+ # Setup the main logging handler:
|
|
99 |
+ root_logger = logging.getLogger()
|
|
100 |
+ |
|
101 |
+ for log_filter in root_logger.filters[:]:
|
|
102 |
+ self.__logging_handler.addFilter(log_filter)
|
|
103 |
+ root_logger.removeFilter(log_filter)
|
|
104 |
+ |
|
105 |
+ for log_handler in root_logger.handlers[:]:
|
|
106 |
+ root_logger.removeHandler(log_handler)
|
|
107 |
+ root_logger.addHandler(self.__logging_handler)
|
|
108 |
+ |
|
88 | 109 |
# --- Public API ---
|
89 | 110 |
|
90 | 111 |
def start(self):
|
... | ... | @@ -98,6 +119,9 @@ class BuildGridServer: |
98 | 119 |
self._state_monitoring_worker(period=MONITORING_PERIOD),
|
99 | 120 |
loop=self.__main_loop)
|
100 | 121 |
|
122 |
+ self.__logging_task = asyncio.ensure_future(
|
|
123 |
+ self._logging_worker(), loop=self.__main_loop)
|
|
124 |
+ |
|
101 | 125 |
self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
|
102 | 126 |
|
103 | 127 |
self.__main_loop.run_forever()
|
... | ... | @@ -110,6 +134,9 @@ class BuildGridServer: |
110 | 134 |
|
111 | 135 |
self.__monitoring_bus.stop()
|
112 | 136 |
|
137 |
+ if self.__logging_task is not None:
|
|
138 |
+ self.__logging_task.cancel()
|
|
139 |
+ |
|
113 | 140 |
self.__main_loop.stop()
|
114 | 141 |
|
115 | 142 |
self.__grpc_server.stop(None)
|
... | ... | @@ -278,6 +305,53 @@ class BuildGridServer: |
278 | 305 |
execution_instance)
|
279 | 306 |
self._capabilities_service.add_instance(instance_name, capabilities_instance)
|
280 | 307 |
|
308 |
+ async def _logging_worker(self):
|
|
309 |
+ """Publishes log records to the monitoring bus."""
|
|
310 |
+ async def __logging_worker():
|
|
311 |
+ log_record = await self.__logging_queue.async_q.get()
|
|
312 |
+ |
|
313 |
+ # Print log records to stdout, if required:
|
|
314 |
+ if self.__print_log_records:
|
|
315 |
+ record = self.__logging_formatter.format(log_record)
|
|
316 |
+ |
|
317 |
+ # TODO: Investigate if async write would be worth here.
|
|
318 |
+ sys.stdout.write('{}\n'.format(record))
|
|
319 |
+ sys.stdout.flush()
|
|
320 |
+ |
|
321 |
+ # Emit a log record if server is instrumented:
|
|
322 |
+ if self._is_instrumented:
|
|
323 |
+ log_record_level = LogRecordLevel(int(log_record.levelno / 10))
|
|
324 |
+ log_record_creation_time = datetime.fromtimestamp(log_record.created)
|
|
325 |
+ # logging.LogRecord.extra must be a str to str dict:
|
|
326 |
+ if 'extra' in log_record.__dict__ and log_record.extra:
|
|
327 |
+ log_record_metadata = log_record.extra
|
|
328 |
+ else:
|
|
329 |
+ log_record_metadata = None
|
|
330 |
+ record = self._forge_log_record(
|
|
331 |
+ log_record.name, log_record_level, log_record.message,
|
|
332 |
+ log_record_creation_time, metadata=log_record_metadata)
|
|
333 |
+ |
|
334 |
+ await self.__monitoring_bus.send_record(record)
|
|
335 |
+ |
|
336 |
+ try:
|
|
337 |
+ while True:
|
|
338 |
+ await __logging_worker()
|
|
339 |
+ |
|
340 |
+ except asyncio.CancelledError:
|
|
341 |
+ pass
|
|
342 |
+ |
|
343 |
+ def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
|
|
344 |
+ log_record = monitoring_pb2.LogRecord()
|
|
345 |
+ |
|
346 |
+ log_record.creation_timestamp.FromDatetime(creation_time)
|
|
347 |
+ log_record.domain = domain
|
|
348 |
+ log_record.level = level.value
|
|
349 |
+ log_record.message = message
|
|
350 |
+ if metadata is not None:
|
|
351 |
+ log_record.metadata.update(metadata)
|
|
352 |
+ |
|
353 |
+ return log_record
|
|
354 |
+ |
|
281 | 355 |
async def _state_monitoring_worker(self, period=1.0):
|
282 | 356 |
"""Periodically publishes state metrics to the monitoring bus."""
|
283 | 357 |
async def __state_monitoring_worker():
|
... | ... | @@ -19,12 +19,13 @@ Scheduler |
19 | 19 |
Schedules jobs.
|
20 | 20 |
"""
|
21 | 21 |
|
22 |
-from collections import deque
|
|
23 | 22 |
from datetime import timedelta
|
24 | 23 |
import logging
|
24 |
+from queue import Queue, Empty
|
|
25 | 25 |
|
26 | 26 |
from buildgrid._enums import LeaseState, OperationStage
|
27 | 27 |
from buildgrid._exceptions import NotFoundError
|
28 |
+from buildgrid.settings import MAX_JOB_BLOCK_TIME
|
|
28 | 29 |
|
29 | 30 |
|
30 | 31 |
class Scheduler:
|
... | ... | @@ -41,7 +42,7 @@ class Scheduler: |
41 | 42 |
|
42 | 43 |
self._action_cache = action_cache
|
43 | 44 |
self.jobs = {}
|
44 |
- self.queue = deque()
|
|
45 |
+ self.queue = Queue()
|
|
45 | 46 |
|
46 | 47 |
self._is_instrumented = monitor
|
47 | 48 |
|
... | ... | @@ -93,7 +94,7 @@ class Scheduler: |
93 | 94 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
94 | 95 |
except NotFoundError:
|
95 | 96 |
operation_stage = OperationStage.QUEUED
|
96 |
- self.queue.append(job)
|
|
97 |
+ self.queue.put(job)
|
|
97 | 98 |
|
98 | 99 |
else:
|
99 | 100 |
job.set_cached_result(action_result)
|
... | ... | @@ -104,7 +105,7 @@ class Scheduler: |
104 | 105 |
|
105 | 106 |
else:
|
106 | 107 |
operation_stage = OperationStage.QUEUED
|
107 |
- self.queue.append(job)
|
|
108 |
+ self.queue.put(job)
|
|
108 | 109 |
|
109 | 110 |
self._update_job_operation_stage(job.name, operation_stage)
|
110 | 111 |
|
... | ... | @@ -120,25 +121,35 @@ class Scheduler: |
120 | 121 |
else:
|
121 | 122 |
operation_stage = OperationStage.QUEUED
|
122 | 123 |
job.update_lease_state(LeaseState.PENDING)
|
123 |
- self.queue.append(job)
|
|
124 |
+ self.queue.put(job)
|
|
124 | 125 |
|
125 | 126 |
self._update_job_operation_stage(job_name, operation_stage)
|
126 | 127 |
|
127 | 128 |
def list_jobs(self):
|
128 | 129 |
return self.jobs.values()
|
129 | 130 |
|
130 |
- def request_job_leases(self, worker_capabilities):
|
|
131 |
+ def request_job_leases(self, worker_capabilities, timeout=None):
|
|
131 | 132 |
"""Generates a list of the highest priority leases to be run.
|
132 | 133 |
|
133 | 134 |
Args:
|
134 | 135 |
worker_capabilities (dict): a set of key-value pairs decribing the
|
135 | 136 |
worker properties, configuration and state at the time of the
|
136 | 137 |
request.
|
138 |
+ timeout (int): time to block waiting on job queue, caps if longer
|
|
139 |
+ than MAX_JOB_BLOCK_TIME
|
|
137 | 140 |
"""
|
138 |
- if not self.queue:
|
|
141 |
+ if not timeout and self.queue.empty():
|
|
139 | 142 |
return []
|
140 | 143 |
|
141 |
- job = self.queue.popleft()
|
|
144 |
+ # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
|
|
145 |
+ if timeout:
|
|
146 |
+ timeout = timeout if timeout < MAX_JOB_BLOCK_TIME else MAX_JOB_BLOCK_TIME
|
|
147 |
+ try:
|
|
148 |
+ job = (self.queue.get(block=True, timeout=timeout)
|
|
149 |
+ if timeout
|
|
150 |
+ else self.queue.get(block=False))
|
|
151 |
+ except Empty:
|
|
152 |
+ return []
|
|
142 | 153 |
|
143 | 154 |
lease = job.lease
|
144 | 155 |
|
... | ... | @@ -30,3 +30,15 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024 |
30 | 30 |
|
31 | 31 |
# Maximum number of elements per gRPC request:
|
32 | 32 |
MAX_REQUEST_COUNT = 500
|
33 |
+ |
|
34 |
# Format of the log records printed by BuildGrid. The available LogRecord
# attributes are documented at:
# https://docs.python.org/3/library/logging.html#logrecord-attributes
LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'

# Padding, in seconds, applied to timeouts to absorb network latency: bots add
# it to their update interval to set gRPC call deadlines, and the server
# subtracts it from the remaining call time before blocking on the job queue.
NETWORK_TIMEOUT = 5

# Hard limit, in seconds, on how long a job lease request may block waiting
# for work. Without it, a call with no gRPC deadline set would default the
# wait interval to the maximum int64 value.
MAX_JOB_BLOCK_TIME = 300
|
... | ... | @@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess |
30 | 30 |
from ..utils.bots_interface import serve_bots_interface
|
31 | 31 |
|
32 | 32 |
|
33 |
+TIMEOUT = 5
|
|
33 | 34 |
INSTANCES = ['', 'instance']
|
34 | 35 |
|
35 | 36 |
|
... | ... | @@ -48,7 +49,7 @@ class ServerInterface: |
48 | 49 |
bot_session = bots_pb2.BotSession()
|
49 | 50 |
bot_session.ParseFromString(string_bot_session)
|
50 | 51 |
|
51 |
- interface = BotInterface(grpc.insecure_channel(remote))
|
|
52 |
+ interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
|
|
52 | 53 |
|
53 | 54 |
result = interface.create_bot_session(parent, bot_session)
|
54 | 55 |
queue.put(result.SerializeToString())
|
... | ... | @@ -67,7 +68,7 @@ class ServerInterface: |
67 | 68 |
bot_session = bots_pb2.BotSession()
|
68 | 69 |
bot_session.ParseFromString(string_bot_session)
|
69 | 70 |
|
70 |
- interface = BotInterface(grpc.insecure_channel(remote))
|
|
71 |
+ interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
|
|
71 | 72 |
|
72 | 73 |
result = interface.update_bot_session(bot_session, update_mask)
|
73 | 74 |
queue.put(result.SerializeToString())
|
... | ... | @@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server) |
38 | 38 |
# GRPC context
|
39 | 39 |
@pytest.fixture
|
40 | 40 |
def context():
|
41 |
- yield mock.MagicMock(spec=_Context)
|
|
41 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
42 |
+ context_mock.time_remaining.return_value = None
|
|
43 |
+ yield context_mock
|
|
42 | 44 |
|
43 | 45 |
|
44 | 46 |
@pytest.fixture
|
... | ... | @@ -123,7 +123,7 @@ class BotsInterface: |
123 | 123 |
self.__bot_session_queue.put(bot_session.SerializeToString())
|
124 | 124 |
return bot_session
|
125 | 125 |
|
126 |
- def update_bot_session(self, name, bot_session):
|
|
126 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
127 | 127 |
for lease in bot_session.leases:
|
128 | 128 |
state = LeaseState(lease.state)
|
129 | 129 |
if state == LeaseState.COMPLETED:
|