Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
-
8187d48b
by Raoul Hidalgo Charman at 2018-11-02T12:37:33Z
7 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- tests/integration/bots_service.py
Changes:
... | ... | @@ -33,9 +33,12 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker |
33 | 33 |
|
34 | 34 |
from ..bots import buildbox, dummy, host
|
35 | 35 |
from ..cli import pass_context
|
36 |
+from ...settings import INTERVAL_BUFFER
|
|
36 | 37 |
|
37 | 38 |
|
38 | 39 |
@click.group(name='bot', short_help="Create and register bot clients.")
|
40 |
+@click.option('--interval', type=click.INT, default=30,
|
|
41 |
+ help="Interval for calling central server")
|
|
39 | 42 |
@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
|
40 | 43 |
help="Remote execution server's URL (port defaults to 50051 if not specified).")
|
41 | 44 |
@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
|
... | ... | @@ -58,7 +61,7 @@ from ..cli import pass_context |
58 | 61 |
help="Targeted farm resource.")
|
59 | 62 |
@pass_context
|
60 | 63 |
def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
|
61 |
- remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
|
|
64 |
+ remote_cas, cas_client_key, cas_client_cert, cas_server_cert, interval):
|
|
62 | 65 |
# Setup the remote execution server channel:
|
63 | 66 |
url = urlparse(remote)
|
64 | 67 |
|
... | ... | @@ -123,7 +126,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
123 | 126 |
context.logger = logging.getLogger(__name__)
|
124 | 127 |
context.logger.debug("Starting for remote {}".format(context.remote))
|
125 | 128 |
|
126 |
- interface = bot_interface.BotInterface(context.channel)
|
|
129 |
+ interface = bot_interface.BotInterface(context.channel, interval + INTERVAL_BUFFER)
|
|
127 | 130 |
|
128 | 131 |
worker = Worker()
|
129 | 132 |
worker.add_device(Device())
|
... | ... | @@ -30,10 +30,11 @@ class BotInterface: |
30 | 30 |
Interface handles calls to the server.
|
31 | 31 |
"""
|
32 | 32 |
|
33 |
- def __init__(self, channel):
|
|
33 |
+ def __init__(self, channel, interval):
|
|
34 | 34 |
self.logger = logging.getLogger(__name__)
|
35 | 35 |
self.logger.info(channel)
|
36 | 36 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
37 |
+ self._interval = interval
|
|
37 | 38 |
|
38 | 39 |
def create_bot_session(self, parent, bot_session):
|
39 | 40 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
... | ... | @@ -44,4 +45,4 @@ class BotInterface: |
44 | 45 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
45 | 46 |
bot_session=bot_session,
|
46 | 47 |
update_mask=update_mask)
|
47 |
- return self._stub.UpdateBotSession(request)
|
|
48 |
+ return self._stub.UpdateBotSession(request, timeout=self._interval)
|
... | ... | @@ -26,6 +26,7 @@ import uuid |
26 | 26 |
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
|
27 | 27 |
|
28 | 28 |
from ..job import LeaseState
|
29 |
+from ...settings import INTERVAL_BUFFER
|
|
29 | 30 |
|
30 | 31 |
|
31 | 32 |
class BotsInterface:
|
... | ... | @@ -73,7 +74,7 @@ class BotsInterface: |
73 | 74 |
|
74 | 75 |
return bot_session
|
75 | 76 |
|
76 |
- def update_bot_session(self, name, bot_session):
|
|
77 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
77 | 78 |
""" Client updates the server. Any changes in state to the Lease should be
|
78 | 79 |
registered server side. Assigns available leases with work.
|
79 | 80 |
"""
|
... | ... | @@ -87,7 +88,9 @@ class BotsInterface: |
87 | 88 |
|
88 | 89 |
# TODO: Send worker capabilities to the scheduler!
|
89 | 90 |
if not bot_session.leases:
|
90 |
- leases = self._scheduler.request_job_leases({})
|
|
91 |
+ leases = self._scheduler.request_job_leases(
|
|
92 |
+ {}, block=True if deadline else None,
|
|
93 |
+ timeout=deadline - INTERVAL_BUFFER if deadline else None)
|
|
91 | 94 |
if leases:
|
92 | 95 |
bot_session.leases.extend(leases)
|
93 | 96 |
|
... | ... | @@ -64,8 +64,11 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
64 | 64 |
instance_name = ''.join(names[0:-1])
|
65 | 65 |
|
66 | 66 |
instance = self._get_instance(instance_name)
|
67 |
+# On the server side, context.time_remaining() is maxint - unix time
|
|
68 |
+ print(context.time_remaining())
|
|
67 | 69 |
return instance.update_bot_session(request.name,
|
68 |
- request.bot_session)
|
|
70 |
+ request.bot_session,
|
|
71 |
+ context.time_remaining())
|
|
69 | 72 |
|
70 | 73 |
except InvalidArgumentError as e:
|
71 | 74 |
self.logger.error(e)
|
... | ... | @@ -19,7 +19,7 @@ Scheduler |
19 | 19 |
Schedules jobs.
|
20 | 20 |
"""
|
21 | 21 |
|
22 |
-from collections import deque
|
|
22 |
+from queue import Queue, Empty
|
|
23 | 23 |
|
24 | 24 |
from buildgrid._exceptions import NotFoundError
|
25 | 25 |
|
... | ... | @@ -33,7 +33,7 @@ class Scheduler: |
33 | 33 |
def __init__(self, action_cache=None):
|
34 | 34 |
self._action_cache = action_cache
|
35 | 35 |
self.jobs = {}
|
36 |
- self.queue = deque()
|
|
36 |
+ self.queue = Queue()
|
|
37 | 37 |
|
38 | 38 |
def register_client(self, job_name, queue):
|
39 | 39 |
self.jobs[job_name].register_client(queue)
|
... | ... | @@ -53,7 +53,7 @@ class Scheduler: |
53 | 53 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
54 | 54 |
except NotFoundError:
|
55 | 55 |
operation_stage = OperationStage.QUEUED
|
56 |
- self.queue.append(job)
|
|
56 |
+ self.queue.put(job)
|
|
57 | 57 |
|
58 | 58 |
else:
|
59 | 59 |
job.set_cached_result(action_result)
|
... | ... | @@ -61,7 +61,7 @@ class Scheduler: |
61 | 61 |
|
62 | 62 |
else:
|
63 | 63 |
operation_stage = OperationStage.QUEUED
|
64 |
- self.queue.append(job)
|
|
64 |
+ self.queue.put(job)
|
|
65 | 65 |
|
66 | 66 |
job.update_operation_stage(operation_stage)
|
67 | 67 |
|
... | ... | @@ -74,12 +74,12 @@ class Scheduler: |
74 | 74 |
# TODO: Mark these jobs as done
|
75 | 75 |
else:
|
76 | 76 |
job.update_operation_stage(OperationStage.QUEUED)
|
77 |
- self.queue.appendleft(job)
|
|
77 |
+ self.queue.queue.appendleft(job)
|
|
78 | 78 |
|
79 | 79 |
def list_jobs(self):
|
80 | 80 |
return self.jobs.values()
|
81 | 81 |
|
82 |
- def request_job_leases(self, worker_capabilities):
|
|
82 |
+ def request_job_leases(self, worker_capabilities, block=False, timeout=None):
|
|
83 | 83 |
"""Generates a list of the highest priority leases to be run.
|
84 | 84 |
|
85 | 85 |
Args:
|
... | ... | @@ -87,10 +87,13 @@ class Scheduler: |
87 | 87 |
worker properties, configuration and state at the time of the
|
88 | 88 |
request.
|
89 | 89 |
"""
|
90 |
- if not self.queue:
|
|
90 |
+ if not block and self.queue.empty():
|
|
91 | 91 |
return []
|
92 | 92 |
|
93 |
- job = self.queue.popleft()
|
|
93 |
+ try:
|
|
94 |
+ job = self.queue.get(block, timeout)
|
|
95 |
+ except Empty:
|
|
96 |
+ return []
|
|
94 | 97 |
# For now, one lease at a time:
|
95 | 98 |
lease = job.create_lease()
|
96 | 99 |
|
... | ... | @@ -4,3 +4,6 @@ import hashlib |
4 | 4 |
# The hash function that CAS uses
|
5 | 5 |
HASH = hashlib.sha256
|
6 | 6 |
HASH_LENGTH = HASH().digest_size * 2
|
7 |
+ |
|
8 |
+# time in seconds to pad timeouts
|
|
9 |
+INTERVAL_BUFFER = 5
|
... | ... | @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server) |
39 | 39 |
# GRPC context
|
40 | 40 |
@pytest.fixture
|
41 | 41 |
def context():
|
42 |
- yield mock.MagicMock(spec=_Context)
|
|
42 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
43 |
+ context_mock.time_remaining.return_value = None
|
|
44 |
+ yield context_mock
|
|
43 | 45 |
|
44 | 46 |
|
45 | 47 |
@pytest.fixture
|
... | ... | @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance): |
91 | 93 |
|
92 | 94 |
request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
|
93 | 95 |
bot_session=bot)
|
94 |
- |
|
95 | 96 |
response = instance.UpdateBotSession(request, context)
|
96 | 97 |
|
97 | 98 |
assert isinstance(response, bots_pb2.BotSession)
|