finnball pushed to branch finn/bot-refactor at BuildGrid / buildgrid
Commits:
- 47baa9d7 by finn at 2018-08-07T14:57:29Z
11 changed files:
- .coveragerc
- .gitlab-ci.yml
- app/commands/cmd_bot.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/bot/bot_session.py (new file)
- buildgrid/server/scheduler.py
- buildgrid/server/worker/bots_interface.py
- buildgrid/server/worker/bots_service.py
- tests/integration/bot_session.py (new file)
- tests/integration/bots_service.py
Changes:
| ... | ... | @@ -7,6 +7,8 @@ omit = | 
| 7 | 7 |    # Omit profiling helper module
 | 
| 8 | 8 |    # Omit generated code
 | 
| 9 | 9 |    */buildgrid/google/*
 | 
| 10 | +  *_pb2.py
 | |
| 11 | +  *_pb2_grpc.py
 | |
| 10 | 12 |    */.eggs/*
 | 
| 11 | 13 |  | 
| 12 | 14 |  [report]
 | 
| ... | ... | @@ -30,7 +30,7 @@ before_script: | 
| 30 | 30 |    script:
 | 
| 31 | 31 |      - ${BGD} server start &
 | 
| 32 | 32 |      - sleep 1 # Allow server to boot
 | 
| 33 | -    - ${BGD} bot --host=0.0.0.0 dummy &
 | |
| 33 | +    - ${BGD} bot --host=0.0.0.0 --continuous dummy &
 | |
| 34 | 34 |      - ${BGD} execute --host=0.0.0.0 request --wait-for-completion
 | 
| 35 | 35 |  | 
| 36 | 36 |  tests-debian-stretch:
 | 
| ... | ... | @@ -30,10 +30,12 @@ import os | 
| 30 | 30 |  import random
 | 
| 31 | 31 |  import subprocess
 | 
| 32 | 32 |  import tempfile
 | 
| 33 | +import time
 | |
| 33 | 34 |  | 
| 34 | 35 |  from pathlib import Path, PurePath
 | 
| 35 | 36 |  | 
| 36 | -from buildgrid.bot import bot
 | |
| 37 | +from buildgrid.bot import bot, bot_interface
 | |
| 38 | +from buildgrid.bot.bot_session import BotSession, Device, Worker
 | |
| 37 | 39 |  from buildgrid._exceptions import BotError
 | 
| 38 | 40 |  | 
| 39 | 41 |  from ..cli import pass_context
 | 
| ... | ... | @@ -43,18 +45,27 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p | 
| 43 | 45 |  from google.protobuf import any_pb2
 | 
| 44 | 46 |  | 
| 45 | 47 |  @click.group(short_help = 'Create a bot client')
 | 
| 48 | +@click.option('--continuous', is_flag=True)
 | |
| 46 | 49 |  @click.option('--parent', default='bgd_test')
 | 
| 47 | -@click.option('--number-of-leases', default=1)
 | |
| 48 | 50 |  @click.option('--port', default='50051')
 | 
| 49 | 51 |  @click.option('--host', default='localhost')
 | 
| 50 | 52 |  @pass_context
 | 
| 51 | -def cli(context, host, port, number_of_leases, parent):
 | |
| 53 | +def cli(context, host, port, parent, continuous):
 | |
| 54 | +    channel = grpc.insecure_channel('{}:{}'.format(host, port))
 | |
| 55 | +    interface = bot_interface.BotInterface(channel)
 | |
| 56 | + | |
| 52 | 57 |      context.logger = logging.getLogger(__name__)
 | 
| 53 | 58 |      context.logger.info("Starting on port {}".format(port))
 | 
| 54 | 59 |  | 
| 55 | -    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
 | |
| 56 | -    context.number_of_leases = number_of_leases
 | |
| 57 | -    context.parent = parent
 | |
| 60 | +    context.continuous = continuous
 | |
| 61 | + | |
| 62 | +    worker = Worker()
 | |
| 63 | +    worker.add_device(Device())
 | |
| 64 | + | |
| 65 | +    bot_session = BotSession(parent, interface)
 | |
| 66 | +    bot_session.add_worker(worker)
 | |
| 67 | + | |
| 68 | +    context.bot_session = bot_session
 | |
| 58 | 69 |  | 
| 59 | 70 |  @cli.command('dummy', short_help='Create a dummy bot session')
 | 
| 60 | 71 |  @pass_context
 | 
| ... | ... | @@ -63,15 +74,11 @@ def dummy(context): | 
| 63 | 74 |      Simple dummy client. Creates a session, accepts leases, does fake work and
 | 
| 64 | 75 |      updates the server.
 | 
| 65 | 76 |      """
 | 
| 66 | - | |
| 67 | -    context.logger.info("Creating a bot session")
 | |
| 68 | - | |
| 69 | 77 |      try:
 | 
| 70 | -        bot.Bot(work=_work_dummy,
 | |
| 71 | -                context=context,
 | |
| 72 | -                channel=context.channel,
 | |
| 73 | -                parent=context.parent,
 | |
| 74 | -                number_of_leases=context.number_of_leases)
 | |
| 78 | +        b = bot.Bot(context.bot_session)
 | |
| 79 | +        b.session(_work_dummy,
 | |
| 80 | +                  context,
 | |
| 81 | +                  context.continuous)
 | |
| 75 | 82 |  | 
| 76 | 83 |      except KeyboardInterrupt:
 | 
| 77 | 84 |          pass
 | 
| ... | ... | @@ -85,7 +92,7 @@ def dummy(context): | 
| 85 | 92 |  @click.option('--port', show_default = True, default=11001)
 | 
| 86 | 93 |  @click.option('--remote', show_default = True, default='localhost')
 | 
| 87 | 94 |  @pass_context
 | 
| 88 | -def _work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
 | |
| 95 | +def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
 | |
| 89 | 96 |      """
 | 
| 90 | 97 |      Uses BuildBox to run commands.
 | 
| 91 | 98 |      """
 | 
| ... | ... | @@ -101,11 +108,14 @@ def _work_buildbox(context, remote, port, server_cert, client_key, client_cert, | 
| 101 | 108 |      context.fuse_dir = fuse_dir
 | 
| 102 | 109 |  | 
| 103 | 110 |      try:
 | 
| 104 | -        bot.Bot(work=_work_buildbox,
 | |
| 105 | -                context=context,
 | |
| 106 | -                channel=context.channel,
 | |
| 107 | -                parent=context.parent,
 | |
| 108 | -                number_of_leases=context.number_of_leases)
 | |
| 111 | +        b = bot.Bot(work=_work_buildbox,
 | |
| 112 | +                    bot_session=context.bot_session,
 | |
| 113 | +                    channel=context.channel,
 | |
| 114 | +                    parent=context.parent)
 | |
| 115 | + | |
| 116 | +        b.session(context.parent,
 | |
| 117 | +                  _work_buildbox,
 | |
| 118 | +                  context)
 | |
| 109 | 119 |  | 
| 110 | 120 |      except KeyboardInterrupt:
 | 
| 111 | 121 |          pass
 | 
| ... | ... | @@ -23,163 +23,46 @@ Creates a bot session. | 
| 23 | 23 |  """
 | 
| 24 | 24 |  | 
| 25 | 25 |  import asyncio
 | 
| 26 | -import inspect
 | |
| 26 | +import collections
 | |
| 27 | 27 |  import logging
 | 
| 28 | -import platform
 | |
| 29 | -import queue
 | |
| 30 | 28 |  import time
 | 
| 31 | 29 |  | 
| 32 | -from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
 | |
| 33 | - | |
| 34 | -from . import bot_interface
 | |
| 30 | +from . import bot_interface, bot_session
 | |
| 31 | +from .bot_session import BotStatus, LeaseState
 | |
| 35 | 32 |  from .._exceptions import BotError
 | 
| 36 | 33 |  | 
| 37 | -class Bot(object):
 | |
| 34 | +class Bot:
 | |
| 38 | 35 |      """
 | 
| 39 | 36 |      Creates a local BotSession.
 | 
| 40 | 37 |      """
 | 
| 41 | 38 |  | 
| 42 | -    def __init__(self, work, context, channel, parent, number_of_leases):
 | |
| 43 | -        if not inspect.iscoroutinefunction(work):
 | |
| 44 | -            raise BotError("work function must be async")
 | |
| 39 | +    UPDATE_PERIOD = 1
 | |
| 45 | 40 |  | 
| 46 | -        self.interface = bot_interface.BotInterface(channel)
 | |
| 41 | +    def __init__(self, bot_session):
 | |
| 47 | 42 |          self.logger = logging.getLogger(__name__)
 | 
| 48 | 43 |  | 
| 49 | -        self._create_session(parent, number_of_leases)
 | |
| 50 | -        self._work_queue = queue.Queue(maxsize = number_of_leases)
 | |
| 51 | - | |
| 52 | -        try:
 | |
| 53 | -            while True:
 | |
| 54 | -                ## TODO: Leases independently finish
 | |
| 55 | -                ## Allow leases to queue finished work independently instead
 | |
| 56 | -                ## of waiting for all to finish
 | |
| 57 | -                futures = [self._do_work(work, context, lease) for lease in self._get_work()]
 | |
| 58 | -                if futures:
 | |
| 59 | -                    loop = asyncio.new_event_loop()
 | |
| 60 | -                    leases_complete, _ = loop.run_until_complete(asyncio.wait(futures))
 | |
| 61 | -                    work_complete = [(lease.result().id, lease.result(),) for lease in leases_complete]
 | |
| 62 | -                    self._work_complete(work_complete)
 | |
| 63 | -                    loop.close()
 | |
| 64 | -                self._update_bot_session()
 | |
| 65 | -                time.sleep(2)
 | |
| 66 | -        except Exception as e:
 | |
| 67 | -            self.logger.error(e)
 | |
| 68 | -            raise BotError(e)
 | |
| 69 | - | |
| 70 | -    @property
 | |
| 71 | -    def bot_session(self):
 | |
| 72 | -        ## Read only, shouldn't have to set any of the variables in here
 | |
| 73 | -        return self._bot_session
 | |
| 74 | - | |
| 75 | -    def close_session(self):
 | |
| 76 | -        self.logger.warning("Session closing not yet implemented")
 | |
| 77 | - | |
| 78 | -    async def _do_work(self, work, context, lease):
 | |
| 79 | -        """ Work is done here, work function should be asynchronous
 | |
| 80 | -        """
 | |
| 81 | -        self.logger.info("Work found: {}".format(lease.id))
 | |
| 82 | -        lease = await work(context=context, lease=lease)
 | |
| 83 | -        lease.state = bots_pb2.LeaseState.Value('COMPLETED')
 | |
| 84 | -        self.logger.info("Work complete: {}".format(lease.id))
 | |
| 85 | -        return lease
 | |
| 86 | - | |
| 87 | -    def _update_bot_session(self):
 | |
| 88 | -        """ Should call the server periodically to inform the server the client
 | |
| 89 | -        has not died.
 | |
| 90 | -        """
 | |
| 91 | -        self.logger.debug("Updating bot session")
 | |
| 92 | -        self._bot_session = self.interface.update_bot_session(self._bot_session)
 | |
| 93 | -        leases_update = ([self._update_lease(lease) for lease in self._bot_session.leases])
 | |
| 94 | -        del self._bot_session.leases[:]
 | |
| 95 | -        self._bot_session.leases.extend(leases_update)
 | |
| 96 | - | |
| 97 | -    def _get_work(self):
 | |
| 98 | -        while not self._work_queue.empty():
 | |
| 99 | -            yield self._work_queue.get()
 | |
| 100 | -            self._work_queue.task_done()
 | |
| 101 | - | |
| 102 | -    def _work_complete(self, leases_complete):
 | |
| 103 | -        """ Bot updates itself with any completed work.
 | |
| 104 | -        """
 | |
| 105 | -        # Should really improve this...
 | |
| 106 | -        # Maybe add some call back function sentoff work...
 | |
| 107 | -        leases_active = list(filter(self._lease_active, self._bot_session.leases))
 | |
| 108 | -        leases_not_active = [lease for lease in self._bot_session.leases if not self._lease_active(lease)]
 | |
| 109 | -        del self._bot_session.leases[:]
 | |
| 110 | -        for lease in leases_active:
 | |
| 111 | -            for lease_tuple in leases_complete:
 | |
| 112 | -                if lease.id == lease_tuple[0]:
 | |
| 113 | -                    leases_not_active.extend([lease_tuple[1]])
 | |
| 114 | -        self._bot_session.leases.extend(leases_not_active)
 | |
| 115 | - | |
| 116 | -    def _update_lease(self, lease):
 | |
| 117 | -        """
 | |
| 118 | -        State machine for any recieved updates to the leases.
 | |
| 119 | -        """
 | |
| 120 | -        if self._lease_pending(lease):
 | |
| 121 | -            lease.state = bots_pb2.LeaseState.Value('ACTIVE')
 | |
| 122 | -            self._work_queue.put(lease)
 | |
| 123 | -            return lease
 | |
| 124 | - | |
| 125 | -        else:
 | |
| 126 | -            return lease
 | |
| 127 | - | |
| 128 | -    def _create_session(self, parent, number_of_leases):
 | |
| 129 | -        self.logger.debug("Creating bot session")
 | |
| 130 | -        worker = self._create_worker()
 | |
| 44 | +        self._bot_session = bot_session
 | |
| 131 | 45 |  | 
| 132 | -        """ Unique bot ID within the farm used to identify this bot
 | |
| 133 | -        Needs to be human readable.
 | |
| 134 | -        All prior sessions with bot_id of same ID are invalidated.
 | |
| 135 | -        If a bot attempts to update an invalid session, it must be rejected and
 | |
| 136 | -        may be put in quarantine.
 | |
| 137 | -        """
 | |
| 138 | -        bot_id = '{}.{}'.format(parent, platform.node())
 | |
| 46 | +    def session(self, work, context, continuous = False):
 | |
| 47 | +        loop = asyncio.get_event_loop()
 | |
| 139 | 48 |  | 
| 140 | -        leases = [bots_pb2.Lease() for x in range(number_of_leases)]
 | |
| 49 | +        self._bot_session.create_bot_session(work, context)
 | |
| 141 | 50 |  | 
| 142 | -        bot_session = bots_pb2.BotSession(worker = worker,
 | |
| 143 | -                                          status = bots_pb2.BotStatus.Value('OK'),
 | |
| 144 | -                                          leases = leases,
 | |
| 145 | -                                          bot_id = bot_id)
 | |
| 146 | -        self._bot_session = self.interface.create_bot_session(parent, bot_session)
 | |
| 147 | -        self.logger.info("Name: {}, Id: {}".format(self._bot_session.name,
 | |
| 148 | -                                                      self._bot_session.bot_id))
 | |
| 149 | - | |
| 150 | -    def _create_worker(self):
 | |
| 151 | -        devices = self._create_devices()
 | |
| 152 | - | |
| 153 | -        # Contains a list of devices and the connections between them.
 | |
| 154 | -        worker = worker_pb2.Worker(devices = devices)
 | |
| 155 | - | |
| 156 | -        """ Keys supported:
 | |
| 157 | -        *pool
 | |
| 158 | -        """
 | |
| 159 | -        worker.Property.key = "pool"
 | |
| 160 | -        worker.Property.value = "all"
 | |
| 161 | - | |
| 162 | -        return worker
 | |
| 163 | - | |
| 164 | -    def _create_devices(self):
 | |
| 165 | -        """ Creates devices available to the worker
 | |
| 166 | -        The first device is know as the Primary Device - the revice which
 | |
| 167 | -        is running a bit and responsible to actually executing commands.
 | |
| 168 | -        All other devices are known as Attatched Devices and must be controlled
 | |
| 169 | -        by the Primary Device.
 | |
| 170 | -        """
 | |
| 171 | - | |
| 172 | -        devices = []
 | |
| 173 | - | |
| 174 | -        for i in range(0, 1): # Append one device for now
 | |
| 175 | -            dev = worker_pb2.Device()
 | |
| 176 | - | |
| 177 | -            devices.append(dev)
 | |
| 178 | - | |
| 179 | -        return devices
 | |
| 180 | - | |
| 181 | -    def _lease_pending(self, lease):
 | |
| 182 | -        return lease.state == bots_pb2.LeaseState.Value('PENDING')
 | |
| 183 | - | |
| 184 | -    def _lease_active(self, lease):
 | |
| 185 | -        return lease.state == bots_pb2.LeaseState.Value('ACTIVE') | |
| 51 | +        try:
 | |
| 52 | +            task = asyncio.ensure_future(self._update_bot_session())
 | |
| 53 | +            loop.run_forever()
 | |
| 54 | + | |
| 55 | +        except KeyboardInterrupt:
 | |
| 56 | +            pass
 | |
| 57 | + | |
| 58 | +        finally:
 | |
| 59 | +            task.cancel()
 | |
| 60 | +            loop.close()
 | |
| 61 | + | |
| 62 | +    async def _update_bot_session(self):
 | |
| 63 | +        while True:
 | |
| 64 | +            """ Calls the server periodically to inform the server the client
 | |
| 65 | +            has not died.
 | |
| 66 | +            """
 | |
| 67 | +            self._bot_session.update_bot_session()
 | |
| 68 | +            await asyncio.sleep(self.UPDATE_PERIOD) | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bo | 
| 29 | 29 |  | 
| 30 | 30 |  from .._exceptions import BotError
 | 
| 31 | 31 |  | 
| 32 | -class BotInterface(object):
 | |
| 32 | +class BotInterface:
 | |
| 33 | 33 |      """ Interface handles calls to the server.
 | 
| 34 | 34 |      """
 | 
| 35 | 35 |  | 
| ... | ... | @@ -39,22 +39,12 @@ class BotInterface(object): | 
| 39 | 39 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 40 | 40 |  | 
| 41 | 41 |      def create_bot_session(self, parent, bot_session):
 | 
| 42 | -        try:
 | |
| 43 | -            request = bots_pb2.CreateBotSessionRequest(parent = parent,
 | |
| 44 | -                                                       bot_session = bot_session)
 | |
| 45 | -            return self._stub.CreateBotSession(request)
 | |
| 46 | - | |
| 47 | -        except Exception as e:
 | |
| 48 | -            self.logger.error(e)
 | |
| 49 | -            raise BotError(e)
 | |
| 42 | +        request = bots_pb2.CreateBotSessionRequest(parent = parent,
 | |
| 43 | +                                                   bot_session = bot_session)
 | |
| 44 | +        return self._stub.CreateBotSession(request)
 | |
| 50 | 45 |  | 
| 51 | 46 |      def update_bot_session(self, bot_session, update_mask = None):
 | 
| 52 | -        try:
 | |
| 53 | -            request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
 | |
| 54 | -                                                       bot_session = bot_session,
 | |
| 55 | -                                                       update_mask = update_mask)
 | |
| 56 | -            return self._stub.UpdateBotSession(request)
 | |
| 57 | - | |
| 58 | -        except Exception as e:
 | |
| 59 | -            self.logger.error(e)
 | |
| 60 | -            raise BotError(e) | |
| 47 | +        request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
 | |
| 48 | +                                                   bot_session = bot_session,
 | |
| 49 | +                                                   update_mask = update_mask)
 | |
| 50 | +        return self._stub.UpdateBotSession(request) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +"""
 | |
| 16 | +Bot Session
 | |
| 17 | +====
 | |
| 18 | + | |
| 19 | +Allows connections
 | |
| 20 | +"""
 | |
| 21 | +import asyncio
 | |
| 22 | +import logging
 | |
| 23 | +import platform
 | |
| 24 | +import uuid
 | |
| 25 | + | |
| 26 | +from enum import Enum
 | |
| 27 | + | |
| 28 | +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
 | |
| 29 | + | |
| 30 | +class BotStatus(Enum):
 | |
| 31 | +    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
 | |
| 32 | +    OK                     = bots_pb2.BotStatus.Value('OK')
 | |
| 33 | +    UNHEALTHY              = bots_pb2.BotStatus.Value('UNHEALTHY');
 | |
| 34 | +    HOST_REBOOTING         = bots_pb2.BotStatus.Value('HOST_REBOOTING')
 | |
| 35 | +    BOT_TERMINATING        = bots_pb2.BotStatus.Value('BOT_TERMINATING')
 | |
| 36 | + | |
| 37 | +class LeaseState(Enum):
 | |
| 38 | +    LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
 | |
| 39 | +    PENDING                 = bots_pb2.LeaseState.Value('PENDING')
 | |
| 40 | +    ACTIVE                  = bots_pb2.LeaseState.Value('ACTIVE')
 | |
| 41 | +    COMPLETED               = bots_pb2.LeaseState.Value('COMPLETED')
 | |
| 42 | +    CANCELLED               = bots_pb2.LeaseState.Value('CANCELLED')
 | |
| 43 | + | |
| 44 | + | |
| 45 | +class BotSession:
 | |
| 46 | +    def __init__(self, parent, interface):
 | |
| 47 | +        """ Unique bot ID within the farm used to identify this bot
 | |
| 48 | +        Needs to be human readable.
 | |
| 49 | +        All prior sessions with bot_id of same ID are invalidated.
 | |
| 50 | +        If a bot attempts to update an invalid session, it must be rejected and
 | |
| 51 | +        may be put in quarantine.
 | |
| 52 | +        """
 | |
| 53 | + | |
| 54 | +        self.logger = logging.getLogger(__name__)
 | |
| 55 | + | |
| 56 | +        self._bot_id = '{}.{}'.format(parent, platform.node())
 | |
| 57 | +        self._interface = interface
 | |
| 58 | +        self._leases = {}
 | |
| 59 | +        self._name = None
 | |
| 60 | +        self._parent = parent
 | |
| 61 | +        self._status = BotStatus.OK.value
 | |
| 62 | +        self._work = None
 | |
| 63 | +        self._worker = None
 | |
| 64 | + | |
| 65 | +    @property
 | |
| 66 | +    def bot_id(self):
 | |
| 67 | +        return self._bot_id
 | |
| 68 | + | |
| 69 | +    def add_worker(self, worker):
 | |
| 70 | +        self._worker = worker
 | |
| 71 | + | |
| 72 | +    def create_bot_session(self, work, context=None):
 | |
| 73 | +        self.logger.debug("Creating bot session")
 | |
| 74 | +        self._work = work
 | |
| 75 | +        self._context = context
 | |
| 76 | + | |
| 77 | +        session = self._interface.create_bot_session(self._parent, self.get_pb2())
 | |
| 78 | +        self._name = session.name
 | |
| 79 | +        self.logger.info("Created bot session with name: {}".format(self._name))
 | |
| 80 | + | |
| 81 | +    def update_bot_session(self):
 | |
| 82 | +        session = self._interface.update_bot_session(self.get_pb2())
 | |
| 83 | +        for lease in session.leases:
 | |
| 84 | +            self._update_lease_from_server(lease)
 | |
| 85 | + | |
| 86 | +    def get_pb2(self):
 | |
| 87 | +        leases = list(self._leases.values())
 | |
| 88 | +        if not leases:
 | |
| 89 | +            leases = None
 | |
| 90 | + | |
| 91 | +        return bots_pb2.BotSession(worker=self._worker.get_pb2(),
 | |
| 92 | +                                   status=self._status,
 | |
| 93 | +                                   leases=leases,
 | |
| 94 | +                                   bot_id=self._bot_id,
 | |
| 95 | +                                   name = self._name)
 | |
| 96 | + | |
| 97 | +    def lease_completed(self, lease):
 | |
| 98 | +        lease.state = LeaseState.COMPLETED.value
 | |
| 99 | +        self._leases[lease.id] = lease
 | |
| 100 | + | |
| 101 | +    def _update_lease_from_server(self, lease):
 | |
| 102 | +        """
 | |
| 103 | +        State machine for any recieved updates to the leases.
 | |
| 104 | +        """
 | |
| 105 | +        ## TODO: Compare with previous state of lease
 | |
| 106 | +        lease_bot = self._leases.get(lease.id)
 | |
| 107 | + | |
| 108 | +        if lease.state == LeaseState.PENDING.value:
 | |
| 109 | +            lease.state = LeaseState.ACTIVE.value
 | |
| 110 | +            asyncio.ensure_future(self.create_work(lease))
 | |
| 111 | +            self._leases[lease.id] = lease
 | |
| 112 | + | |
| 113 | +        elif lease.state == LeaseState.COMPLETED.value and \
 | |
| 114 | +           lease_bot.state == LeaseState.COMPLETED.value:
 | |
| 115 | +            del self._leases[lease.id]
 | |
| 116 | + | |
| 117 | +    async def create_work(self, lease):
 | |
| 118 | +        self.logger.debug("Work created: {}".format(lease.id))
 | |
| 119 | +        lease = await self._work(self._context, lease)
 | |
| 120 | +        self.logger.debug("Work complete: {}".format(lease.id))
 | |
| 121 | +        self.lease_completed(lease)
 | |
| 122 | + | |
| 123 | +class Worker:
 | |
| 124 | +    def __init__(self, properties=None, configs=None):
 | |
| 125 | +        self.properties = {}
 | |
| 126 | +        self._configs = {}
 | |
| 127 | +        self._devices = []
 | |
| 128 | + | |
| 129 | +        if properties:
 | |
| 130 | +            for k, v in properties.items():
 | |
| 131 | +                if k == 'pool':
 | |
| 132 | +                    self.properties[k] = v
 | |
| 133 | +                else:
 | |
| 134 | +                    raise KeyError('Key not supported: {}'.format(k))
 | |
| 135 | + | |
| 136 | +        if configs:
 | |
| 137 | +            for k, v in configs.items():
 | |
| 138 | +                if k == 'DockerImage':
 | |
| 139 | +                    self.configs[k] = v
 | |
| 140 | +                else:
 | |
| 141 | +                    raise KeyError('Key not supported: {}'.format(k))
 | |
| 142 | + | |
| 143 | +    @property
 | |
| 144 | +    def configs(self):
 | |
| 145 | +        return self._configs
 | |
| 146 | + | |
| 147 | +    def add_device(self, device):
 | |
| 148 | +        self._devices.append(device)
 | |
| 149 | + | |
| 150 | +    def get_pb2(self):
 | |
| 151 | +        devices = [device.get_pb2() for device in self._devices]
 | |
| 152 | +        worker = worker_pb2.Worker(devices=devices)
 | |
| 153 | +        property_message = worker_pb2.Worker.Property()
 | |
| 154 | +        for k, v in self.properties.items():
 | |
| 155 | +            property_message.key = k
 | |
| 156 | +            property_message.value = v
 | |
| 157 | +            worker.properties.extend([property_message])
 | |
| 158 | + | |
| 159 | +        config_message = worker_pb2.Worker.Config()
 | |
| 160 | +        for k, v in self.properties.items():
 | |
| 161 | +            property_message.key = k
 | |
| 162 | +            property_message.value = v
 | |
| 163 | +            worker.configs.extend([config_message])
 | |
| 164 | + | |
| 165 | +        return worker
 | |
| 166 | + | |
| 167 | +class Device:
 | |
| 168 | +    def __init__(self, properties=None):
 | |
| 169 | +        """ Creates devices available to the worker
 | |
| 170 | +        The first device is know as the Primary Device - the revice which
 | |
| 171 | +        is running a bit and responsible to actually executing commands.
 | |
| 172 | +        All other devices are known as Attatched Devices and must be controlled
 | |
| 173 | +        by the Primary Device.
 | |
| 174 | +        """
 | |
| 175 | + | |
| 176 | +        self._name = str(uuid.uuid4())
 | |
| 177 | +        self._properties = {}
 | |
| 178 | + | |
| 179 | +        if properties:
 | |
| 180 | +            for k, v in properties.items():
 | |
| 181 | +                if k == 'os':
 | |
| 182 | +                    self._properties[k] = v
 | |
| 183 | + | |
| 184 | +                elif k == 'docker':
 | |
| 185 | +                    if v not in ('True', 'False'):
 | |
| 186 | +                        raise ValueError('Value not supported: {}'.format(v))
 | |
| 187 | +                    self._properties[k] = v
 | |
| 188 | + | |
| 189 | +                else:
 | |
| 190 | +                    raise KeyError('Key not supported: {}'.format(k))
 | |
| 191 | + | |
| 192 | +    @property
 | |
| 193 | +    def name(self):
 | |
| 194 | +        return self._name
 | |
| 195 | + | |
| 196 | +    @property
 | |
| 197 | +    def properties(self):
 | |
| 198 | +        return self._properties
 | |
| 199 | + | |
| 200 | + | |
| 201 | +    def get_pb2(self):
 | |
| 202 | +        device = worker_pb2.Device(handle=self._name)
 | |
| 203 | +        property_message = worker_pb2.Device.Property()
 | |
| 204 | +        for k, v in self._properties.items():
 | |
| 205 | +            property_message.key = k
 | |
| 206 | +            property_message.value = v
 | |
| 207 | +            device.properties.extend([property_message])
 | |
| 208 | +        return device | 
| ... | ... | @@ -50,24 +50,19 @@ class Scheduler(): | 
| 50 | 50 |          self.queue.append(job)
 | 
| 51 | 51 |  | 
| 52 | 52 |      def retry_job(self, name):
 | 
| 53 | -        job = self.jobs[name]
 | |
| 54 | - | |
| 55 | -        if job.n_tries >= self.MAX_N_TRIES:
 | |
| 56 | -            # TODO: Decide what to do with these jobs
 | |
| 57 | -            job.update_execute_stage(ExecuteStage.COMPLETED)
 | |
| 58 | -        else:
 | |
| 59 | -            job.update_execute_stage(ExecuteStage.QUEUED)
 | |
| 60 | -            job.n_tries += 1
 | |
| 61 | -            self.queue.appendleft(job)
 | |
| 53 | +        job = self.jobs.get(name)
 | |
| 62 | 54 |  | 
| 63 | -        self.jobs[name] = job
 | |
| 55 | +        if job is not None:
 | |
| 56 | +            if job.n_tries >= self.MAX_N_TRIES:
 | |
| 57 | +                # TODO: Decide what to do with these jobs
 | |
| 58 | +                job.update_execute_stage(ExecuteStage.COMPLETED)
 | |
| 59 | +                # TODO: Mark these jobs as done
 | |
| 60 | +            else:
 | |
| 61 | +                job.update_execute_stage(ExecuteStage.QUEUED)
 | |
| 62 | +                job.n_tries += 1
 | |
| 63 | +                self.queue.appendleft(job)
 | |
| 64 | 64 |  | 
| 65 | -    def create_job(self):
 | |
| 66 | -        if len(self.queue) > 0:
 | |
| 67 | -            job = self.queue.popleft()
 | |
| 68 | -            job.update_execute_stage(ExecuteStage.EXECUTING)
 | |
| 69 | -            self.jobs[job.name] = job
 | |
| 70 | -            return job
 | |
| 65 | +            self.jobs[name] = job
 | |
| 71 | 66 |  | 
| 72 | 67 |      def job_complete(self, name, result):
 | 
| 73 | 68 |          job = self.jobs[name]
 | 
| ... | ... | @@ -81,48 +76,13 @@ class Scheduler(): | 
| 81 | 76 |              response.operations.extend([v.get_operation()])
 | 
| 82 | 77 |          return response
 | 
| 83 | 78 |  | 
| 84 | -    def update_lease(self, lease):
 | |
| 85 | -        name = lease.id
 | |
| 79 | +    def update_job_lease_state(self, name, state):
 | |
| 86 | 80 |          job = self.jobs.get(name)
 | 
| 87 | -        state = lease.state
 | |
| 88 | - | |
| 89 | -        if state   == LeaseState.LEASE_STATE_UNSPECIFIED.value:
 | |
| 90 | -            create_job = self.create_job()
 | |
| 91 | -            if create_job is None:
 | |
| 92 | -                # No job? Return lease.
 | |
| 93 | -                return lease
 | |
| 94 | -            else:
 | |
| 95 | -                job = create_job
 | |
| 96 | -                job.create_lease()
 | |
| 97 | - | |
| 98 | -        elif state == LeaseState.PENDING.value:
 | |
| 99 | -            job.lease = lease
 | |
| 100 | - | |
| 101 | -        elif state == LeaseState.ACTIVE.value:
 | |
| 102 | -            job.lease = lease
 | |
| 103 | - | |
| 104 | -        elif state == LeaseState.COMPLETED.value:
 | |
| 105 | -            self.job_complete(job.name, lease.result)
 | |
| 106 | - | |
| 107 | -            create_job = self.create_job()
 | |
| 108 | -            if create_job is None:
 | |
| 109 | -                # Docs say not to use this state though if job has
 | |
| 110 | -                # completed and no more jobs, then use this state to stop
 | |
| 111 | -                # job being processed again
 | |
| 112 | -                job.lease = lease
 | |
| 113 | -                job.lease.state = LeaseState.LEASE_STATE_UNSPECIFIED.value
 | |
| 114 | -            else:
 | |
| 115 | -                job = create_job
 | |
| 116 | -                job.lease = job.create_lease()
 | |
| 117 | - | |
| 118 | -        elif state == LeaseState.CANCELLED.value:
 | |
| 119 | -            job.lease = lease
 | |
| 120 | - | |
| 121 | -        else:
 | |
| 122 | -            raise Exception("Unknown state: {}".format(state))
 | |
| 123 | - | |
| 81 | +        job.lease.state = state
 | |
| 124 | 82 |          self.jobs[name] = job
 | 
| 125 | -        return job.lease
 | |
| 83 | + | |
| 84 | +    def get_job_lease(self, name):
 | |
| 85 | +        return self.jobs[name].lease
 | |
| 126 | 86 |  | 
| 127 | 87 |      def cancel_session(self, name):
 | 
| 128 | 88 |          job = self.jobs[name]
 | 
| ... | ... | @@ -130,3 +90,12 @@ class Scheduler(): | 
| 130 | 90 |          if state == LeaseState.PENDING.value or \
 | 
| 131 | 91 |             state == LeaseState.ACTIVE.value:
 | 
| 132 | 92 |              self.retry_job(name)
 | 
| 93 | + | |
| 94 | +    def create_leases(self):
 | |
| 95 | +        while len(self.queue) > 0:
 | |
| 96 | +            job = self.queue.popleft()
 | |
| 97 | +            job.update_execute_stage(ExecuteStage.EXECUTING)
 | |
| 98 | +            job.lease = job.create_lease()
 | |
| 99 | +            job.lease.state = LeaseState.PENDING.value
 | |
| 100 | +            self.jobs[job.name] = job
 | |
| 101 | +            yield job.lease | 
| ... | ... | @@ -35,6 +35,7 @@ class BotsInterface(): | 
| 35 | 35 |          self.logger = logging.getLogger(__name__)
 | 
| 36 | 36 |  | 
| 37 | 37 |          self._bot_ids = {}
 | 
| 38 | +        self._bot_sessions = {}
 | |
| 38 | 39 |          self._scheduler = scheduler
 | 
| 39 | 40 |  | 
| 40 | 41 |      def create_bot_session(self, parent, bot_session):
 | 
| ... | ... | @@ -59,6 +60,7 @@ class BotsInterface(): | 
| 59 | 60 |          bot_session.name = name
 | 
| 60 | 61 |  | 
| 61 | 62 |          self._bot_ids[name] = bot_id
 | 
| 63 | +        self._bot_sessions[name] = bot_session
 | |
| 62 | 64 |          self.logger.info("Created bot session name={} with bot_id={}".format(name, bot_id))
 | 
| 63 | 65 |          return bot_session
 | 
| 64 | 66 |  | 
| ... | ... | @@ -69,13 +71,63 @@ class BotsInterface(): | 
| 69 | 71 |          self.logger.debug("Updating bot session name={}".format(name))
 | 
| 70 | 72 |          self._check_bot_ids(bot_session.bot_id, name)
 | 
| 71 | 73 |  | 
| 72 | -        leases = [self._scheduler.update_lease(lease) for lease in bot_session.leases]
 | |
| 74 | +        server_session = self._bot_sessions[name]
 | |
| 75 | + | |
| 76 | +        leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
 | |
| 73 | 77 |  | 
| 74 | 78 |          del bot_session.leases[:]
 | 
| 75 | 79 |          bot_session.leases.extend(leases)
 | 
| 76 | 80 |  | 
| 81 | +        for lease in self._scheduler.create_leases():
 | |
| 82 | +            bot_session.leases.extend([lease])
 | |
| 83 | + | |
| 84 | +        self._bot_sessions[name] = bot_session
 | |
| 77 | 85 |          return bot_session
 | 
| 78 | 86 |  | 
| 87 | +    def check_states(self, client_lease):
 | |
| 88 | +        """ Edge detector for states
 | |
| 89 | +        """
 | |
| 90 | +        ## TODO: Handle cancelled states
 | |
| 91 | +        server_lease = self._scheduler.get_job_lease(client_lease.id)
 | |
| 92 | +        server_state = LeaseState(server_lease.state)
 | |
| 93 | +        client_state = LeaseState(client_lease.state)
 | |
| 94 | + | |
| 95 | +        if server_state == LeaseState.PENDING:
 | |
| 96 | + | |
| 97 | +            if client_state == LeaseState.ACTIVE:
 | |
| 98 | +                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
 | |
| 99 | +            elif client_state == LeaseState.COMPLETED:
 | |
| 100 | +                # TODO: Lease was rejected
 | |
| 101 | +                raise NotImplementedError("'Not Accepted' is unsupported")
 | |
| 102 | +            else:
 | |
| 103 | +                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
 | |
| 104 | + | |
| 105 | +        elif server_state == LeaseState.ACTIVE:
 | |
| 106 | + | |
| 107 | +            if client_state == LeaseState.ACTIVE:
 | |
| 108 | +                pass
 | |
| 109 | + | |
| 110 | +            elif client_state == LeaseState.COMPLETED:
 | |
| 111 | +                self._scheduler.job_complete(client_lease.id, client_lease.result)
 | |
| 112 | +                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
 | |
| 113 | +                return None
 | |
| 114 | + | |
| 115 | +            else:
 | |
| 116 | +                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
 | |
| 117 | + | |
| 118 | +        elif server_state == LeaseState.COMPLETED:
 | |
| 119 | +            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
 | |
| 120 | + | |
| 121 | +        elif server_state == LeaseState.CANCELLED:
 | |
| 122 | +            raise NotImplementedError("Cancelled states not supported yet")
 | |
| 123 | + | |
| 124 | +        else:
 | |
| 125 | +            # Should never get here
 | |
| 126 | +            raise OutofSyncError("State now allowed: {}".format(server_state))
 | |
| 127 | + | |
| 128 | +        return client_lease
 | |
| 129 | + | |
| 130 | + | |
| 79 | 131 |      def _check_bot_ids(self, bot_id, name = None):
 | 
| 80 | 132 |          """ Checks the ID and the name of the bot.
 | 
| 81 | 133 |          """
 | 
| ... | ... | @@ -103,7 +155,11 @@ class BotsInterface(): | 
| 103 | 155 |              raise InvalidArgumentError("Bot id does not exist: {}".format(name))
 | 
| 104 | 156 |  | 
| 105 | 157 |          self.logger.debug("Attempting to close {} with name: {}".format(bot_id, name))
 | 
| 106 | -        self._scheduler.retry_job(name)
 | |
| 158 | +        for lease in self._bot_sessions[name].leases:
 | |
| 159 | +            if lease.state != LeaseState.COMPLETED.value:
 | |
| 160 | +                # TODO: Be wary here, may need to handle rejected leases in future
 | |
| 161 | +                self._scheduler.retry_job(lease.id)
 | |
| 162 | + | |
| 107 | 163 |          self.logger.debug("Closing bot session: {}".format(name))
 | 
| 108 | 164 |          self._bot_ids.pop(name)
 | 
| 109 | 165 |          self.logger.info("Closed bot {} with name: {}".format(bot_id, name)) | 
| ... | ... | @@ -43,7 +43,6 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 43 | 43 |              self.logger.error(e)
 | 
| 44 | 44 |              context.set_details(str(e))
 | 
| 45 | 45 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 46 | -            return bots_pb2.BotSession()
 | |
| 47 | 46 |  | 
| 48 | 47 |      def UpdateBotSession(self, request, context):
 | 
| 49 | 48 |          try:
 | 
| ... | ... | @@ -53,13 +52,16 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 53 | 52 |              self.logger.error(e)
 | 
| 54 | 53 |              context.set_details(str(e))
 | 
| 55 | 54 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 56 | -            return bots_pb2.BotSession()
 | |
| 57 | 55 |  | 
| 58 | 56 |          except OutofSyncError as e:
 | 
| 59 | 57 |              self.logger.error(e)
 | 
| 60 | 58 |              context.set_details(str(e))
 | 
| 61 | 59 |              context.set_code(grpc.StatusCode.DATA_LOSS)
 | 
| 62 | -            return bots_pb2.BotSession()
 | |
| 60 | + | |
| 61 | +        except NotImplementedError as e:
 | |
| 62 | +            self.logger.error(e)
 | |
| 63 | +            context.set_details(str(e))
 | |
| 64 | +            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 63 | 65 |  | 
| 64 | 66 |      def PostBotEventTemp(self, request, context):
 | 
| 65 | 67 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +import grpc
 | |
| 16 | +import pytest
 | |
| 17 | +import uuid
 | |
| 18 | + | |
| 19 | +from unittest import mock
 | |
| 20 | + | |
| 21 | +from grpc._server import _Context
 | |
| 22 | +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
 | |
| 23 | +from google.protobuf import any_pb2
 | |
| 24 | + | |
| 25 | +from buildgrid.bot import bot_session, bot_interface
 | |
| 26 | + | |
| 27 | +# GRPC context
 | |
| 28 | +@pytest.fixture
 | |
| 29 | +def channel():
 | |
| 30 | +    yield mock.MagicMock(spec = grpc.insecure_channel)
 | |
| 31 | + | |
| 32 | +@pytest.fixture
 | |
| 33 | +@mock.patch.object(bot_interface.bots_pb2_grpc, 'BotsStub', autospec = True)
 | |
| 34 | +def interface(stub):
 | |
| 35 | +    yield bot_interface.BotInterface(None)
 | |
| 36 | + | |
| 37 | +# Instance to test
 | |
| 38 | +@pytest.fixture
 | |
| 39 | +def instance(interface):
 | |
| 40 | +    yield bot_session.BotSession('deckard', interface)
 | |
| 41 | + | |
| 42 | +@pytest.mark.parametrize("docker_value", ["True", "False"])
 | |
| 43 | +@pytest.mark.parametrize("os_value", ["nexus7", "nexus8"])
 | |
| 44 | +def test_create_device(docker_value, os_value):
 | |
| 45 | +    properties = {'docker' : docker_value, 'os' : os_value}
 | |
| 46 | +    device = bot_session.Device(properties)
 | |
| 47 | + | |
| 48 | +    assert uuid.UUID(device.name, version=4)
 | |
| 49 | +    assert properties == device.properties
 | |
| 50 | + | |
| 51 | +def test_create_device_key_fail():
 | |
| 52 | +    properties = {'voight' : 'kampff'}
 | |
| 53 | + | |
| 54 | +    with pytest.raises(KeyError):
 | |
| 55 | +        device = bot_session.Device(properties)
 | |
| 56 | + | |
| 57 | +def test_create_device_value_fail():
 | |
| 58 | +    properties = {'docker' :  True}
 | |
| 59 | + | |
| 60 | +    with pytest.raises(ValueError):
 | |
| 61 | +        device = bot_session.Device(properties)
 | |
| 62 | + | |
| 63 | +def test_create_worker():
 | |
| 64 | +    properties = {'pool' : 'swim'}
 | |
| 65 | +    configs = {'DockerImage' : 'Windows'}
 | |
| 66 | +    worker = bot_session.Worker(properties, configs)
 | |
| 67 | + | |
| 68 | +    assert properties == worker.properties
 | |
| 69 | +    assert configs == worker.configs
 | |
| 70 | + | |
| 71 | +    device = bot_session.Device()
 | |
| 72 | +    worker.add_device(device)
 | |
| 73 | + | |
| 74 | +    assert worker._devices[0] == device
 | |
| 75 | + | |
| 76 | +def test_create_worker_key_fail():
 | |
| 77 | +    properties = {'voight' : 'kampff'}
 | |
| 78 | +    configs = {'voight' : 'kampff'}
 | |
| 79 | + | |
| 80 | +    with pytest.raises(KeyError):
 | |
| 81 | +        bot_session.Worker(properties)
 | |
| 82 | +    with pytest.raises(KeyError):
 | |
| 83 | +        bot_session.Worker(configs)
 | |
| 84 | + | |
| 85 | +def test_add_worker(instance):
 | |
| 86 | +    worker = bot_session.Worker()
 | |
| 87 | +    instance.add_worker(worker)
 | |
| 88 | + | |
| 89 | +    assert instance._worker == worker | 
| ... | ... | @@ -36,6 +36,12 @@ from buildgrid.server.worker import bots_interface, bots_service | 
| 36 | 36 |  def context():
 | 
| 37 | 37 |      yield mock.MagicMock(spec = _Context)
 | 
| 38 | 38 |  | 
| 39 | +@pytest.fixture
 | |
| 40 | +def action_job():
 | |
| 41 | +    action_digest = remote_execution_pb2.Digest()
 | |
| 42 | +    j = job.Job(action_digest, None)
 | |
| 43 | +    yield j
 | |
| 44 | + | |
| 39 | 45 |  @pytest.fixture
 | 
| 40 | 46 |  def bot_session():
 | 
| 41 | 47 |      bot = bots_pb2.BotSession()
 | 
| ... | ... | @@ -101,7 +107,6 @@ def test_update_bot_session_zombie(bot_session, context, instance): | 
| 101 | 107 |  | 
| 102 | 108 |      response = instance.UpdateBotSession(request, context)
 | 
| 103 | 109 |  | 
| 104 | -    assert isinstance(response, bots_pb2.BotSession)
 | |
| 105 | 110 |      context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 106 | 111 |  | 
| 107 | 112 |  def test_update_bot_session_bot_id_fail(bot_session, context, instance):
 | 
| ... | ... | @@ -113,35 +118,33 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance): | 
| 113 | 118 |  | 
| 114 | 119 |      context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 115 | 120 |  | 
| 116 | -@pytest.mark.parametrize("number_of_leases", [1, 3, 500])
 | |
| 117 | -def test_update_leases(number_of_leases, bot_session, context, instance):
 | |
| 118 | -    leases = [bots_pb2.Lease() for x in range(number_of_leases)]
 | |
| 119 | -    bot_session.leases.extend(leases)
 | |
| 121 | +@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
 | |
| 122 | +def test_number_of_leases(number_of_jobs, bot_session, context, instance):
 | |
| 120 | 123 |      request = bots_pb2.CreateBotSessionRequest(parent='',
 | 
| 121 | 124 |                                                 bot_session=bot_session)
 | 
| 125 | +    # Inject work
 | |
| 126 | +    for n in range(0, number_of_jobs):
 | |
| 127 | +        action_digest = remote_execution_pb2.Digest()
 | |
| 128 | +        instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 122 | 129 |      # Simulated the severed binding between client and server
 | 
| 123 | 130 |      bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| 124 | 131 |  | 
| 132 | +    # Creation of bot session should not create leases
 | |
| 133 | +    assert len(bot.leases) == 0
 | |
| 134 | + | |
| 125 | 135 |      request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | 
| 126 | 136 |                                                 bot_session=bot)
 | 
| 127 | 137 |  | 
| 128 | 138 |      response = instance.UpdateBotSession(request, context)
 | 
| 129 | 139 |  | 
| 130 | -    assert isinstance(response, bots_pb2.BotSession)
 | |
| 131 | -    assert len(response.leases) == len(bot.leases)
 | |
| 132 | -    assert bot == response
 | |
| 140 | +    assert len(response.leases) == number_of_jobs
 | |
| 133 | 141 |  | 
| 134 | 142 |  def test_update_leases_with_work(bot_session, context, instance):
 | 
| 135 | -    leases = [bots_pb2.Lease() for x in range(2)]
 | |
| 136 | -    bot_session.leases.extend(leases)
 | |
| 137 | - | |
| 138 | -    # Inject some work to be done
 | |
| 139 | -    action = remote_execution_pb2.Action()
 | |
| 140 | -    action.command_digest.hash = 'rick'
 | |
| 141 | -    instance._instance._scheduler.append_job(job.Job(action))
 | |
| 142 | - | |
| 143 | 143 |      request = bots_pb2.CreateBotSessionRequest(parent='',
 | 
| 144 | 144 |                                                 bot_session=bot_session)
 | 
| 145 | +    # Inject work
 | |
| 146 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 147 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 145 | 148 |      # Simulated the severed binding between client and server
 | 
| 146 | 149 |      bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| 147 | 150 |  | 
| ... | ... | @@ -149,26 +152,50 @@ def test_update_leases_with_work(bot_session, context, instance): | 
| 149 | 152 |                                                 bot_session=bot)
 | 
| 150 | 153 |  | 
| 151 | 154 |      response = instance.UpdateBotSession(request, context)
 | 
| 152 | -    response_action = remote_execution_pb2.Action()
 | |
| 153 | -    _unpack_any(response.leases[0].payload, response_action)
 | |
| 154 | 155 |  | 
| 156 | +    response_action = remote_execution_pb2.Digest()
 | |
| 157 | +    response.leases[0].payload.Unpack(response_action)
 | |
| 155 | 158 |      assert isinstance(response, bots_pb2.BotSession)
 | 
| 156 | 159 |      assert response.leases[0].state == LeaseState.PENDING.value
 | 
| 157 | -    assert response.leases[1].state == LeaseState.LEASE_STATE_UNSPECIFIED.value
 | |
| 158 | 160 |      assert uuid.UUID(response.leases[0].id, version=4)
 | 
| 159 | -    assert response_action == action
 | |
| 161 | +    assert response_action == action_digest
 | |
| 160 | 162 |  | 
| 161 | 163 |  def test_update_leases_work_complete(bot_session, context, instance):
 | 
| 162 | -    leases = [bots_pb2.Lease() for x in range(2)]
 | |
| 163 | -    bot_session.leases.extend(leases)
 | |
| 164 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 165 | +                                               bot_session=bot_session)
 | |
| 166 | +    # Inject work
 | |
| 167 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 168 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 169 | +    # Simulated the severed binding between client and server
 | |
| 170 | +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 171 | + | |
| 172 | +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | |
| 173 | +                                               bot_session=bot)
 | |
| 174 | + | |
| 175 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 176 | + | |
| 177 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 178 | + | |
| 179 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 180 | +                                               bot_session=response)
 | |
| 181 | + | |
| 182 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 183 | + | |
| 184 | +    response.leases[0].state = LeaseState.COMPLETED.value
 | |
| 185 | + | |
| 186 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 187 | +                                               bot_session=response)
 | |
| 164 | 188 |  | 
| 165 | -    # Inject some work to be done
 | |
| 166 | -    action = remote_execution_pb2.Action()
 | |
| 167 | -    action.command_digest.hash = 'rick'
 | |
| 168 | -    instance._instance._scheduler.append_job(job.Job(action))
 | |
| 189 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 190 | + | |
| 191 | +    assert len(response.leases) == 0
 | |
| 169 | 192 |  | 
| 193 | +def test_work_rejected_by_bot(bot_session, context, instance):
 | |
| 170 | 194 |      request = bots_pb2.CreateBotSessionRequest(parent='',
 | 
| 171 | 195 |                                                 bot_session=bot_session)
 | 
| 196 | +    # Inject work
 | |
| 197 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 198 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 172 | 199 |      # Simulated the severed binding between client and server
 | 
| 173 | 200 |      bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| 174 | 201 |  | 
| ... | ... | @@ -177,26 +204,125 @@ def test_update_leases_work_complete(bot_session, context, instance): | 
| 177 | 204 |  | 
| 178 | 205 |      response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | 
| 179 | 206 |  | 
| 180 | -    operation_name = response.leases[0].id
 | |
| 207 | +    response.leases[0].state = LeaseState.COMPLETED.value
 | |
| 208 | + | |
| 209 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 210 | +                                               bot_session=response)
 | |
| 211 | + | |
| 212 | +    response = instance.UpdateBotSession(request, context)
 | |
| 213 | + | |
| 214 | +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 215 | + | |
| 216 | +def test_work_rejected_by_bot(bot_session, context, instance):
 | |
| 217 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 218 | +                                               bot_session=bot_session)
 | |
| 219 | +    # Inject work
 | |
| 220 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 221 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 222 | +    # Simulated the severed binding between client and server
 | |
| 223 | +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 224 | + | |
| 225 | +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | |
| 226 | +                                               bot_session=bot)
 | |
| 227 | + | |
| 228 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 181 | 229 |  | 
| 182 | -    assert response.leases[0].state == LeaseState.PENDING.value
 | |
| 183 | 230 |      response.leases[0].state = LeaseState.COMPLETED.value
 | 
| 184 | 231 |  | 
| 185 | 232 |      request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | 
| 186 | 233 |                                                 bot_session=response)
 | 
| 234 | + | |
| 235 | +    response = instance.UpdateBotSession(request, context)
 | |
| 236 | + | |
| 237 | +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 238 | + | |
| 239 | +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
 | |
| 240 | +def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
 | |
| 241 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 242 | +                                               bot_session=bot_session)
 | |
| 243 | +    # Inject work
 | |
| 244 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 245 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 246 | +    # Simulated the severed binding between client and server
 | |
| 247 | +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 248 | + | |
| 249 | +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | |
| 250 | +                                               bot_session=bot)
 | |
| 251 | + | |
| 252 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 253 | + | |
| 254 | +    response.leases[0].state = state.value
 | |
| 255 | + | |
| 256 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 257 | +                                               bot_session=response)
 | |
| 258 | + | |
| 259 | +    response = instance.UpdateBotSession(request, context)
 | |
| 260 | + | |
| 261 | +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
 | |
| 262 | + | |
| 263 | +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
 | |
| 264 | +def test_work_out_of_sync_from_active(state, bot_session, context, instance):
 | |
| 265 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 266 | +                                               bot_session=bot_session)
 | |
| 267 | +    # Inject work
 | |
| 268 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 269 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 187 | 270 |      # Simulated the severed binding between client and server
 | 
| 271 | +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 272 | + | |
| 273 | +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | |
| 274 | +                                               bot_session=bot)
 | |
| 275 | + | |
| 188 | 276 |      response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | 
| 189 | -    assert isinstance(response, bots_pb2.BotSession)
 | |
| 190 | -    assert instance._instance._scheduler.jobs[operation_name]._execute_stage == ExecuteStage.COMPLETED
 | |
| 277 | + | |
| 278 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 279 | + | |
| 280 | +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 281 | +                                                             bot_session=response))
 | |
| 282 | + | |
| 283 | +    response = instance.UpdateBotSession(request, context)
 | |
| 284 | + | |
| 285 | +    response.leases[0].state = state.value
 | |
| 286 | + | |
| 287 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 288 | +                                               bot_session=response)
 | |
| 289 | + | |
| 290 | +    response = instance.UpdateBotSession(request, context)
 | |
| 291 | + | |
| 292 | +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
 | |
| 293 | + | |
| 294 | +def test_work_active_to_active(bot_session, context, instance):
 | |
| 295 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 296 | +                                               bot_session=bot_session)
 | |
| 297 | +    # Inject work
 | |
| 298 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 299 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 300 | +    # Simulated the severed binding between client and server
 | |
| 301 | +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 302 | + | |
| 303 | +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | |
| 304 | +                                               bot_session=bot)
 | |
| 305 | + | |
| 306 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 307 | + | |
| 308 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 309 | + | |
| 310 | +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 311 | +                                                             bot_session=response))
 | |
| 312 | + | |
| 313 | +    response = instance.UpdateBotSession(request, context)
 | |
| 314 | + | |
| 315 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 316 | + | |
| 317 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 318 | +                                               bot_session=response)
 | |
| 319 | + | |
| 320 | +    response = instance.UpdateBotSession(request, context)
 | |
| 321 | + | |
| 322 | +    assert response.leases[0].state == LeaseState.ACTIVE.value
 | |
| 191 | 323 |  | 
| 192 | 324 |  def test_post_bot_event_temp(context, instance):
 | 
| 193 | 325 |      request = bots_pb2.PostBotEventTempRequest()
 | 
| 194 | 326 |      instance.PostBotEventTemp(request, context)
 | 
| 195 | 327 |  | 
| 196 | 328 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED) | 
| 197 | - | |
| 198 | -def _unpack_any(unpack_from, to):
 | |
| 199 | -    any = any_pb2.Any()
 | |
| 200 | -    any.CopyFrom(unpack_from)
 | |
| 201 | -    any.Unpack(to)
 | |
| 202 | -    return to | 
