[Notes] [Git][BuildGrid/buildgrid][725-job-cancellation-on-remote-builds] Bot: Reconnects and timeouts on the bot side



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -33,7 +33,6 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker
    33 33
     
    
    34 34
     from ..bots import buildbox, dummy, host
    
    35 35
     from ..cli import pass_context
    
    36
    -from ...settings import INTERVAL_BUFFER
    
    37 36
     
    
    38 37
     
    
    39 38
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    ... ... @@ -53,6 +52,7 @@ from ...settings import INTERVAL_BUFFER
    53 52
                   help="Public CAS client certificate for TLS (PEM-encoded)")
    
    54 53
     @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    55 54
                   help="Public CAS server certificate for TLS (PEM-encoded)")
    
    55
    +# TODO change default to 30
    
    56 56
     @click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
    
    57 57
                   help="Time period for bot updates to the server in seconds.")
    
    58 58
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    

  • buildgrid/bot/bot.py
    ... ... @@ -37,10 +37,8 @@ class Bot:
    37 37
         def session(self, work, context):
    
    38 38
             loop = asyncio.get_event_loop()
    
    39 39
     
    
    40
    -        self._bot_session.create_bot_session(work, context)
    
    41
    -
    
    42 40
             try:
    
    43
    -            task = asyncio.ensure_future(self._update_bot_session())
    
    41
    +            task = asyncio.ensure_future(self._bot_session.run(work, context))
    
    44 42
                 loop.run_forever()
    
    45 43
             except KeyboardInterrupt:
    
    46 44
                 pass
    
    ... ... @@ -48,9 +46,19 @@ class Bot:
    48 46
                 task.cancel()
    
    49 47
                 loop.close()
    
    50 48
     
    
    51
    -    async def _update_bot_session(self):
    
    49
    +    async def _run_bot_session(self, work, context):
    
    52 50
             """
    
    53 51
             Calls the server periodically to inform the server the client has not died.
    
    54 52
             """
    
    55 53
             while True:
    
    56
    -            self._bot_session.update_bot_session()
    
    54
    +            if self._bot_session.connected is False:
    
    55
    +                self._bot_session.create_bot_session(work, context)
    
    56
    +            else:
    
    57
    +                self._bot_session.update_bot_session()
    
    58
    +
    
    59
    +            if self._bot_session._futures:
    
    60
    +                await asyncio.wait(self._bot_session._futures.values(),
    
    61
    +                                   timeout=30,
    
    62
    +                                   return_when=asyncio.FIRST_COMPLETED)
    
    63
    +            elif self._bot_session.connected is False:
    
    64
    +                await asyncio.sleep(30)

  • buildgrid/bot/bot_interface.py
    ... ... @@ -21,8 +21,10 @@ Interface to grpc
    21 21
     """
    
    22 22
     
    
    23 23
     import logging
    
    24
    +import grpc
    
    24 25
     
    
    25 26
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
    
    27
    +from ..settings import INTERVAL_BUFFER
    
    26 28
     
    
    27 29
     
    
    28 30
     class BotInterface:
    
    ... ... @@ -34,15 +36,23 @@ class BotInterface:
    34 36
             self.logger = logging.getLogger(__name__)
    
    35 37
             self.logger.info(channel)
    
    36 38
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    37
    -        self._interval = interval
    
    39
    +        self.interval = interval
    
    38 40
     
    
    39 41
         def create_bot_session(self, parent, bot_session):
    
    40 42
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    41 43
                                                        bot_session=bot_session)
    
    42
    -        return self._stub.CreateBotSession(request)
    
    44
    +        return self._bot_call(self._stub.CreateBotSession, request)
    
    43 45
     
    
    44 46
         def update_bot_session(self, bot_session, update_mask=None):
    
    45 47
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    46 48
                                                        bot_session=bot_session,
    
    47 49
                                                        update_mask=update_mask)
    
    48
    -        return self._stub.UpdateBotSession(request, timeout=self._interval)
    
    50
    +        return self._bot_call(self._stub.UpdateBotSession, request)
    
    51
    +
    
    52
    +    def _bot_call(self, call, request):
    
    53
    +        try:
    
    54
    +            return call(request, timeout=self.interval + INTERVAL_BUFFER)
    
    55
    +        except grpc.RpcError as e:
    
    56
    +            if e.code() in grpc.StatusCode:
    
    57
    +                self.logger.warning("Server responded with error: {}".format(e.code()))
    
    58
    +                return None

  • buildgrid/bot/bot_session.py
    ... ... @@ -49,7 +49,9 @@ class BotSession:
    49 49
             self._bot_id = '{}.{}'.format(parent, platform.node())
    
    50 50
             self._context = None
    
    51 51
             self._interface = interface
    
    52
    +        self.connected = False
    
    52 53
             self._leases = {}
    
    54
    +        self._futures = {}
    
    53 55
             self._name = None
    
    54 56
             self._parent = parent
    
    55 57
             self._status = BotStatus.OK.value
    
    ... ... @@ -63,12 +65,31 @@ class BotSession:
    63 65
         def add_worker(self, worker):
    
    64 66
             self._worker = worker
    
    65 67
     
    
    68
    +    async def run(self, work, context=None):
    
    69
    +        self.logger.info("Starting bot session runner")
    
    70
    +        while True:
    
    71
    +            if self.connected is False:
    
    72
    +                self.create_bot_session(work, context)
    
    73
    +            else:
    
    74
    +                self.update_bot_session()
    
    75
    +
    
    76
    +            if self._futures:
    
    77
    +                await asyncio.wait(self._futures.values(),
    
    78
    +                                   timeout=self._interface.interval,
    
    79
    +                                   return_when=asyncio.FIRST_COMPLETED)
    
    80
    +            elif self.connected is False:
    
    81
    +                await asyncio.sleep(self._interface.interval)
    
    82
    +
    
    66 83
         def create_bot_session(self, work, context=None):
    
    67 84
             self.logger.debug("Creating bot session")
    
    68 85
             self._work = work
    
    69 86
             self._context = context
    
    70 87
     
    
    71 88
             session = self._interface.create_bot_session(self._parent, self.get_pb2())
    
    89
    +        if session is None:
    
    90
    +            self.connected = False
    
    91
    +            return
    
    92
    +        self.connected = True
    
    72 93
             self._name = session.name
    
    73 94
     
    
    74 95
             self.logger.info("Created bot session with name: [{}]".format(self._name))
    
    ... ... @@ -79,6 +100,10 @@ class BotSession:
    79 100
         def update_bot_session(self):
    
    80 101
             self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
    
    81 102
             session = self._interface.update_bot_session(self.get_pb2())
    
    103
    +        if session is None:
    
    104
    +            self.connected = False
    
    105
    +            return
    
    106
    +        self.connected = True
    
    82 107
             for k, v in list(self._leases.items()):
    
    83 108
                 if v.state == LeaseState.COMPLETED.value:
    
    84 109
                     del self._leases[k]
    
    ... ... @@ -110,7 +135,7 @@ class BotSession:
    110 135
                 lease.state = LeaseState.ACTIVE.value
    
    111 136
                 self._leases[lease.id] = lease
    
    112 137
                 self.update_bot_session()
    
    113
    -            asyncio.ensure_future(self.create_work(lease))
    
    138
    +            self._futures[lease.id] = asyncio.ensure_future(self.create_work(lease))
    
    114 139
     
    
    115 140
         async def create_work(self, lease):
    
    116 141
             self.logger.debug("Work created: [{}]".format(lease.id))
    
    ... ... @@ -133,6 +158,7 @@ class BotSession:
    133 158
     
    
    134 159
             self.logger.debug("Work complete: [{}]".format(lease.id))
    
    135 160
             self.lease_completed(lease)
    
    161
    +        del self._futures[lease.id]
    
    136 162
     
    
    137 163
     
    
    138 164
     class Worker:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]