Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
- 
9b822f95
by Raoul Hidalgo Charman at 2018-11-27T15:39:08Z
3 changed files:
Changes:
| ... | ... | @@ -35,7 +35,7 @@ class Bot: | 
| 35 | 35 |          self.__bot_session.create_bot_session()
 | 
| 36 | 36 |  | 
| 37 | 37 |          try:
 | 
| 38 | -            task = asyncio.ensure_future(self.__update_bot_session())
 | |
| 38 | +            task = asyncio.ensure_future(self.__bot_session.run())
 | |
| 39 | 39 |              self.__loop.run_until_complete(task)
 | 
| 40 | 40 |  | 
| 41 | 41 |          except KeyboardInterrupt:
 | 
| ... | ... | @@ -44,15 +44,6 @@ class Bot: | 
| 44 | 44 |          self.__kill_everyone()
 | 
| 45 | 45 |          self.__logger.info("Bot shutdown.")
 | 
| 46 | 46 |  | 
| 47 | -    async def __update_bot_session(self):
 | |
| 48 | -        """Calls the server periodically to inform the server the client has not died."""
 | |
| 49 | -        try:
 | |
| 50 | -            while True:
 | |
| 51 | -                self.__bot_session.update_bot_session()
 | |
| 52 | - | |
| 53 | -        except asyncio.CancelledError:
 | |
| 54 | -            pass
 | |
| 55 | - | |
| 56 | 47 |      def __kill_everyone(self):
 | 
| 57 | 48 |          """Cancels and waits for them to stop."""
 | 
| 58 | 49 |          self.__logger.info("Cancelling remaining tasks...")
 | 
| ... | ... | @@ -24,6 +24,7 @@ import logging | 
| 24 | 24 |  import grpc
 | 
| 25 | 25 |  | 
| 26 | 26 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
 | 
| 27 | +from ..settings import INTERVAL_BUFFER
 | |
| 27 | 28 |  | 
| 28 | 29 |  | 
| 29 | 30 |  class BotInterface:
 | 
| ... | ... | @@ -36,25 +37,22 @@ class BotInterface: | 
| 36 | 37 |  | 
| 37 | 38 |          self.__logger.info(channel)
 | 
| 38 | 39 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 39 | -        self._interval = interval
 | |
| 40 | +        self.interval = interval
 | |
| 40 | 41 |  | 
| 41 | 42 |      def create_bot_session(self, parent, bot_session):
 | 
| 42 | 43 |          request = bots_pb2.CreateBotSessionRequest(parent=parent,
 | 
| 43 | 44 |                                                     bot_session=bot_session)
 | 
| 44 | -        try:
 | |
| 45 | -            return self._stub.CreateBotSession(request)
 | |
| 46 | - | |
| 47 | -        except grpc.RpcError as e:
 | |
| 48 | -            self.__logger.error(e)
 | |
| 49 | -            raise
 | |
| 45 | +        return self._bot_call(self._stub.CreateBotSession, request)
 | |
| 50 | 46 |  | 
| 51 | 47 |      def update_bot_session(self, bot_session, update_mask=None):
 | 
| 52 | 48 |          request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
 | 
| 53 | 49 |                                                     bot_session=bot_session,
 | 
| 54 | 50 |                                                     update_mask=update_mask)
 | 
| 55 | -        try:
 | |
| 56 | -            return self._stub.UpdateBotSession(request, timeout=self._interval)
 | |
| 51 | +        return self._bot_call(self._stub.UpdateBotSession, request)
 | |
| 57 | 52 |  | 
| 53 | +    def _bot_call(self, call, request):
 | |
| 54 | +        try:
 | |
| 55 | +            return call(request, timeout=self.interval + INTERVAL_BUFFER)
 | |
| 58 | 56 |          except grpc.RpcError as e:
 | 
| 59 | 57 |              self.__logger.error(e)
 | 
| 60 | 58 |              raise | 
| ... | ... | @@ -19,6 +19,7 @@ Bot Session | 
| 19 | 19 |  | 
| 20 | 20 |  Allows connections
 | 
| 21 | 21 |  """
 | 
| 22 | +import asyncio
 | |
| 22 | 23 |  import logging
 | 
| 23 | 24 |  import platform
 | 
| 24 | 25 |  | 
| ... | ... | @@ -47,6 +48,9 @@ class BotSession: | 
| 47 | 48 |          self._status = BotStatus.OK.value
 | 
| 48 | 49 |          self._tenant_manager = TenantManager()
 | 
| 49 | 50 |  | 
| 51 | +        self.connected = False
 | |
| 52 | +        self._futures = {}
 | |
| 53 | + | |
| 50 | 54 |          self.__parent = parent
 | 
| 51 | 55 |          self.__bot_id = '{}.{}'.format(parent, platform.node())
 | 
| 52 | 56 |          self.__name = None
 | 
| ... | ... | @@ -58,10 +62,34 @@ class BotSession: | 
| 58 | 62 |      def bot_id(self):
 | 
| 59 | 63 |          return self.__bot_id
 | 
| 60 | 64 |  | 
| 65 | +    async def run(self):
 | |
| 66 | +        """ Run a bot session
 | |
| 67 | + | |
| 68 | +        This connects and reconnects via create bot session and waits on update
 | |
| 69 | +        bot session calls.
 | |
| 70 | +        """
 | |
| 71 | +        self.__logger.debug("Starting bot session")
 | |
| 72 | +        while True:
 | |
| 73 | +            if self.connected is False:
 | |
| 74 | +                self.create_bot_session()
 | |
| 75 | +            else:
 | |
| 76 | +                self.update_bot_session()
 | |
| 77 | + | |
| 78 | +            if self._futures:
 | |
| 79 | +                await asyncio.wait(self._futures.values(),
 | |
| 80 | +                                   timeout=self._bots_interface.interval,
 | |
| 81 | +                                   return_when=asyncio.FIRST_COMPLETED)
 | |
| 82 | +            elif self.connected is False:
 | |
| 83 | +                await asyncio.sleep(self._bots_interface.interval)
 | |
| 84 | + | |
| 61 | 85 |      def create_bot_session(self):
 | 
| 62 | 86 |          self.__logger.debug("Creating bot session")
 | 
| 63 | 87 |  | 
| 64 | 88 |          session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
 | 
| 89 | +        if session is None:
 | |
| 90 | +            self.connected = False
 | |
| 91 | +            return
 | |
| 92 | +        self.connected = True
 | |
| 65 | 93 |          self.__name = session.name
 | 
| 66 | 94 |  | 
| 67 | 95 |          self.__logger.info("Created bot session with name: [%s]", self.__name)
 | 
| ... | ... | @@ -73,6 +101,10 @@ class BotSession: | 
| 73 | 101 |          self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
 | 
| 74 | 102 |  | 
| 75 | 103 |          session = self._bots_interface.update_bot_session(self.get_pb2())
 | 
| 104 | +        if session is None:
 | |
| 105 | +            self.connected = False
 | |
| 106 | +            return
 | |
| 107 | +        self.connected = True
 | |
| 76 | 108 |          server_ids = []
 | 
| 77 | 109 |  | 
| 78 | 110 |          for lease in session.leases:
 | 
