Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
-
589ffa40
by Martin Blanchard at 2018-11-06T15:12:47Z
-
e4fb8842
by Raoul Hidalgo Charman at 2018-11-07T17:25:15Z
-
d2e70ad5
by Raoul Hidalgo Charman at 2018-11-07T17:25:15Z
10 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/bot/bot_session.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
- tests/integration/bots_service.py
Changes:
... | ... | @@ -52,7 +52,8 @@ from ..cli import pass_context |
52 | 52 |
help="Public CAS client certificate for TLS (PEM-encoded)")
|
53 | 53 |
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
54 | 54 |
help="Public CAS server certificate for TLS (PEM-encoded)")
|
55 |
-@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
|
|
55 |
+# TODO change default to 30
|
|
56 |
+@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
|
|
56 | 57 |
help="Time period for bot updates to the server in seconds.")
|
57 | 58 |
@click.option('--parent', type=click.STRING, default='main', show_default=True,
|
58 | 59 |
help="Targeted farm resource.")
|
... | ... | @@ -64,7 +65,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
64 | 65 |
|
65 | 66 |
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
66 | 67 |
context.remote_url = remote
|
67 |
- context.update_period = update_period
|
|
68 | 68 |
context.parent = parent
|
69 | 69 |
|
70 | 70 |
if url.scheme == 'http':
|
... | ... | @@ -123,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
123 | 123 |
context.logger = logging.getLogger(__name__)
|
124 | 124 |
context.logger.debug("Starting for remote {}".format(context.remote))
|
125 | 125 |
|
126 |
- interface = bot_interface.BotInterface(context.channel)
|
|
126 |
+ interface = bot_interface.BotInterface(context.channel, update_period)
|
|
127 | 127 |
|
128 | 128 |
worker = Worker()
|
129 | 129 |
worker.add_device(Device())
|
... | ... | @@ -141,7 +141,7 @@ def run_dummy(context): |
141 | 141 |
Creates a session, accepts leases, does fake work and updates the server.
|
142 | 142 |
"""
|
143 | 143 |
try:
|
144 |
- b = bot.Bot(context.bot_session, context.update_period)
|
|
144 |
+ b = bot.Bot(context.bot_session)
|
|
145 | 145 |
b.session(dummy.work_dummy,
|
146 | 146 |
context)
|
147 | 147 |
except KeyboardInterrupt:
|
... | ... | @@ -156,7 +156,7 @@ def run_host_tools(context): |
156 | 156 |
result back to CAS.
|
157 | 157 |
"""
|
158 | 158 |
try:
|
159 |
- b = bot.Bot(context.bot_session, context.update_period)
|
|
159 |
+ b = bot.Bot(context.bot_session)
|
|
160 | 160 |
b.session(host.work_host_tools,
|
161 | 161 |
context)
|
162 | 162 |
except KeyboardInterrupt:
|
... | ... | @@ -177,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir): |
177 | 177 |
context.fuse_dir = fuse_dir
|
178 | 178 |
|
179 | 179 |
try:
|
180 |
- b = bot.Bot(context.bot_session, context.update_period)
|
|
180 |
+ b = bot.Bot(context.bot_session)
|
|
181 | 181 |
b.session(buildbox.work_buildbox,
|
182 | 182 |
context)
|
183 | 183 |
except KeyboardInterrupt:
|
... | ... | @@ -29,19 +29,16 @@ class Bot: |
29 | 29 |
Creates a local BotSession.
|
30 | 30 |
"""
|
31 | 31 |
|
32 |
- def __init__(self, bot_session, update_period=1):
|
|
32 |
+ def __init__(self, bot_session):
|
|
33 | 33 |
self.logger = logging.getLogger(__name__)
|
34 | 34 |
|
35 | 35 |
self._bot_session = bot_session
|
36 |
- self._update_period = update_period
|
|
37 | 36 |
|
38 | 37 |
def session(self, work, context):
|
39 | 38 |
loop = asyncio.get_event_loop()
|
40 | 39 |
|
41 |
- self._bot_session.create_bot_session(work, context)
|
|
42 |
- |
|
43 | 40 |
try:
|
44 |
- task = asyncio.ensure_future(self._update_bot_session())
|
|
41 |
+ task = asyncio.ensure_future(self._bot_session.run(work, context))
|
|
45 | 42 |
loop.run_forever()
|
46 | 43 |
except KeyboardInterrupt:
|
47 | 44 |
pass
|
... | ... | @@ -49,10 +46,19 @@ class Bot: |
49 | 46 |
task.cancel()
|
50 | 47 |
loop.close()
|
51 | 48 |
|
52 |
- async def _update_bot_session(self):
|
|
49 |
+ async def _run_bot_session(self, work, context):
|
|
53 | 50 |
"""
|
54 | 51 |
Calls the server periodically to inform the server the client has not died.
|
55 | 52 |
"""
|
56 | 53 |
while True:
|
57 |
- self._bot_session.update_bot_session()
|
|
58 |
- await asyncio.sleep(self._update_period)
|
|
54 |
+ if self._bot_session.connected is False:
|
|
55 |
+ self._bot_session.create_bot_session(work, context)
|
|
56 |
+ else:
|
|
57 |
+ self._bot_session.update_bot_session()
|
|
58 |
+ |
|
59 |
+ if self._bot_session._futures:
|
|
60 |
+ await asyncio.wait(self._bot_session._futures.values(),
|
|
61 |
+ timeout=30,
|
|
62 |
+ return_when=asyncio.FIRST_COMPLETED)
|
|
63 |
+ elif self._bot_session.connected is False:
|
|
64 |
+ await asyncio.sleep(30)
|
... | ... | @@ -21,8 +21,10 @@ Interface to grpc |
21 | 21 |
"""
|
22 | 22 |
|
23 | 23 |
import logging
|
24 |
+import grpc
|
|
24 | 25 |
|
25 | 26 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
|
27 |
+from ..settings import INTERVAL_BUFFER
|
|
26 | 28 |
|
27 | 29 |
|
28 | 30 |
class BotInterface:
|
... | ... | @@ -30,18 +32,27 @@ class BotInterface: |
30 | 32 |
Interface handles calls to the server.
|
31 | 33 |
"""
|
32 | 34 |
|
33 |
- def __init__(self, channel):
|
|
35 |
+ def __init__(self, channel, interval):
|
|
34 | 36 |
self.logger = logging.getLogger(__name__)
|
35 | 37 |
self.logger.info(channel)
|
36 | 38 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
39 |
+ self.interval = interval
|
|
37 | 40 |
|
38 | 41 |
def create_bot_session(self, parent, bot_session):
|
39 | 42 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
40 | 43 |
bot_session=bot_session)
|
41 |
- return self._stub.CreateBotSession(request)
|
|
44 |
+ return self._bot_call(self._stub.CreateBotSession, request)
|
|
42 | 45 |
|
43 | 46 |
def update_bot_session(self, bot_session, update_mask=None):
|
44 | 47 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
45 | 48 |
bot_session=bot_session,
|
46 | 49 |
update_mask=update_mask)
|
47 |
- return self._stub.UpdateBotSession(request)
|
|
50 |
+ return self._bot_call(self._stub.UpdateBotSession, request)
|
|
51 |
+ |
|
52 |
+ def _bot_call(self, call, request):
|
|
53 |
+ try:
|
|
54 |
+ return call(request, timeout=self.interval + INTERVAL_BUFFER)
|
|
55 |
+ except grpc.RpcError as e:
|
|
56 |
+ if e.code() in grpc.StatusCode:
|
|
57 |
+ self.logger.warning("Server responded with error: {}".format(e.code()))
|
|
58 |
+ return None
|
... | ... | @@ -49,7 +49,9 @@ class BotSession: |
49 | 49 |
self._bot_id = '{}.{}'.format(parent, platform.node())
|
50 | 50 |
self._context = None
|
51 | 51 |
self._interface = interface
|
52 |
+ self.connected = False
|
|
52 | 53 |
self._leases = {}
|
54 |
+ self._futures = {}
|
|
53 | 55 |
self._name = None
|
54 | 56 |
self._parent = parent
|
55 | 57 |
self._status = BotStatus.OK.value
|
... | ... | @@ -63,12 +65,31 @@ class BotSession: |
63 | 65 |
def add_worker(self, worker):
|
64 | 66 |
self._worker = worker
|
65 | 67 |
|
68 |
+ async def run(self, work, context=None):
|
|
69 |
+ self.logger.info("Starting bot session runner")
|
|
70 |
+ while True:
|
|
71 |
+ if self.connected is False:
|
|
72 |
+ self.create_bot_session(work, context)
|
|
73 |
+ else:
|
|
74 |
+ self.update_bot_session()
|
|
75 |
+ |
|
76 |
+ if self._futures:
|
|
77 |
+ await asyncio.wait(self._futures.values(),
|
|
78 |
+ timeout=self._interface.interval,
|
|
79 |
+ return_when=asyncio.FIRST_COMPLETED)
|
|
80 |
+ elif self.connected is False:
|
|
81 |
+ await asyncio.sleep(self._interface.interval)
|
|
82 |
+ |
|
66 | 83 |
def create_bot_session(self, work, context=None):
|
67 | 84 |
self.logger.debug("Creating bot session")
|
68 | 85 |
self._work = work
|
69 | 86 |
self._context = context
|
70 | 87 |
|
71 | 88 |
session = self._interface.create_bot_session(self._parent, self.get_pb2())
|
89 |
+ if session is None:
|
|
90 |
+ self.connected = False
|
|
91 |
+ return
|
|
92 |
+ self.connected = True
|
|
72 | 93 |
self._name = session.name
|
73 | 94 |
|
74 | 95 |
self.logger.info("Created bot session with name: [{}]".format(self._name))
|
... | ... | @@ -79,6 +100,10 @@ class BotSession: |
79 | 100 |
def update_bot_session(self):
|
80 | 101 |
self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
|
81 | 102 |
session = self._interface.update_bot_session(self.get_pb2())
|
103 |
+ if session is None:
|
|
104 |
+ self.connected = False
|
|
105 |
+ return
|
|
106 |
+ self.connected = True
|
|
82 | 107 |
for k, v in list(self._leases.items()):
|
83 | 108 |
if v.state == LeaseState.COMPLETED.value:
|
84 | 109 |
del self._leases[k]
|
... | ... | @@ -110,7 +135,7 @@ class BotSession: |
110 | 135 |
lease.state = LeaseState.ACTIVE.value
|
111 | 136 |
self._leases[lease.id] = lease
|
112 | 137 |
self.update_bot_session()
|
113 |
- asyncio.ensure_future(self.create_work(lease))
|
|
138 |
+ self._futures[lease.id] = asyncio.ensure_future(self.create_work(lease))
|
|
114 | 139 |
|
115 | 140 |
async def create_work(self, lease):
|
116 | 141 |
self.logger.debug("Work created: [{}]".format(lease.id))
|
... | ... | @@ -133,6 +158,7 @@ class BotSession: |
133 | 158 |
|
134 | 159 |
self.logger.debug("Work complete: [{}]".format(lease.id))
|
135 | 160 |
self.lease_completed(lease)
|
161 |
+ del self._futures[lease.id]
|
|
136 | 162 |
|
137 | 163 |
|
138 | 164 |
class Worker:
|
... | ... | @@ -26,6 +26,7 @@ import uuid |
26 | 26 |
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
|
27 | 27 |
|
28 | 28 |
from ..job import LeaseState
|
29 |
+from ...settings import INTERVAL_BUFFER
|
|
29 | 30 |
|
30 | 31 |
|
31 | 32 |
class BotsInterface:
|
... | ... | @@ -73,7 +74,7 @@ class BotsInterface: |
73 | 74 |
|
74 | 75 |
return bot_session
|
75 | 76 |
|
76 |
- def update_bot_session(self, name, bot_session):
|
|
77 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
77 | 78 |
""" Client updates the server. Any changes in state to the Lease should be
|
78 | 79 |
registered server side. Assigns available leases with work.
|
79 | 80 |
"""
|
... | ... | @@ -87,7 +88,9 @@ class BotsInterface: |
87 | 88 |
|
88 | 89 |
# TODO: Send worker capabilities to the scheduler!
|
89 | 90 |
if not bot_session.leases:
|
90 |
- leases = self._scheduler.request_job_leases({})
|
|
91 |
+ leases = self._scheduler.request_job_leases(
|
|
92 |
+ {}, block=True if deadline else None,
|
|
93 |
+ timeout=deadline - INTERVAL_BUFFER if deadline else None)
|
|
91 | 94 |
if leases:
|
92 | 95 |
bot_session.leases.extend(leases)
|
93 | 96 |
|
... | ... | @@ -64,8 +64,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
64 | 64 |
instance_name = ''.join(names[0:-1])
|
65 | 65 |
|
66 | 66 |
instance = self._get_instance(instance_name)
|
67 |
+ # server side context time_remaining is maxint - unix time
|
|
67 | 68 |
return instance.update_bot_session(request.name,
|
68 |
- request.bot_session)
|
|
69 |
+ request.bot_session,
|
|
70 |
+ context.time_remaining())
|
|
69 | 71 |
|
70 | 72 |
except InvalidArgumentError as e:
|
71 | 73 |
self.logger.error(e)
|
... | ... | @@ -19,7 +19,7 @@ Scheduler |
19 | 19 |
Schedules jobs.
|
20 | 20 |
"""
|
21 | 21 |
|
22 |
-from collections import deque
|
|
22 |
+from queue import Queue, Empty
|
|
23 | 23 |
|
24 | 24 |
from buildgrid._exceptions import NotFoundError
|
25 | 25 |
|
... | ... | @@ -33,7 +33,7 @@ class Scheduler: |
33 | 33 |
def __init__(self, action_cache=None):
|
34 | 34 |
self._action_cache = action_cache
|
35 | 35 |
self.jobs = {}
|
36 |
- self.queue = deque()
|
|
36 |
+ self.queue = Queue()
|
|
37 | 37 |
|
38 | 38 |
def register_client(self, job_name, queue):
|
39 | 39 |
self.jobs[job_name].register_client(queue)
|
... | ... | @@ -53,7 +53,7 @@ class Scheduler: |
53 | 53 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
54 | 54 |
except NotFoundError:
|
55 | 55 |
operation_stage = OperationStage.QUEUED
|
56 |
- self.queue.append(job)
|
|
56 |
+ self.queue.put(job)
|
|
57 | 57 |
|
58 | 58 |
else:
|
59 | 59 |
job.set_cached_result(action_result)
|
... | ... | @@ -61,7 +61,7 @@ class Scheduler: |
61 | 61 |
|
62 | 62 |
else:
|
63 | 63 |
operation_stage = OperationStage.QUEUED
|
64 |
- self.queue.append(job)
|
|
64 |
+ self.queue.put(job)
|
|
65 | 65 |
|
66 | 66 |
job.update_operation_stage(operation_stage)
|
67 | 67 |
|
... | ... | @@ -74,12 +74,12 @@ class Scheduler: |
74 | 74 |
# TODO: Mark these jobs as done
|
75 | 75 |
else:
|
76 | 76 |
job.update_operation_stage(OperationStage.QUEUED)
|
77 |
- self.queue.appendleft(job)
|
|
77 |
+ self.queue.queue.appendleft(job)
|
|
78 | 78 |
|
79 | 79 |
def list_jobs(self):
|
80 | 80 |
return self.jobs.values()
|
81 | 81 |
|
82 |
- def request_job_leases(self, worker_capabilities):
|
|
82 |
+ def request_job_leases(self, worker_capabilities, block=False, timeout=None):
|
|
83 | 83 |
"""Generates a list of the highest priority leases to be run.
|
84 | 84 |
|
85 | 85 |
Args:
|
... | ... | @@ -87,10 +87,13 @@ class Scheduler: |
87 | 87 |
worker properties, configuration and state at the time of the
|
88 | 88 |
request.
|
89 | 89 |
"""
|
90 |
- if not self.queue:
|
|
90 |
+ if not block and self.queue.empty():
|
|
91 | 91 |
return []
|
92 | 92 |
|
93 |
- job = self.queue.popleft()
|
|
93 |
+ try:
|
|
94 |
+ job = self.queue.get(block, timeout)
|
|
95 |
+ except Empty:
|
|
96 |
+ return []
|
|
94 | 97 |
# For now, one lease at a time:
|
95 | 98 |
lease = job.create_lease()
|
96 | 99 |
|
... | ... | @@ -4,3 +4,6 @@ import hashlib |
4 | 4 |
# The hash function that CAS uses
|
5 | 5 |
HASH = hashlib.sha256
|
6 | 6 |
HASH_LENGTH = HASH().digest_size * 2
|
7 |
+ |
|
8 |
+# time in seconds to pad timeouts
|
|
9 |
+INTERVAL_BUFFER = 5
|
... | ... | @@ -87,7 +87,7 @@ def get_cmdclass(): |
87 | 87 |
|
88 | 88 |
tests_require = [
|
89 | 89 |
'coverage >= 4.5.0',
|
90 |
- 'moto',
|
|
90 |
+ 'moto < 1.3.7',
|
|
91 | 91 |
'pep8',
|
92 | 92 |
'psutil',
|
93 | 93 |
'pytest >= 3.8.0',
|
... | ... | @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server) |
39 | 39 |
# GRPC context
|
40 | 40 |
@pytest.fixture
|
41 | 41 |
def context():
|
42 |
- yield mock.MagicMock(spec=_Context)
|
|
42 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
43 |
+ context_mock.time_remaining.return_value = None
|
|
44 |
+ yield context_mock
|
|
43 | 45 |
|
44 | 46 |
|
45 | 47 |
@pytest.fixture
|
... | ... | @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance): |
91 | 93 |
|
92 | 94 |
request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
|
93 | 95 |
bot_session=bot)
|
94 |
- |
|
95 | 96 |
response = instance.UpdateBotSession(request, context)
|
96 | 97 |
|
97 | 98 |
assert isinstance(response, bots_pb2.BotSession)
|