[Notes] [Git][BuildGrid/buildgrid][raoul/smarter-bot-calls] 3 commits: scheduler.py: allow requests to block until a job appears



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid

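Commits: (3 commits pushed, per the subject line; the individual commit entries are not present in this copy)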

12 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -35,6 +35,7 @@ from buildgrid.bot.hardware.worker import Worker
    35 35
     
    
    36 36
     from ..bots import buildbox, dummy, host
    
    37 37
     from ..cli import pass_context
    
    38
    +from ...settings import INTERVAL_BUFFER
    
    38 39
     
    
    39 40
     
    
    40 41
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    ... ... @@ -54,7 +55,7 @@ from ..cli import pass_context
    54 55
                   help="Public CAS client certificate for TLS (PEM-encoded)")
    
    55 56
     @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    56 57
                   help="Public CAS server certificate for TLS (PEM-encoded)")
    
    57
    -@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
    
    58
    +@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
    
    58 59
                   help="Time period for bot updates to the server in seconds.")
    
    59 60
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    
    60 61
                   help="Targeted farm resource.")
    
    ... ... @@ -66,7 +67,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    66 67
     
    
    67 68
         context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
    
    68 69
         context.remote_url = remote
    
    69
    -    context.update_period = update_period
    
    70 70
         context.parent = parent
    
    71 71
     
    
    72 72
         if url.scheme == 'http':
    
    ... ... @@ -124,7 +124,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    124 124
     
    
    125 125
         click.echo("Starting for remote=[{}]".format(context.remote))
    
    126 126
     
    
    127
    -    bot_interface = interface.BotInterface(context.channel)
    
    127
    +    bot_interface = interface.BotInterface(context.channel, update_period + INTERVAL_BUFFER)
    
    128 128
         worker = Worker()
    
    129 129
         worker.add_device(Device())
    
    130 130
         hardware_interface = HardwareInterface(worker)
    
    ... ... @@ -142,7 +142,7 @@ def run_dummy(context):
    142 142
         try:
    
    143 143
             bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    144 144
                                              dummy.work_dummy, context)
    
    145
    -        b = bot.Bot(bot_session, context.update_period)
    
    145
    +        b = bot.Bot(bot_session)
    
    146 146
             b.session()
    
    147 147
         except KeyboardInterrupt:
    
    148 148
             pass
    
    ... ... @@ -158,7 +158,7 @@ def run_host_tools(context):
    158 158
         try:
    
    159 159
             bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    160 160
                                              host.work_host_tools, context)
    
    161
    -        b = bot.Bot(bot_session, context.update_period)
    
    161
    +        b = bot.Bot(bot_session)
    
    162 162
             b.session()
    
    163 163
         except KeyboardInterrupt:
    
    164 164
             pass
    
    ... ... @@ -180,7 +180,7 @@ def run_buildbox(context, local_cas, fuse_dir):
    180 180
         try:
    
    181 181
             bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    182 182
                                              buildbox.work_buildbox, context)
    
    183
    -        b = bot.Bot(bot_session, context.update_period)
    
    183
    +        b = bot.Bot(bot_session)
    
    184 184
             b.session()
    
    185 185
         except KeyboardInterrupt:
    
    186 186
             pass

  • buildgrid/bot/bot.py
    ... ... @@ -20,14 +20,12 @@ import logging
    20 20
     class Bot:
    
    21 21
         """Creates a local BotSession."""
    
    22 22
     
    
    23
    -    def __init__(self, bot_session, update_period=1):
    
    23
    +    def __init__(self, bot_session):
    
    24 24
             """
    
    25 25
             """
    
    26 26
             self.__logger = logging.getLogger(__name__)
    
    27 27
     
    
    28 28
             self.__bot_session = bot_session
    
    29
    -        self.__update_period = update_period
    
    30
    -
    
    31 29
             self.__loop = None
    
    32 30
     
    
    33 31
         def session(self):
    
    ... ... @@ -37,7 +35,7 @@ class Bot:
    37 35
             self.__bot_session.create_bot_session()
    
    38 36
     
    
    39 37
             try:
    
    40
    -            task = asyncio.ensure_future(self.__update_bot_session())
    
    38
    +            task = asyncio.ensure_future(self.__bot_session.run())
    
    41 39
                 self.__loop.run_until_complete(task)
    
    42 40
     
    
    43 41
             except KeyboardInterrupt:
    
    ... ... @@ -46,16 +44,6 @@ class Bot:
    46 44
             self.__kill_everyone()
    
    47 45
             self.__logger.info("Bot shutdown.")
    
    48 46
     
    
    49
    -    async def __update_bot_session(self):
    
    50
    -        """Calls the server periodically to inform the server the client has not died."""
    
    51
    -        try:
    
    52
    -            while True:
    
    53
    -                self.__bot_session.update_bot_session()
    
    54
    -                await asyncio.sleep(self.__update_period)
    
    55
    -
    
    56
    -        except asyncio.CancelledError:
    
    57
    -            pass
    
    58
    -
    
    59 47
         def __kill_everyone(self):
    
    60 48
             """Cancels and waits for them to stop."""
    
    61 49
             self.__logger.info("Cancelling remaining tasks...")
    

  • buildgrid/bot/interface.py
    ... ... @@ -31,28 +31,27 @@ class BotInterface:
    31 31
         Interface handles calls to the server.
    
    32 32
         """
    
    33 33
     
    
    34
    -    def __init__(self, channel):
    
    34
    +    def __init__(self, channel, interval):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    +        self.__logger.info(channel)
    
    37 38
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    39
    +        self.interval = interval
    
    38 40
     
    
    39 41
         def create_bot_session(self, parent, bot_session):
    
    40 42
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    41 43
                                                        bot_session=bot_session)
    
    42
    -        try:
    
    43
    -            return self._stub.CreateBotSession(request)
    
    44
    -
    
    45
    -        except grpc.RpcError as e:
    
    46
    -            self.__logger.error(e)
    
    47
    -            raise
    
    44
    +        return self._bot_call(self._stub.CreateBotSession, request)
    
    48 45
     
    
    49 46
         def update_bot_session(self, bot_session, update_mask=None):
    
    50 47
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    51 48
                                                        bot_session=bot_session,
    
    52 49
                                                        update_mask=update_mask)
    
    53
    -        try:
    
    54
    -            return self._stub.UpdateBotSession(request)
    
    50
    +        return self._bot_call(self._stub.UpdateBotSession, request)
    
    55 51
     
    
    52
    +    def _bot_call(self, call, request):
    
    53
    +        try:
    
    54
    +            return call(request, timeout=self.interval)
    
    56 55
             except grpc.RpcError as e:
    
    57 56
                 self.__logger.error(e)
    
    58 57
                 raise

  • buildgrid/bot/session.py
    ... ... @@ -19,6 +19,7 @@ Bot Session
    19 19
     
    
    20 20
     Allows connections
    
    21 21
     """
    
    22
    +import asyncio
    
    22 23
     import logging
    
    23 24
     import platform
    
    24 25
     
    
    ... ... @@ -47,6 +48,8 @@ class BotSession:
    47 48
             self._status = BotStatus.OK.value
    
    48 49
             self._tenant_manager = TenantManager()
    
    49 50
     
    
    51
    +        self.connected = False
    
    52
    +
    
    50 53
             self.__parent = parent
    
    51 54
             self.__bot_id = '{}.{}'.format(parent, platform.node())
    
    52 55
             self.__name = None
    
    ... ... @@ -58,10 +61,33 @@ class BotSession:
    58 61
         def bot_id(self):
    
    59 62
             return self.__bot_id
    
    60 63
     
    
    64
    +    async def run(self):
    
    65
    +        """ Run a bot session
    
    66
    +
    
    67
    +        This connects and reconnects via create bot session and waits on update
    
    68
    +        bot session calls.
    
    69
    +        """
    
    70
    +        self.__logger.debug("Starting bot session")
    
    71
    +        interval = self._bots_interface.interval
    
    72
    +        while True:
    
    73
    +            if self.connected is False:
    
    74
    +                self.create_bot_session()
    
    75
    +            else:
    
    76
    +                self.update_bot_session()
    
    77
    +
    
    78
    +            if self.connected is False:
    
    79
    +                await asyncio.sleep(interval)
    
    80
    +            else:
    
    81
    +                await self._tenant_manager.wait_on_tenants(interval)
    
    82
    +
    
    61 83
         def create_bot_session(self):
    
    62 84
             self.__logger.debug("Creating bot session")
    
    63 85
     
    
    64 86
             session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
    
    87
    +        if session is None:
    
    88
    +            self.connected = False
    
    89
    +            return
    
    90
    +        self.connected = True
    
    65 91
             self.__name = session.name
    
    66 92
     
    
    67 93
             self.__logger.info("Created bot session with name: [%s]", self.__name)
    
    ... ... @@ -73,6 +99,10 @@ class BotSession:
    73 99
             self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
    
    74 100
     
    
    75 101
             session = self._bots_interface.update_bot_session(self.get_pb2())
    
    102
    +        if session is None:
    
    103
    +            self.connected = False
    
    104
    +            return
    
    105
    +        self.connected = True
    
    76 106
             server_ids = []
    
    77 107
     
    
    78 108
             for lease in session.leases:
    

  • buildgrid/bot/tenantmanager.py
    ... ... @@ -150,6 +150,14 @@ class TenantManager:
    150 150
             """
    
    151 151
             return self._tenants[lease_id].tenant_completed
    
    152 152
     
    
    153
    +    async def wait_on_tenants(self, timeout):
    
    154
    +        if self._tasks:
    
    155
    +            tasks = self._tasks.values()
    
    156
    +            print(type(tasks))
    
    157
    +            await asyncio.wait(tasks,
    
    158
    +                               timeout=timeout,
    
    159
    +                               return_when=asyncio.FIRST_COMPLETED)
    
    160
    +
    
    153 161
         def _update_lease_result(self, lease_id, result):
    
    154 162
             """Updates the lease with the result."""
    
    155 163
             self._tenants[lease_id].update_lease_result(result)
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -26,6 +26,7 @@ import uuid
    26 26
     from buildgrid._exceptions import InvalidArgumentError
    
    27 27
     
    
    28 28
     from ..job import LeaseState
    
    29
    +from ...settings import INTERVAL_BUFFER
    
    29 30
     
    
    30 31
     
    
    31 32
     class BotsInterface:
    
    ... ... @@ -71,7 +72,7 @@ class BotsInterface:
    71 72
             self._request_leases(bot_session)
    
    72 73
             return bot_session
    
    73 74
     
    
    74
    -    def update_bot_session(self, name, bot_session):
    
    75
    +    def update_bot_session(self, name, bot_session, deadline=None):
    
    75 76
             """ Client updates the server. Any changes in state to the Lease should be
    
    76 77
             registered server side. Assigns available leases with work.
    
    77 78
             """
    
    ... ... @@ -89,14 +90,15 @@ class BotsInterface:
    89 90
                         pass
    
    90 91
                     lease.Clear()
    
    91 92
     
    
    92
    -        self._request_leases(bot_session)
    
    93
    +        self._request_leases(bot_session, deadline)
    
    93 94
             return bot_session
    
    94 95
     
    
    95
    -    def _request_leases(self, bot_session):
    
    96
    +    def _request_leases(self, bot_session, deadline=None):
    
    96 97
             # TODO: Send worker capabilities to the scheduler!
    
    97 98
             # Only send one lease at a time currently.
    
    98 99
             if not bot_session.leases:
    
    99
    -            leases = self._scheduler.request_job_leases({})
    
    100
    +            leases = self._scheduler.request_job_leases(
    
    101
    +                {}, timeout=deadline - INTERVAL_BUFFER if deadline else None)
    
    100 102
                 if leases:
    
    101 103
                     for lease in leases:
    
    102 104
                         self._assigned_leases[bot_session.name].add(lease.id)
    

  • buildgrid/server/bots/service.py
    ... ... @@ -68,8 +68,10 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    68 68
                 instance_name = ''.join(names[0:-1])
    
    69 69
     
    
    70 70
                 instance = self._get_instance(instance_name)
    
    71
    +            # server side context time_remaining is maxint - unix time
    
    71 72
                 return instance.update_bot_session(request.name,
    
    72
    -                                               request.bot_session)
    
    73
    +                                               request.bot_session,
    
    74
    +                                               deadline=context.time_remaining())
    
    73 75
     
    
    74 76
             except InvalidArgumentError as e:
    
    75 77
                 self.__logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -19,8 +19,8 @@ Scheduler
    19 19
     Schedules jobs.
    
    20 20
     """
    
    21 21
     
    
    22
    -from collections import deque
    
    23 22
     import logging
    
    23
    +from queue import Queue, Empty
    
    24 24
     
    
    25 25
     from buildgrid._exceptions import NotFoundError
    
    26 26
     
    
    ... ... @@ -36,7 +36,7 @@ class Scheduler:
    36 36
     
    
    37 37
             self._action_cache = action_cache
    
    38 38
             self.jobs = {}
    
    39
    -        self.queue = deque()
    
    39
    +        self.queue = Queue()
    
    40 40
     
    
    41 41
         def register_client(self, job_name, queue):
    
    42 42
             self.jobs[job_name].register_client(queue)
    
    ... ... @@ -56,7 +56,7 @@ class Scheduler:
    56 56
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    57 57
                 except NotFoundError:
    
    58 58
                     operation_stage = OperationStage.QUEUED
    
    59
    -                self.queue.append(job)
    
    59
    +                self.queue.put(job)
    
    60 60
     
    
    61 61
                 else:
    
    62 62
                     job.set_cached_result(action_result)
    
    ... ... @@ -64,7 +64,7 @@ class Scheduler:
    64 64
     
    
    65 65
             else:
    
    66 66
                 operation_stage = OperationStage.QUEUED
    
    67
    -            self.queue.append(job)
    
    67
    +            self.queue.put(job)
    
    68 68
     
    
    69 69
             job.update_operation_stage(operation_stage)
    
    70 70
     
    
    ... ... @@ -78,12 +78,12 @@ class Scheduler:
    78 78
                 else:
    
    79 79
                     job.update_operation_stage(OperationStage.QUEUED)
    
    80 80
                     job.update_lease_state(LeaseState.PENDING)
    
    81
    -                self.queue.append(job)
    
    81
    +                self.queue.queue.appendleft(job)
    
    82 82
     
    
    83 83
         def list_jobs(self):
    
    84 84
             return self.jobs.values()
    
    85 85
     
    
    86
    -    def request_job_leases(self, worker_capabilities):
    
    86
    +    def request_job_leases(self, worker_capabilities, timeout=None):
    
    87 87
             """Generates a list of the highest priority leases to be run.
    
    88 88
     
    
    89 89
             Args:
    
    ... ... @@ -91,10 +91,13 @@ class Scheduler:
    91 91
                     worker properties, configuration and state at the time of the
    
    92 92
                     request.
    
    93 93
             """
    
    94
    -        if not self.queue:
    
    94
    +        if not timeout and self.queue.empty():
    
    95 95
                 return []
    
    96 96
     
    
    97
    -        job = self.queue.popleft()
    
    97
    +        try:
    
    98
    +            job = self.queue.get(True, timeout) if timeout else self.queue.get(False)
    
    99
    +        except Empty:
    
    100
    +            return []
    
    98 101
     
    
    99 102
             lease = job.lease
    
    100 103
     
    

  • buildgrid/settings.py
    ... ... @@ -4,3 +4,6 @@ import hashlib
    4 4
     # The hash function that CAS uses
    
    5 5
     HASH = hashlib.sha256
    
    6 6
     HASH_LENGTH = HASH().digest_size * 2
    
    7
    +
    
    8
    +# time in seconds to pad timeouts
    
    9
    +INTERVAL_BUFFER = 5

  • tests/integration/bot_session.py
    ... ... @@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess
    30 30
     from ..utils.bots_interface import serve_bots_interface
    
    31 31
     
    
    32 32
     
    
    33
    +TIMEOUT = 5
    
    33 34
     INSTANCES = ['', 'instance']
    
    34 35
     
    
    35 36
     
    
    ... ... @@ -48,7 +49,7 @@ class ServerInterface:
    48 49
                 bot_session = bots_pb2.BotSession()
    
    49 50
                 bot_session.ParseFromString(string_bot_session)
    
    50 51
     
    
    51
    -            interface = BotInterface(grpc.insecure_channel(remote))
    
    52
    +            interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
    
    52 53
     
    
    53 54
                 result = interface.create_bot_session(parent, bot_session)
    
    54 55
                 queue.put(result.SerializeToString())
    
    ... ... @@ -67,7 +68,7 @@ class ServerInterface:
    67 68
                 bot_session = bots_pb2.BotSession()
    
    68 69
                 bot_session.ParseFromString(string_bot_session)
    
    69 70
     
    
    70
    -            interface = BotInterface(grpc.insecure_channel(remote))
    
    71
    +            interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
    
    71 72
     
    
    72 73
                 result = interface.update_bot_session(bot_session, update_mask)
    
    73 74
                 queue.put(result.SerializeToString())
    

  • tests/integration/bots_service.py
    ... ... @@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server)
    38 38
     # GRPC context
    
    39 39
     @pytest.fixture
    
    40 40
     def context():
    
    41
    -    yield mock.MagicMock(spec=_Context)
    
    41
    +    context_mock = mock.MagicMock(spec=_Context)
    
    42
    +    context_mock.time_remaining.return_value = None
    
    43
    +    yield context_mock
    
    42 44
     
    
    43 45
     
    
    44 46
     @pytest.fixture
    
    ... ... @@ -90,7 +92,6 @@ def test_update_bot_session(bot_session, context, instance):
    90 92
     
    
    91 93
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    92 94
                                                    bot_session=bot)
    
    93
    -
    
    94 95
         response = instance.UpdateBotSession(request, context)
    
    95 96
     
    
    96 97
         assert isinstance(response, bots_pb2.BotSession)
    

  • tests/utils/bots_interface.py
    ... ... @@ -123,7 +123,7 @@ class BotsInterface:
    123 123
             self.__bot_session_queue.put(bot_session.SerializeToString())
    
    124 124
             return bot_session
    
    125 125
     
    
    126
    -    def update_bot_session(self, name, bot_session):
    
    126
    +    def update_bot_session(self, name, bot_session, deadline=None):
    
    127 127
             for lease in bot_session.leases:
    
    128 128
                 state = LeaseState(lease.state)
    
    129 129
                 if state == LeaseState.COMPLETED:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]