[Notes] [Git][BuildGrid/buildgrid][raoul/smarter-bot-calls] Bot: Reconnects and timeouts on the bot side



Title: GitLab

Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -33,7 +33,6 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker
    33 33
     
    
    34 34
     from ..bots import buildbox, dummy, host
    
    35 35
     from ..cli import pass_context
    
    36
    -from ...settings import INTERVAL_BUFFER
    
    37 36
     
    
    38 37
     
    
    39 38
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    ... ... @@ -53,6 +52,7 @@ from ...settings import INTERVAL_BUFFER
    53 52
                   help="Public CAS client certificate for TLS (PEM-encoded)")
    
    54 53
     @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    55 54
                   help="Public CAS server certificate for TLS (PEM-encoded)")
    
    55
    +# TODO change default to 30
    
    56 56
     @click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
    
    57 57
                   help="Time period for bot updates to the server in seconds.")
    
    58 58
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    

  • buildgrid/bot/bot.py
    ... ... @@ -37,20 +37,11 @@ class Bot:
    37 37
         def session(self, work, context):
    
    38 38
             loop = asyncio.get_event_loop()
    
    39 39
     
    
    40
    -        self._bot_session.create_bot_session(work, context)
    
    41
    -
    
    42 40
             try:
    
    43
    -            task = asyncio.ensure_future(self._update_bot_session())
    
    41
    +            task = asyncio.ensure_future(self._bot_session.run(work, context))
    
    44 42
                 loop.run_forever()
    
    45 43
             except KeyboardInterrupt:
    
    46 44
                 pass
    
    47 45
             finally:
    
    48 46
                 task.cancel()
    
    49 47
                 loop.close()
    50
    -
    
    51
    -    async def _update_bot_session(self):
    
    52
    -        """
    
    53
    -        Calls the server periodically to inform the server the client has not died.
    
    54
    -        """
    
    55
    -        while True:
    
    56
    -            self._bot_session.update_bot_session()

  • buildgrid/bot/bot_interface.py
    ... ... @@ -21,8 +21,10 @@ Interface to grpc
    21 21
     """
    
    22 22
     
    
    23 23
     import logging
    
    24
    +import grpc
    
    24 25
     
    
    25 26
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
    
    27
    +from ..settings import INTERVAL_BUFFER
    
    26 28
     
    
    27 29
     
    
    28 30
     class BotInterface:
    
    ... ... @@ -34,15 +36,23 @@ class BotInterface:
    34 36
             self.logger = logging.getLogger(__name__)
    
    35 37
             self.logger.info(channel)
    
    36 38
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    37
    -        self._interval = interval
    
    39
    +        self.interval = interval
    
    38 40
     
    
    39 41
         def create_bot_session(self, parent, bot_session):
    
    40 42
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    41 43
                                                        bot_session=bot_session)
    
    42
    -        return self._stub.CreateBotSession(request)
    
    44
    +        return self._bot_call(self._stub.CreateBotSession, request)
    
    43 45
     
    
    44 46
         def update_bot_session(self, bot_session, update_mask=None):
    
    45 47
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    46 48
                                                        bot_session=bot_session,
    
    47 49
                                                        update_mask=update_mask)
    
    48
    -        return self._stub.UpdateBotSession(request, timeout=self._interval)
    50
    +        return self._bot_call(self._stub.UpdateBotSession, request)
    
    51
    +
    
    52
    +    def _bot_call(self, call, request):
    
    53
    +        try:
    
    54
    +            return call(request, timeout=self.interval + INTERVAL_BUFFER)
    
    55
    +        except grpc.RpcError as e:
    
    56
    +            if e.code() in grpc.StatusCode:
    
    57
    +                self.logger.warning("Server responded with error: {}".format(e.code()))
    
    58
    +                return None

  • buildgrid/bot/bot_session.py
    ... ... @@ -49,7 +49,9 @@ class BotSession:
    49 49
             self._bot_id = '{}.{}'.format(parent, platform.node())
    
    50 50
             self._context = None
    
    51 51
             self._interface = interface
    
    52
    +        self.connected = False
    
    52 53
             self._leases = {}
    
    54
    +        self._futures = {}
    
    53 55
             self._name = None
    
    54 56
             self._parent = parent
    
    55 57
             self._status = BotStatus.OK.value
    
    ... ... @@ -63,12 +65,35 @@ class BotSession:
    63 65
         def add_worker(self, worker):
    
    64 66
             self._worker = worker
    
    65 67
     
    
    68
    +    async def run(self, work, context=None):
    
    69
    +        """
    
    70
    +        Run a bot session that waits on bot session calls and reconnects if
    
    71
    +        there is no response
    
    72
    +        """
    
    73
    +        self.logger.info("Starting bot session runner")
    
    74
    +        while True:
    
    75
    +            if self.connected is False:
    
    76
    +                self.create_bot_session(work, context)
    
    77
    +            else:
    
    78
    +                self.update_bot_session()
    
    79
    +
    
    80
    +            if self._futures:
    
    81
    +                await asyncio.wait(self._futures.values(),
    
    82
    +                                   timeout=self._interface.interval,
    
    83
    +                                   return_when=asyncio.FIRST_COMPLETED)
    
    84
    +            elif self.connected is False:
    
    85
    +                await asyncio.sleep(self._interface.interval)
    
    86
    +
    
    66 87
         def create_bot_session(self, work, context=None):
    
    67 88
             self.logger.debug("Creating bot session")
    
    68 89
             self._work = work
    
    69 90
             self._context = context
    
    70 91
     
    
    71 92
             session = self._interface.create_bot_session(self._parent, self.get_pb2())
    
    93
    +        if session is None:
    
    94
    +            self.connected = False
    
    95
    +            return
    
    96
    +        self.connected = True
    
    72 97
             self._name = session.name
    
    73 98
     
    
    74 99
             self.logger.info("Created bot session with name: [{}]".format(self._name))
    
    ... ... @@ -79,6 +104,10 @@ class BotSession:
    79 104
         def update_bot_session(self):
    
    80 105
             self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
    
    81 106
             session = self._interface.update_bot_session(self.get_pb2())
    
    107
    +        if session is None:
    
    108
    +            self.connected = False
    
    109
    +            return
    
    110
    +        self.connected = True
    
    82 111
             for k, v in list(self._leases.items()):
    
    83 112
                 if v.state == LeaseState.COMPLETED.value:
    
    84 113
                     del self._leases[k]
    
    ... ... @@ -100,6 +129,7 @@ class BotSession:
    100 129
         def lease_completed(self, lease):
    
    101 130
             lease.state = LeaseState.COMPLETED.value
    
    102 131
             self._leases[lease.id] = lease
    
    132
    +        del self._futures[lease.id]
    
    103 133
     
    
    104 134
         def _update_lease_from_server(self, lease):
    
    105 135
             """
    
    ... ... @@ -110,7 +140,7 @@ class BotSession:
    110 140
                 lease.state = LeaseState.ACTIVE.value
    
    111 141
                 self._leases[lease.id] = lease
    
    112 142
                 self.update_bot_session()
    
    113
    -            asyncio.ensure_future(self.create_work(lease))
    
    143
    +            self._futures[lease.id] = asyncio.ensure_future(self.create_work(lease))
    
    114 144
     
    
    115 145
         async def create_work(self, lease):
    
    116 146
             self.logger.debug("Work created: [{}]".format(lease.id))
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]