[Notes] [Git][BuildGrid/buildgrid][raoul/smarter-bot-calls] 2 commits: Smarter bot calls



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid

Commits:

9 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -33,6 +33,7 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker
    33 33
     
    
    34 34
     from ..bots import buildbox, dummy, host
    
    35 35
     from ..cli import pass_context
    
    36
    +from ...settings import INTERVAL_BUFFER
    
    36 37
     
    
    37 38
     
    
    38 39
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    ... ... @@ -52,7 +53,7 @@ from ..cli import pass_context
    52 53
                   help="Public CAS client certificate for TLS (PEM-encoded)")
    
    53 54
     @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    54 55
                   help="Public CAS server certificate for TLS (PEM-encoded)")
    
    55
    -@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
    
    56
    +@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
    
    56 57
                   help="Time period for bot updates to the server in seconds.")
    
    57 58
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    
    58 59
                   help="Targeted farm resource.")
    
    ... ... @@ -64,7 +65,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    64 65
     
    
    65 66
         context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
    
    66 67
         context.remote_url = remote
    
    67
    -    context.update_period = update_period
    
    68 68
         context.parent = parent
    
    69 69
     
    
    70 70
         if url.scheme == 'http':
    
    ... ... @@ -123,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    123 123
         context.logger = logging.getLogger(__name__)
    
    124 124
         context.logger.debug("Starting for remote {}".format(context.remote))
    
    125 125
     
    
    126
    -    interface = bot_interface.BotInterface(context.channel)
    
    126
    +    interface = bot_interface.BotInterface(context.channel, update_period)
    
    127 127
     
    
    128 128
         worker = Worker()
    
    129 129
         worker.add_device(Device())
    
    ... ... @@ -141,7 +141,7 @@ def run_dummy(context):
    141 141
         Creates a session, accepts leases, does fake work and updates the server.
    
    142 142
         """
    
    143 143
         try:
    
    144
    -        b = bot.Bot(context.bot_session, context.update_period)
    
    144
    +        b = bot.Bot(context.bot_session)
    
    145 145
             b.session(dummy.work_dummy,
    
    146 146
                       context)
    
    147 147
         except KeyboardInterrupt:
    
    ... ... @@ -156,7 +156,7 @@ def run_host_tools(context):
    156 156
         result back to CAS.
    
    157 157
         """
    
    158 158
         try:
    
    159
    -        b = bot.Bot(context.bot_session, context.update_period)
    
    159
    +        b = bot.Bot(context.bot_session)
    
    160 160
             b.session(host.work_host_tools,
    
    161 161
                       context)
    
    162 162
         except KeyboardInterrupt:
    
    ... ... @@ -177,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir):
    177 177
         context.fuse_dir = fuse_dir
    
    178 178
     
    
    179 179
         try:
    
    180
    -        b = bot.Bot(context.bot_session, context.update_period)
    
    180
    +        b = bot.Bot(context.bot_session)
    
    181 181
             b.session(buildbox.work_buildbox,
    
    182 182
                       context)
    
    183 183
         except KeyboardInterrupt:
    

  • buildgrid/bot/bot.py
    ... ... @@ -29,11 +29,10 @@ class Bot:
    29 29
         Creates a local BotSession.
    
    30 30
         """
    
    31 31
     
    
    32
    -    def __init__(self, bot_session, update_period=1):
    
    32
    +    def __init__(self, bot_session):
    
    33 33
             self.logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._bot_session = bot_session
    
    36
    -        self._update_period = update_period
    
    37 36
     
    
    38 37
         def session(self, work, context):
    
    39 38
             loop = asyncio.get_event_loop()
    
    ... ... @@ -55,4 +54,3 @@ class Bot:
    55 54
             """
    
    56 55
             while True:
    
    57 56
                 self._bot_session.update_bot_session()
    58
    -            await asyncio.sleep(self._update_period)

  • buildgrid/bot/bot_interface.py
    ... ... @@ -21,7 +21,10 @@ Interface to grpc
    21 21
     """
    
    22 22
     
    
    23 23
     import logging
    
    24
    +from ..utils import timeout
    
    25
    +from ..settings import INTERVAL_BUFFER
    
    24 26
     
    
    27
    +import grpc
    
    25 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
    
    26 29
     
    
    27 30
     
    
    ... ... @@ -30,10 +33,11 @@ class BotInterface:
    30 33
         Interface handles calls to the server.
    
    31 34
         """
    
    32 35
     
    
    33
    -    def __init__(self, channel):
    
    36
    +    def __init__(self, channel, interval):
    
    34 37
             self.logger = logging.getLogger(__name__)
    
    35 38
             self.logger.info(channel)
    
    36 39
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    40
    +        self._interval = interval
    
    37 41
     
    
    38 42
         def create_bot_session(self, parent, bot_session):
    
    39 43
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    ... ... @@ -44,4 +48,15 @@ class BotInterface:
    44 48
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    45 49
                                                        bot_session=bot_session,
    
    46 50
                                                        update_mask=update_mask)
    
    47
    -        return self._stub.UpdateBotSession(request)
    51
    +        try:
    
    52
    +            with timeout(30):
    
    53
    +                return self._stub.UpdateBotSession(
    
    54
    +                    request, timeout=self._interval + INTERVAL_BUFFER)
    
    55
    +        except grpc.StatusCode.DEADLINE_EXCEEDED:
    
    56
    +            self.logger.info("Server didn't respond")
    
    57
    +            return None
    
    58
    +        except TimeoutError:
    
    59
    +            self.logger.info("server didn't respond")
    
    60
    +        except Exception as e:
    
    61
    +            self.logger.info("Some error: {}".format(e))
    
    62
    +

  • buildgrid/server/bots/instance.py
    ... ... @@ -26,6 +26,7 @@ import uuid
    26 26
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    27 27
     
    
    28 28
     from ..job import LeaseState
    
    29
    +from ...settings import INTERVAL_BUFFER
    
    29 30
     
    
    30 31
     
    
    31 32
     class BotsInterface:
    
    ... ... @@ -73,7 +74,7 @@ class BotsInterface:
    73 74
     
    
    74 75
             return bot_session
    
    75 76
     
    
    76
    -    def update_bot_session(self, name, bot_session):
    
    77
    +    def update_bot_session(self, name, bot_session, deadline=None):
    
    77 78
             """ Client updates the server. Any changes in state to the Lease should be
    
    78 79
             registered server side. Assigns available leases with work.
    
    79 80
             """
    
    ... ... @@ -87,7 +88,9 @@ class BotsInterface:
    87 88
     
    
    88 89
             # TODO: Send worker capabilities to the scheduler!
    
    89 90
             if not bot_session.leases:
    
    90
    -            leases = self._scheduler.request_job_leases({})
    
    91
    +            leases = self._scheduler.request_job_leases(
    
    92
    +                {}, block=True if deadline else None,
    
    93
    +                timeout=deadline - INTERVAL_BUFFER if deadline else None)
    
    91 94
                 if leases:
    
    92 95
                     bot_session.leases.extend(leases)
    
    93 96
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -64,8 +64,10 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    64 64
                 instance_name = ''.join(names[0:-1])
    
    65 65
     
    
    66 66
                 instance = self._get_instance(instance_name)
    
    67
    +            # server side context time_remaining is maxint - unix time
    
    67 68
                 return instance.update_bot_session(request.name,
    
    68
    -                                               request.bot_session)
    
    69
    +                                               request.bot_session,
    
    70
    +                                               context.time_remaining())
    
    69 71
     
    
    70 72
             except InvalidArgumentError as e:
    
    71 73
                 self.logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -19,7 +19,7 @@ Scheduler
    19 19
     Schedules jobs.
    
    20 20
     """
    
    21 21
     
    
    22
    -from collections import deque
    
    22
    +from queue import Queue, Empty
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    ... ... @@ -33,7 +33,7 @@ class Scheduler:
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35 35
             self.jobs = {}
    
    36
    -        self.queue = deque()
    
    36
    +        self.queue = Queue()
    
    37 37
     
    
    38 38
         def register_client(self, job_name, queue):
    
    39 39
             self.jobs[job_name].register_client(queue)
    
    ... ... @@ -53,7 +53,7 @@ class Scheduler:
    53 53
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    54 54
                 except NotFoundError:
    
    55 55
                     operation_stage = OperationStage.QUEUED
    
    56
    -                self.queue.append(job)
    
    56
    +                self.queue.put(job)
    
    57 57
     
    
    58 58
                 else:
    
    59 59
                     job.set_cached_result(action_result)
    
    ... ... @@ -61,7 +61,7 @@ class Scheduler:
    61 61
     
    
    62 62
             else:
    
    63 63
                 operation_stage = OperationStage.QUEUED
    
    64
    -            self.queue.append(job)
    
    64
    +            self.queue.put(job)
    
    65 65
     
    
    66 66
             job.update_operation_stage(operation_stage)
    
    67 67
     
    
    ... ... @@ -74,12 +74,12 @@ class Scheduler:
    74 74
                     # TODO: Mark these jobs as done
    
    75 75
                 else:
    
    76 76
                     job.update_operation_stage(OperationStage.QUEUED)
    
    77
    -                self.queue.appendleft(job)
    
    77
    +                self.queue.queue.appendleft(job)
    
    78 78
     
    
    79 79
         def list_jobs(self):
    
    80 80
             return self.jobs.values()
    
    81 81
     
    
    82
    -    def request_job_leases(self, worker_capabilities):
    
    82
    +    def request_job_leases(self, worker_capabilities, block=False, timeout=None):
    
    83 83
             """Generates a list of the highest priority leases to be run.
    
    84 84
     
    
    85 85
             Args:
    
    ... ... @@ -87,10 +87,13 @@ class Scheduler:
    87 87
                     worker properties, configuration and state at the time of the
    
    88 88
                     request.
    
    89 89
             """
    
    90
    -        if not self.queue:
    
    90
    +        if not block and self.queue.empty():
    
    91 91
                 return []
    
    92 92
     
    
    93
    -        job = self.queue.popleft()
    
    93
    +        try:
    
    94
    +            job = self.queue.get(block, timeout)
    
    95
    +        except Empty:
    
    96
    +            return []
    
    94 97
             # For now, one lease at a time:
    
    95 98
             lease = job.create_lease()
    
    96 99
     
    

  • buildgrid/settings.py
    ... ... @@ -4,3 +4,6 @@ import hashlib
    4 4
     # The hash function that CAS uses
    
    5 5
     HASH = hashlib.sha256
    
    6 6
     HASH_LENGTH = HASH().digest_size * 2
    
    7
    +
    
    8
    +# time in seconds to pad timeouts
    
    9
    +INTERVAL_BUFFER = 5

  • buildgrid/utils.py
    ... ... @@ -16,6 +16,8 @@
    16 16
     from operator import attrgetter
    
    17 17
     import os
    
    18 18
     import socket
    
    19
    +from contextlib import contextmanager
    
    20
    +import signal
    
    19 21
     
    
    20 22
     from buildgrid.settings import HASH
    
    21 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    ... ... @@ -208,3 +210,15 @@ def output_directory_maker(directory_path, working_path, tree_digest):
    208 210
         output_directory.path = os.path.relpath(directory_path, start=working_path)
    
    209 211
     
    
    210 212
         return output_directory
    
    213
    +
    
    214
    +
    
    215
    +@contextmanager
    
    216
    +def timeout(seconds):
    
    217
    +    def raise_timeout(x, y):
    
    218
    +        raise TimeoutError
    
    219
    +    signal.signal(signal.SIGALRM, raise_timeout)
    
    220
    +    signal.alarm(seconds)
    
    221
    +    try:
    
    222
    +        yield
    
    223
    +    finally:
    
    224
    +        signal.alarm(0)

  • tests/integration/bots_service.py
    ... ... @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server)
    39 39
     # GRPC context
    
    40 40
     @pytest.fixture
    
    41 41
     def context():
    
    42
    -    yield mock.MagicMock(spec=_Context)
    
    42
    +    context_mock = mock.MagicMock(spec=_Context)
    
    43
    +    context_mock.time_remaining.return_value = None
    
    44
    +    yield context_mock
    
    43 45
     
    
    44 46
     
    
    45 47
     @pytest.fixture
    
    ... ... @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance):
    91 93
     
    
    92 94
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    93 95
                                                    bot_session=bot)
    
    94
    -
    
    95 96
         response = instance.UpdateBotSession(request, context)
    
    96 97
     
    
    97 98
         assert isinstance(response, bots_pb2.BotSession)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]