[Notes] [Git][BuildGrid/buildgrid][raoul/smarter-bot-calls] Smarter bot calls



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid

Commits:

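7 changed files:

  • buildgrid/_app/commands/cmd_bot.py
  • buildgrid/bot/bot_interface.py
  • buildgrid/server/bots/instance.py
  • buildgrid/server/bots/service.py
  • buildgrid/server/scheduler.py
  • buildgrid/settings.py
  • tests/integration/bots_service.py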

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -33,9 +33,12 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker
    33 33
     
    
    34 34
     from ..bots import buildbox, dummy, host
    
    35 35
     from ..cli import pass_context
    
    36
    +from ...settings import INTERVAL_BUFFER
    
    36 37
     
    
    37 38
     
    
    38 39
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    40
    +@click.option('--interval', type=click.INT, default=30,
    
    41
    +              help="Interval for calling central server")
    
    39 42
     @click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
    
    40 43
                   help="Remote execution server's URL (port defaults to 50051 if not specified).")
    
    41 44
     @click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
    
    ... ... @@ -58,7 +61,7 @@ from ..cli import pass_context
    58 61
                   help="Targeted farm resource.")
    
    59 62
     @pass_context
    
    60 63
     def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
    
    61
    -        remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
    
    64
    +        remote_cas, cas_client_key, cas_client_cert, cas_server_cert, interval):
    
    62 65
         # Setup the remote execution server channel:
    
    63 66
         url = urlparse(remote)
    
    64 67
     
    
    ... ... @@ -123,7 +126,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    123 126
         context.logger = logging.getLogger(__name__)
    
    124 127
         context.logger.debug("Starting for remote {}".format(context.remote))
    
    125 128
     
    
    126
    -    interface = bot_interface.BotInterface(context.channel)
    
    129
    +    interface = bot_interface.BotInterface(context.channel, interval + INTERVAL_BUFFER)
    
    127 130
     
    
    128 131
         worker = Worker()
    
    129 132
         worker.add_device(Device())
    

  • buildgrid/bot/bot_interface.py
    ... ... @@ -30,10 +30,11 @@ class BotInterface:
    30 30
         Interface handles calls to the server.
    
    31 31
         """
    
    32 32
     
    
    33
    -    def __init__(self, channel):
    
    33
    +    def __init__(self, channel, interval):
    
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
             self.logger.info(channel)
    
    36 36
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    37
    +        self._interval = interval
    
    37 38
     
    
    38 39
         def create_bot_session(self, parent, bot_session):
    
    39 40
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    ... ... @@ -44,4 +45,4 @@ class BotInterface:
    44 45
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    45 46
                                                        bot_session=bot_session,
    
    46 47
                                                        update_mask=update_mask)
    
    47
    -        return self._stub.UpdateBotSession(request)
    48
    +        return self._stub.UpdateBotSession(request, timeout=self._interval)

  • buildgrid/server/bots/instance.py
    ... ... @@ -26,6 +26,7 @@ import uuid
    26 26
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    27 27
     
    
    28 28
     from ..job import LeaseState
    
    29
    +from ...settings import INTERVAL_BUFFER
    
    29 30
     
    
    30 31
     
    
    31 32
     class BotsInterface:
    
    ... ... @@ -73,7 +74,7 @@ class BotsInterface:
    73 74
     
    
    74 75
             return bot_session
    
    75 76
     
    
    76
    -    def update_bot_session(self, name, bot_session):
    
    77
    +    def update_bot_session(self, name, bot_session, deadline=None):
    
    77 78
             """ Client updates the server. Any changes in state to the Lease should be
    
    78 79
             registered server side. Assigns available leases with work.
    
    79 80
             """
    
    ... ... @@ -87,7 +88,9 @@ class BotsInterface:
    87 88
     
    
    88 89
             # TODO: Send worker capabilities to the scheduler!
    
    89 90
             if not bot_session.leases:
    
    90
    -            leases = self._scheduler.request_job_leases({})
    
    91
    +            leases = self._scheduler.request_job_leases(
    
    92
    +                {}, block=True if deadline else None,
    
    93
    +                timeout=deadline - INTERVAL_BUFFER if deadline else None)
    
    91 94
                 if leases:
    
    92 95
                     bot_session.leases.extend(leases)
    
    93 96
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -64,8 +64,11 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    64 64
                 instance_name = ''.join(names[0:-1])
    
    65 65
     
    
    66 66
                 instance = self._get_instance(instance_name)
    
    67
    +            # server side context time_remaining is maxint - unix time
    
    68
    +            print(context.time_remaining())
    
    67 69
                 return instance.update_bot_session(request.name,
    
    68
    -                                               request.bot_session)
    
    70
    +                                               request.bot_session,
    
    71
    +                                               context.time_remaining())
    
    69 72
     
    
    70 73
             except InvalidArgumentError as e:
    
    71 74
                 self.logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -19,7 +19,7 @@ Scheduler
    19 19
     Schedules jobs.
    
    20 20
     """
    
    21 21
     
    
    22
    -from collections import deque
    
    22
    +from queue import Queue, Empty
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    ... ... @@ -33,7 +33,7 @@ class Scheduler:
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35 35
             self.jobs = {}
    
    36
    -        self.queue = deque()
    
    36
    +        self.queue = Queue()
    
    37 37
     
    
    38 38
         def register_client(self, job_name, queue):
    
    39 39
             self.jobs[job_name].register_client(queue)
    
    ... ... @@ -53,7 +53,7 @@ class Scheduler:
    53 53
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    54 54
                 except NotFoundError:
    
    55 55
                     operation_stage = OperationStage.QUEUED
    
    56
    -                self.queue.append(job)
    
    56
    +                self.queue.put(job)
    
    57 57
     
    
    58 58
                 else:
    
    59 59
                     job.set_cached_result(action_result)
    
    ... ... @@ -61,7 +61,7 @@ class Scheduler:
    61 61
     
    
    62 62
             else:
    
    63 63
                 operation_stage = OperationStage.QUEUED
    
    64
    -            self.queue.append(job)
    
    64
    +            self.queue.put(job)
    
    65 65
     
    
    66 66
             job.update_operation_stage(operation_stage)
    
    67 67
     
    
    ... ... @@ -74,12 +74,12 @@ class Scheduler:
    74 74
                     # TODO: Mark these jobs as done
    
    75 75
                 else:
    
    76 76
                     job.update_operation_stage(OperationStage.QUEUED)
    
    77
    -                self.queue.appendleft(job)
    
    77
    +                self.queue.queue.appendleft(job)
    
    78 78
     
    
    79 79
         def list_jobs(self):
    
    80 80
             return self.jobs.values()
    
    81 81
     
    
    82
    -    def request_job_leases(self, worker_capabilities):
    
    82
    +    def request_job_leases(self, worker_capabilities, block=False, timeout=None):
    
    83 83
             """Generates a list of the highest priority leases to be run.
    
    84 84
     
    
    85 85
             Args:
    
    ... ... @@ -87,10 +87,13 @@ class Scheduler:
    87 87
                     worker properties, configuration and state at the time of the
    
    88 88
                     request.
    
    89 89
             """
    
    90
    -        if not self.queue:
    
    90
    +        if not block and self.queue.empty():
    
    91 91
                 return []
    
    92 92
     
    
    93
    -        job = self.queue.popleft()
    
    93
    +        try:
    
    94
    +            job = self.queue.get(block, timeout)
    
    95
    +        except Empty:
    
    96
    +            return []
    
    94 97
             # For now, one lease at a time:
    
    95 98
             lease = job.create_lease()
    
    96 99
     
    

  • buildgrid/settings.py
    ... ... @@ -4,3 +4,6 @@ import hashlib
    4 4
     # The hash function that CAS uses
    
    5 5
     HASH = hashlib.sha256
    
    6 6
     HASH_LENGTH = HASH().digest_size * 2
    
    7
    +
    
    8
    +# time in seconds to pad timeouts
    
    9
    +INTERVAL_BUFFER = 5

  • tests/integration/bots_service.py
    ... ... @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server)
    39 39
     # GRPC context
    
    40 40
     @pytest.fixture
    
    41 41
     def context():
    
    42
    -    yield mock.MagicMock(spec=_Context)
    
    42
    +    context_mock = mock.MagicMock(spec=_Context)
    
    43
    +    context_mock.time_remaining.return_value = None
    
    44
    +    yield context_mock
    
    43 45
     
    
    44 46
     
    
    45 47
     @pytest.fixture
    
    ... ... @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance):
    91 93
     
    
    92 94
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    93 95
                                                    bot_session=bot)
    
    94
    -
    
    95 96
         response = instance.UpdateBotSession(request, context)
    
    96 97
     
    
    97 98
         assert isinstance(response, bots_pb2.BotSession)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]