finnball pushed to branch bloomberg/buildgrid-csande/action-cache at BuildGrid / buildgrid
Commits:
- 
9bd7e89c
by finn at 2018-08-08T14:25:29Z
- 
bb41de38
by finn at 2018-08-08T15:03:34Z
- 
9b618c1c
by finn at 2018-08-08T15:04:14Z
- 
8a839609
by finn at 2018-08-08T15:05:48Z
- 
f1484df3
by Carter Sande at 2018-08-08T16:29:07Z
- 
072b3397
by finn at 2018-08-08T16:29:07Z
19 changed files:
- app/commands/cmd_bot.py
- app/commands/cmd_server.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- + buildgrid/bot/bot_session.py
- + buildgrid/server/action_cache.py
- buildgrid/server/build_grid_server.py
- buildgrid/server/cas/storage/storage_abc.py
- buildgrid/server/execution/action_cache_service.py
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- buildgrid/server/worker/bots_interface.py
- buildgrid/server/worker/bots_service.py
- + tests/action_cache.py
- tests/integration/action_cache_service.py
- + tests/integration/bot_session.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
Changes:
| ... | ... | @@ -33,7 +33,8 @@ import tempfile | 
| 33 | 33 |  | 
| 34 | 34 |  from pathlib import Path, PurePath
 | 
| 35 | 35 |  | 
| 36 | -from buildgrid.bot import bot
 | |
| 36 | +from buildgrid.bot import bot, bot_interface
 | |
| 37 | +from buildgrid.bot.bot_session import BotSession, Device, Worker
 | |
| 37 | 38 |  from buildgrid._exceptions import BotError
 | 
| 38 | 39 |  | 
| 39 | 40 |  from ..cli import pass_context
 | 
| ... | ... | @@ -44,17 +45,23 @@ from google.protobuf import any_pb2 | 
| 44 | 45 |  | 
| 45 | 46 |  @click.group(short_help = 'Create a bot client')
 | 
| 46 | 47 |  @click.option('--parent', default='bgd_test')
 | 
| 47 | -@click.option('--number-of-leases', default=1)
 | |
| 48 | 48 |  @click.option('--port', default='50051')
 | 
| 49 | 49 |  @click.option('--host', default='localhost')
 | 
| 50 | 50 |  @pass_context
 | 
| 51 | -def cli(context, host, port, number_of_leases, parent):
 | |
| 51 | +def cli(context, host, port, parent):
 | |
| 52 | +    channel = grpc.insecure_channel('{}:{}'.format(host, port))
 | |
| 53 | +    interface = bot_interface.BotInterface(channel)
 | |
| 54 | + | |
| 52 | 55 |      context.logger = logging.getLogger(__name__)
 | 
| 53 | 56 |      context.logger.info("Starting on port {}".format(port))
 | 
| 54 | 57 |  | 
| 55 | -    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
 | |
| 56 | -    context.number_of_leases = number_of_leases
 | |
| 57 | -    context.parent = parent
 | |
| 58 | +    worker = Worker()
 | |
| 59 | +    worker.add_device(Device())
 | |
| 60 | + | |
| 61 | +    bot_session = BotSession(parent, interface)
 | |
| 62 | +    bot_session.add_worker(worker)
 | |
| 63 | + | |
| 64 | +    context.bot_session = bot_session
 | |
| 58 | 65 |  | 
| 59 | 66 |  @cli.command('dummy', short_help='Create a dummy bot session')
 | 
| 60 | 67 |  @pass_context
 | 
| ... | ... | @@ -63,15 +70,10 @@ def dummy(context): | 
| 63 | 70 |      Simple dummy client. Creates a session, accepts leases, does fake work and
 | 
| 64 | 71 |      updates the server.
 | 
| 65 | 72 |      """
 | 
| 66 | - | |
| 67 | -    context.logger.info("Creating a bot session")
 | |
| 68 | - | |
| 69 | 73 |      try:
 | 
| 70 | -        bot.Bot(work=_work_dummy,
 | |
| 71 | -                context=context,
 | |
| 72 | -                channel=context.channel,
 | |
| 73 | -                parent=context.parent,
 | |
| 74 | -                number_of_leases=context.number_of_leases)
 | |
| 74 | +        b = bot.Bot(context.bot_session)
 | |
| 75 | +        b.session(_work_dummy,
 | |
| 76 | +                  context)
 | |
| 75 | 77 |  | 
| 76 | 78 |      except KeyboardInterrupt:
 | 
| 77 | 79 |          pass
 | 
| ... | ... | @@ -85,7 +87,7 @@ def dummy(context): | 
| 85 | 87 |  @click.option('--port', show_default = True, default=11001)
 | 
| 86 | 88 |  @click.option('--remote', show_default = True, default='localhost')
 | 
| 87 | 89 |  @pass_context
 | 
| 88 | -def _work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
 | |
| 90 | +def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
 | |
| 89 | 91 |      """
 | 
| 90 | 92 |      Uses BuildBox to run commands.
 | 
| 91 | 93 |      """
 | 
| ... | ... | @@ -101,11 +103,14 @@ def _work_buildbox(context, remote, port, server_cert, client_key, client_cert, | 
| 101 | 103 |      context.fuse_dir = fuse_dir
 | 
| 102 | 104 |  | 
| 103 | 105 |      try:
 | 
| 104 | -        bot.Bot(work=_work_buildbox,
 | |
| 105 | -                context=context,
 | |
| 106 | -                channel=context.channel,
 | |
| 107 | -                parent=context.parent,
 | |
| 108 | -                number_of_leases=context.number_of_leases)
 | |
| 106 | +        b = bot.Bot(work=_work_buildbox,
 | |
| 107 | +                    bot_session=context.bot_session,
 | |
| 108 | +                    channel=context.channel,
 | |
| 109 | +                    parent=context.parent)
 | |
| 110 | + | |
| 111 | +        b.session(context.parent,
 | |
| 112 | +                  _work_buildbox,
 | |
| 113 | +                  context)
 | |
| 109 | 114 |  | 
| 110 | 115 |      except KeyboardInterrupt:
 | 
| 111 | 116 |          pass
 | 
| ... | ... | @@ -28,6 +28,7 @@ import click | 
| 28 | 28 |  import logging
 | 
| 29 | 29 |  | 
| 30 | 30 |  from buildgrid.server import build_grid_server
 | 
| 31 | +from buildgrid.server.action_cache import ActionCache
 | |
| 31 | 32 |  from buildgrid.server.cas.storage.disk import DiskStorage
 | 
| 32 | 33 |  from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
 | 
| 33 | 34 |  from buildgrid.server.cas.storage.s3 import S3Storage
 | 
| ... | ... | @@ -43,6 +44,11 @@ def cli(context): | 
| 43 | 44 |  | 
| 44 | 45 |  @cli.command('start', short_help='Starts server')
 | 
| 45 | 46 |  @click.option('--port', default='50051')
 | 
| 47 | +@click.option('--max-cached-actions', type=int, default=50,
 | |
| 48 | +              help='Maximum number of actions to keep in the ActionCache.')
 | |
| 49 | +@click.option('--allow-update-action-result/--forbid-update-action-result',
 | |
| 50 | +              'allow_uar', default=True,
 | |
| 51 | +              help='Whether or not to allow clients to manually edit the action cache.')
 | |
| 46 | 52 |  @click.option('--cas',
 | 
| 47 | 53 |                type=click.Choice(('lru', 's3', 'disk', 'with-cache')),
 | 
| 48 | 54 |                help='CAS storage type to use.')
 | 
| ... | ... | @@ -59,15 +65,22 @@ def cli(context): | 
| 59 | 65 |                type=click.Path(file_okay=False, dir_okay=True, writable=True),
 | 
| 60 | 66 |                help='For --cas=disk, the folder to store CAS blobs in.')
 | 
| 61 | 67 |  @pass_context
 | 
| 62 | -def start(context, port, cas, **cas_args):
 | |
| 68 | +def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
 | |
| 63 | 69 |      context.logger.info("Starting on port {}".format(port))
 | 
| 64 | 70 |  | 
| 65 | 71 |      loop = asyncio.get_event_loop()
 | 
| 66 | 72 |  | 
| 67 | 73 |      cas_storage = _make_cas_storage(context, cas, cas_args)
 | 
| 68 | 74 |      if cas_storage is None:
 | 
| 69 | -        context.logger.info("Running without CAS")
 | |
| 70 | -    server = build_grid_server.BuildGridServer(port, cas_storage=cas_storage)
 | |
| 75 | +        context.logger.info("Running without CAS - action cache will be unavailable")
 | |
| 76 | +        action_cache = None
 | |
| 77 | +    else:
 | |
| 78 | +        action_cache = ActionCache(cas_storage, max_cached_actions)
 | |
| 79 | + | |
| 80 | +    server = build_grid_server.BuildGridServer(port,
 | |
| 81 | +                                               cas_storage=cas_storage,
 | |
| 82 | +                                               action_cache=action_cache,
 | |
| 83 | +                                               allow_update_action_result=allow_uar)
 | |
| 71 | 84 |  | 
| 72 | 85 |      try:
 | 
| 73 | 86 |          asyncio.ensure_future(server.start())
 | 
| ... | ... | @@ -23,163 +23,45 @@ Creates a bot session. | 
| 23 | 23 |  """
 | 
| 24 | 24 |  | 
| 25 | 25 |  import asyncio
 | 
| 26 | -import inspect
 | |
| 26 | +import collections
 | |
| 27 | 27 |  import logging
 | 
| 28 | -import platform
 | |
| 29 | -import queue
 | |
| 30 | 28 |  import time
 | 
| 31 | 29 |  | 
| 32 | -from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
 | |
| 33 | - | |
| 34 | -from . import bot_interface
 | |
| 30 | +from . import bot_interface, bot_session
 | |
| 31 | +from .bot_session import BotStatus, LeaseState
 | |
| 35 | 32 |  from .._exceptions import BotError
 | 
| 36 | 33 |  | 
| 37 | -class Bot(object):
 | |
| 34 | +class Bot:
 | |
| 38 | 35 |      """
 | 
| 39 | 36 |      Creates a local BotSession.
 | 
| 40 | 37 |      """
 | 
| 41 | 38 |  | 
| 42 | -    def __init__(self, work, context, channel, parent, number_of_leases):
 | |
| 43 | -        if not inspect.iscoroutinefunction(work):
 | |
| 44 | -            raise BotError("work function must be async")
 | |
| 45 | - | |
| 46 | -        self.interface = bot_interface.BotInterface(channel)
 | |
| 39 | +    def __init__(self, bot_session, update_period=1):
 | |
| 47 | 40 |          self.logger = logging.getLogger(__name__)
 | 
| 48 | 41 |  | 
| 49 | -        self._create_session(parent, number_of_leases)
 | |
| 50 | -        self._work_queue = queue.Queue(maxsize = number_of_leases)
 | |
| 51 | - | |
| 52 | -        try:
 | |
| 53 | -            while True:
 | |
| 54 | -                ## TODO: Leases independently finish
 | |
| 55 | -                ## Allow leases to queue finished work independently instead
 | |
| 56 | -                ## of waiting for all to finish
 | |
| 57 | -                futures = [self._do_work(work, context, lease) for lease in self._get_work()]
 | |
| 58 | -                if futures:
 | |
| 59 | -                    loop = asyncio.new_event_loop()
 | |
| 60 | -                    leases_complete, _ = loop.run_until_complete(asyncio.wait(futures))
 | |
| 61 | -                    work_complete = [(lease.result().id, lease.result(),) for lease in leases_complete]
 | |
| 62 | -                    self._work_complete(work_complete)
 | |
| 63 | -                    loop.close()
 | |
| 64 | -                self._update_bot_session()
 | |
| 65 | -                time.sleep(2)
 | |
| 66 | -        except Exception as e:
 | |
| 67 | -            self.logger.error(e)
 | |
| 68 | -            raise BotError(e)
 | |
| 69 | - | |
| 70 | -    @property
 | |
| 71 | -    def bot_session(self):
 | |
| 72 | -        ## Read only, shouldn't have to set any of the variables in here
 | |
| 73 | -        return self._bot_session
 | |
| 74 | - | |
| 75 | -    def close_session(self):
 | |
| 76 | -        self.logger.warning("Session closing not yet implemented")
 | |
| 77 | - | |
| 78 | -    async def _do_work(self, work, context, lease):
 | |
| 79 | -        """ Work is done here, work function should be asynchronous
 | |
| 80 | -        """
 | |
| 81 | -        self.logger.info("Work found: {}".format(lease.id))
 | |
| 82 | -        lease = await work(context=context, lease=lease)
 | |
| 83 | -        lease.state = bots_pb2.LeaseState.Value('COMPLETED')
 | |
| 84 | -        self.logger.info("Work complete: {}".format(lease.id))
 | |
| 85 | -        return lease
 | |
| 86 | - | |
| 87 | -    def _update_bot_session(self):
 | |
| 88 | -        """ Should call the server periodically to inform the server the client
 | |
| 89 | -        has not died.
 | |
| 90 | -        """
 | |
| 91 | -        self.logger.debug("Updating bot session")
 | |
| 92 | -        self._bot_session = self.interface.update_bot_session(self._bot_session)
 | |
| 93 | -        leases_update = ([self._update_lease(lease) for lease in self._bot_session.leases])
 | |
| 94 | -        del self._bot_session.leases[:]
 | |
| 95 | -        self._bot_session.leases.extend(leases_update)
 | |
| 96 | - | |
| 97 | -    def _get_work(self):
 | |
| 98 | -        while not self._work_queue.empty():
 | |
| 99 | -            yield self._work_queue.get()
 | |
| 100 | -            self._work_queue.task_done()
 | |
| 101 | - | |
| 102 | -    def _work_complete(self, leases_complete):
 | |
| 103 | -        """ Bot updates itself with any completed work.
 | |
| 104 | -        """
 | |
| 105 | -        # Should really improve this...
 | |
| 106 | -        # Maybe add some call back function sentoff work...
 | |
| 107 | -        leases_active = list(filter(self._lease_active, self._bot_session.leases))
 | |
| 108 | -        leases_not_active = [lease for lease in self._bot_session.leases if not self._lease_active(lease)]
 | |
| 109 | -        del self._bot_session.leases[:]
 | |
| 110 | -        for lease in leases_active:
 | |
| 111 | -            for lease_tuple in leases_complete:
 | |
| 112 | -                if lease.id == lease_tuple[0]:
 | |
| 113 | -                    leases_not_active.extend([lease_tuple[1]])
 | |
| 114 | -        self._bot_session.leases.extend(leases_not_active)
 | |
| 115 | - | |
| 116 | -    def _update_lease(self, lease):
 | |
| 117 | -        """
 | |
| 118 | -        State machine for any recieved updates to the leases.
 | |
| 119 | -        """
 | |
| 120 | -        if self._lease_pending(lease):
 | |
| 121 | -            lease.state = bots_pb2.LeaseState.Value('ACTIVE')
 | |
| 122 | -            self._work_queue.put(lease)
 | |
| 123 | -            return lease
 | |
| 124 | - | |
| 125 | -        else:
 | |
| 126 | -            return lease
 | |
| 127 | - | |
| 128 | -    def _create_session(self, parent, number_of_leases):
 | |
| 129 | -        self.logger.debug("Creating bot session")
 | |
| 130 | -        worker = self._create_worker()
 | |
| 42 | +        self._bot_session = bot_session
 | |
| 43 | +        self._update_period = update_period
 | |
| 131 | 44 |  | 
| 132 | -        """ Unique bot ID within the farm used to identify this bot
 | |
| 133 | -        Needs to be human readable.
 | |
| 134 | -        All prior sessions with bot_id of same ID are invalidated.
 | |
| 135 | -        If a bot attempts to update an invalid session, it must be rejected and
 | |
| 136 | -        may be put in quarantine.
 | |
| 137 | -        """
 | |
| 138 | -        bot_id = '{}.{}'.format(parent, platform.node())
 | |
| 45 | +    def session(self, work, context):
 | |
| 46 | +        loop = asyncio.get_event_loop()
 | |
| 139 | 47 |  | 
| 140 | -        leases = [bots_pb2.Lease() for x in range(number_of_leases)]
 | |
| 48 | +        self._bot_session.create_bot_session(work, context)
 | |
| 141 | 49 |  | 
| 142 | -        bot_session = bots_pb2.BotSession(worker = worker,
 | |
| 143 | -                                          status = bots_pb2.BotStatus.Value('OK'),
 | |
| 144 | -                                          leases = leases,
 | |
| 145 | -                                          bot_id = bot_id)
 | |
| 146 | -        self._bot_session = self.interface.create_bot_session(parent, bot_session)
 | |
| 147 | -        self.logger.info("Name: {}, Id: {}".format(self._bot_session.name,
 | |
| 148 | -                                                      self._bot_session.bot_id))
 | |
| 149 | - | |
| 150 | -    def _create_worker(self):
 | |
| 151 | -        devices = self._create_devices()
 | |
| 152 | - | |
| 153 | -        # Contains a list of devices and the connections between them.
 | |
| 154 | -        worker = worker_pb2.Worker(devices = devices)
 | |
| 155 | - | |
| 156 | -        """ Keys supported:
 | |
| 157 | -        *pool
 | |
| 158 | -        """
 | |
| 159 | -        worker.Property.key = "pool"
 | |
| 160 | -        worker.Property.value = "all"
 | |
| 161 | - | |
| 162 | -        return worker
 | |
| 163 | - | |
| 164 | -    def _create_devices(self):
 | |
| 165 | -        """ Creates devices available to the worker
 | |
| 166 | -        The first device is know as the Primary Device - the revice which
 | |
| 167 | -        is running a bit and responsible to actually executing commands.
 | |
| 168 | -        All other devices are known as Attatched Devices and must be controlled
 | |
| 169 | -        by the Primary Device.
 | |
| 170 | -        """
 | |
| 171 | - | |
| 172 | -        devices = []
 | |
| 173 | - | |
| 174 | -        for i in range(0, 1): # Append one device for now
 | |
| 175 | -            dev = worker_pb2.Device()
 | |
| 176 | - | |
| 177 | -            devices.append(dev)
 | |
| 178 | - | |
| 179 | -        return devices
 | |
| 180 | - | |
| 181 | -    def _lease_pending(self, lease):
 | |
| 182 | -        return lease.state == bots_pb2.LeaseState.Value('PENDING')
 | |
| 183 | - | |
| 184 | -    def _lease_active(self, lease):
 | |
| 185 | -        return lease.state == bots_pb2.LeaseState.Value('ACTIVE') | |
| 50 | +        try:
 | |
| 51 | +            task = asyncio.ensure_future(self._update_bot_session())
 | |
| 52 | +            loop.run_forever()
 | |
| 53 | + | |
| 54 | +        except KeyboardInterrupt:
 | |
| 55 | +            pass
 | |
| 56 | + | |
| 57 | +        finally:
 | |
| 58 | +            task.cancel()
 | |
| 59 | +            loop.close()
 | |
| 60 | + | |
| 61 | +    async def _update_bot_session(self):
 | |
| 62 | +        while True:
 | |
| 63 | +            """ Calls the server periodically to inform the server the client
 | |
| 64 | +            has not died.
 | |
| 65 | +            """
 | |
| 66 | +            self._bot_session.update_bot_session()
 | |
| 67 | +            await asyncio.sleep(self._update_period) | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bo | 
| 29 | 29 |  | 
| 30 | 30 |  from .._exceptions import BotError
 | 
| 31 | 31 |  | 
| 32 | -class BotInterface(object):
 | |
| 32 | +class BotInterface:
 | |
| 33 | 33 |      """ Interface handles calls to the server.
 | 
| 34 | 34 |      """
 | 
| 35 | 35 |  | 
| ... | ... | @@ -39,22 +39,12 @@ class BotInterface(object): | 
| 39 | 39 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 40 | 40 |  | 
| 41 | 41 |      def create_bot_session(self, parent, bot_session):
 | 
| 42 | -        try:
 | |
| 43 | -            request = bots_pb2.CreateBotSessionRequest(parent = parent,
 | |
| 44 | -                                                       bot_session = bot_session)
 | |
| 45 | -            return self._stub.CreateBotSession(request)
 | |
| 46 | - | |
| 47 | -        except Exception as e:
 | |
| 48 | -            self.logger.error(e)
 | |
| 49 | -            raise BotError(e)
 | |
| 42 | +        request = bots_pb2.CreateBotSessionRequest(parent = parent,
 | |
| 43 | +                                                   bot_session = bot_session)
 | |
| 44 | +        return self._stub.CreateBotSession(request)
 | |
| 50 | 45 |  | 
| 51 | 46 |      def update_bot_session(self, bot_session, update_mask = None):
 | 
| 52 | -        try:
 | |
| 53 | -            request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
 | |
| 54 | -                                                       bot_session = bot_session,
 | |
| 55 | -                                                       update_mask = update_mask)
 | |
| 56 | -            return self._stub.UpdateBotSession(request)
 | |
| 57 | - | |
| 58 | -        except Exception as e:
 | |
| 59 | -            self.logger.error(e)
 | |
| 60 | -            raise BotError(e) | |
| 47 | +        request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
 | |
| 48 | +                                                   bot_session = bot_session,
 | |
| 49 | +                                                   update_mask = update_mask)
 | |
| 50 | +        return self._stub.UpdateBotSession(request) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +"""
 | |
| 16 | +Bot Session
 | |
| 17 | +====
 | |
| 18 | + | |
| 19 | +Provides classes for creating and maintaining a bot session with the server.
 | |
| 20 | +"""
 | |
| 21 | +import asyncio
 | |
| 22 | +import logging
 | |
| 23 | +import platform
 | |
| 24 | +import uuid
 | |
| 25 | + | |
| 26 | +from enum import Enum
 | |
| 27 | + | |
| 28 | +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
 | |
| 29 | + | |
| 30 | +class BotStatus(Enum):
 | |
| 31 | +    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
 | |
| 32 | +    OK                     = bots_pb2.BotStatus.Value('OK')
 | |
| 33 | +    UNHEALTHY              = bots_pb2.BotStatus.Value('UNHEALTHY');
 | |
| 34 | +    HOST_REBOOTING         = bots_pb2.BotStatus.Value('HOST_REBOOTING')
 | |
| 35 | +    BOT_TERMINATING        = bots_pb2.BotStatus.Value('BOT_TERMINATING')
 | |
| 36 | + | |
| 37 | +class LeaseState(Enum):
 | |
| 38 | +    LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
 | |
| 39 | +    PENDING                 = bots_pb2.LeaseState.Value('PENDING')
 | |
| 40 | +    ACTIVE                  = bots_pb2.LeaseState.Value('ACTIVE')
 | |
| 41 | +    COMPLETED               = bots_pb2.LeaseState.Value('COMPLETED')
 | |
| 42 | +    CANCELLED               = bots_pb2.LeaseState.Value('CANCELLED')
 | |
| 43 | + | |
| 44 | + | |
| 45 | +class BotSession:
 | |
| 46 | +    def __init__(self, parent, interface):
 | |
| 47 | +        """ Unique bot ID within the farm used to identify this bot
 | |
| 48 | +        Needs to be human readable.
 | |
| 49 | +        All prior sessions with bot_id of same ID are invalidated.
 | |
| 50 | +        If a bot attempts to update an invalid session, it must be rejected and
 | |
| 51 | +        may be put in quarantine.
 | |
| 52 | +        """
 | |
| 53 | + | |
| 54 | +        self.logger = logging.getLogger(__name__)
 | |
| 55 | + | |
| 56 | +        self._bot_id = '{}.{}'.format(parent, platform.node())
 | |
| 57 | +        self._interface = interface
 | |
| 58 | +        self._leases = {}
 | |
| 59 | +        self._name = None
 | |
| 60 | +        self._parent = parent
 | |
| 61 | +        self._status = BotStatus.OK.value
 | |
| 62 | +        self._work = None
 | |
| 63 | +        self._worker = None
 | |
| 64 | + | |
| 65 | +    @property
 | |
| 66 | +    def bot_id(self):
 | |
| 67 | +        return self._bot_id
 | |
| 68 | + | |
| 69 | +    def add_worker(self, worker):
 | |
| 70 | +        self._worker = worker
 | |
| 71 | + | |
| 72 | +    def create_bot_session(self, work, context=None):
 | |
| 73 | +        self.logger.debug("Creating bot session")
 | |
| 74 | +        self._work = work
 | |
| 75 | +        self._context = context
 | |
| 76 | + | |
| 77 | +        session = self._interface.create_bot_session(self._parent, self.get_pb2())
 | |
| 78 | +        self._name = session.name
 | |
| 79 | + | |
| 80 | +        self.logger.info("Created bot session with name: {}".format(self._name))
 | |
| 81 | + | |
| 82 | +        for lease in session.leases:
 | |
| 83 | +            self._update_lease_from_server(lease)
 | |
| 84 | + | |
| 85 | +    def update_bot_session(self):
 | |
| 86 | +        session = self._interface.update_bot_session(self.get_pb2())
 | |
| 87 | +        for lease in session.leases:
 | |
| 88 | +            self._update_lease_from_server(lease)
 | |
| 89 | + | |
| 90 | +        for k, v in self._leases.items():
 | |
| 91 | +            if v.state == LeaseState.COMPLETED.value:
 | |
| 92 | +                del self._leases[k]
 | |
| 93 | + | |
| 94 | +    def get_pb2(self):
 | |
| 95 | +        leases = list(self._leases.values())
 | |
| 96 | +        if not leases:
 | |
| 97 | +            leases = None
 | |
| 98 | + | |
| 99 | +        return bots_pb2.BotSession(worker=self._worker.get_pb2(),
 | |
| 100 | +                                   status=self._status,
 | |
| 101 | +                                   leases=leases,
 | |
| 102 | +                                   bot_id=self._bot_id,
 | |
| 103 | +                                   name = self._name)
 | |
| 104 | + | |
| 105 | +    def lease_completed(self, lease):
 | |
| 106 | +        lease.state = LeaseState.COMPLETED.value
 | |
| 107 | +        self._leases[lease.id] = lease
 | |
| 108 | + | |
| 109 | +    def _update_lease_from_server(self, lease):
 | |
| 110 | +        """
 | |
| 111 | +        State machine for any received updates to the leases.
 | |
| 112 | +        """
 | |
| 113 | +        ## TODO: Compare with previous state of lease
 | |
| 114 | +        lease_bot = self._leases.get(lease.id)
 | |
| 115 | +        if lease.state == LeaseState.PENDING.value:
 | |
| 116 | +            lease.state = LeaseState.ACTIVE.value
 | |
| 117 | +            asyncio.ensure_future(self.create_work(lease))
 | |
| 118 | +            self._leases[lease.id] = lease
 | |
| 119 | + | |
| 120 | +    async def create_work(self, lease):
 | |
| 121 | +        self.logger.debug("Work created: {}".format(lease.id))
 | |
| 122 | +        lease = await self._work(self._context, lease)
 | |
| 123 | +        self.logger.debug("Work complete: {}".format(lease.id))
 | |
| 124 | +        self.lease_completed(lease)
 | |
| 125 | + | |
| 126 | +class Worker:
 | |
| 127 | +    def __init__(self, properties=None, configs=None):
 | |
| 128 | +        self.properties = {}
 | |
| 129 | +        self._configs = {}
 | |
| 130 | +        self._devices = []
 | |
| 131 | + | |
| 132 | +        if properties:
 | |
| 133 | +            for k, v in properties.items():
 | |
| 134 | +                if k == 'pool':
 | |
| 135 | +                    self.properties[k] = v
 | |
| 136 | +                else:
 | |
| 137 | +                    raise KeyError('Key not supported: {}'.format(k))
 | |
| 138 | + | |
| 139 | +        if configs:
 | |
| 140 | +            for k, v in configs.items():
 | |
| 141 | +                if k == 'DockerImage':
 | |
| 142 | +                    self.configs[k] = v
 | |
| 143 | +                else:
 | |
| 144 | +                    raise KeyError('Key not supported: {}'.format(k))
 | |
| 145 | + | |
| 146 | +    @property
 | |
| 147 | +    def configs(self):
 | |
| 148 | +        return self._configs
 | |
| 149 | + | |
| 150 | +    def add_device(self, device):
 | |
| 151 | +        self._devices.append(device)
 | |
| 152 | + | |
| 153 | +    def get_pb2(self):
 | |
| 154 | +        devices = [device.get_pb2() for device in self._devices]
 | |
| 155 | +        worker = worker_pb2.Worker(devices=devices)
 | |
| 156 | +        property_message = worker_pb2.Worker.Property()
 | |
| 157 | +        for k, v in self.properties.items():
 | |
| 158 | +            property_message.key = k
 | |
| 159 | +            property_message.value = v
 | |
| 160 | +            worker.properties.extend([property_message])
 | |
| 161 | + | |
| 162 | +        config_message = worker_pb2.Worker.Config()
 | |
| 163 | +        for k, v in self.properties.items():
 | |
| 164 | +            property_message.key = k
 | |
| 165 | +            property_message.value = v
 | |
| 166 | +            worker.configs.extend([config_message])
 | |
| 167 | + | |
| 168 | +        return worker
 | |
| 169 | + | |
| 170 | +class Device:
 | |
| 171 | +    def __init__(self, properties=None):
 | |
| 172 | +        """ Creates devices available to the worker
 | |
| 173 | +        The first device is known as the Primary Device - the device which
 | |
| 174 | +        is running the bot and responsible for actually executing commands.
 | |
| 175 | +        All other devices are known as Attached Devices and must be controlled
 | |
| 176 | +        by the Primary Device.
 | |
| 177 | +        """
 | |
| 178 | + | |
| 179 | +        self._name = str(uuid.uuid4())
 | |
| 180 | +        self._properties = {}
 | |
| 181 | + | |
| 182 | +        if properties:
 | |
| 183 | +            for k, v in properties.items():
 | |
| 184 | +                if k == 'os':
 | |
| 185 | +                    self._properties[k] = v
 | |
| 186 | + | |
| 187 | +                elif k == 'docker':
 | |
| 188 | +                    if v not in ('True', 'False'):
 | |
| 189 | +                        raise ValueError('Value not supported: {}'.format(v))
 | |
| 190 | +                    self._properties[k] = v
 | |
| 191 | + | |
| 192 | +                else:
 | |
| 193 | +                    raise KeyError('Key not supported: {}'.format(k))
 | |
| 194 | + | |
| 195 | +    @property
 | |
| 196 | +    def name(self):
 | |
| 197 | +        return self._name
 | |
| 198 | + | |
| 199 | +    @property
 | |
| 200 | +    def properties(self):
 | |
| 201 | +        return self._properties
 | |
| 202 | + | |
| 203 | + | |
| 204 | +    def get_pb2(self):
 | |
| 205 | +        device = worker_pb2.Device(handle=self._name)
 | |
| 206 | +        property_message = worker_pb2.Device.Property()
 | |
| 207 | +        for k, v in self._properties.items():
 | |
| 208 | +            property_message.key = k
 | |
| 209 | +            property_message.value = v
 | |
| 210 | +            device.properties.extend([property_message])
 | |
| 211 | +        return device | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | +#
 | |
| 15 | +# Authors:
 | |
| 16 | +#        Carter Sande <csande bloomberg net>
 | |
| 17 | + | |
| 18 | +"""
 | |
| 19 | +ActionCache
 | |
| 20 | +==================
 | |
| 21 | + | |
| 22 | +Implements a simple in-memory action cache.
 | |
| 23 | +"""
 | |
| 24 | + | |
| 25 | +import collections
 | |
| 26 | + | |
| 27 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
 | |
| 28 | + | |
class ActionCache:
    """Simple in-memory LRU index of ActionResult messages stored in CAS.

    Maps Action digests to the digests of their cached ActionResult
    messages. The ActionResult blobs themselves live in the supplied
    storage backend; this class only keeps a bounded lookup table.
    """

    def __init__(self, storage, max_cached_actions):
        """Create the cache.

        storage -- CAS backend providing get_message/put_message/missing_blobs.
        max_cached_actions -- maximum number of entries to keep; values <= 0
                              disable caching entirely.
        """
        self._storage = storage
        self._max_cached_actions = max_cached_actions
        # Insertion-ordered map of (hash, size_bytes) -> ActionResult digest.
        # Ordering is used for least-recently-used eviction.
        self._digest_map = collections.OrderedDict()

    def get_action_result(self, action_digest):
        """Return the cached ActionResult for the given Action digest, or None
        if there isn't one.

        Entries whose ActionResult blob is gone, or whose referenced output
        blobs are no longer present in CAS, are dropped from the cache.
        """
        key = (action_digest.hash, action_digest.size_bytes)
        if key in self._digest_map:
            action_result = self._storage.get_message(self._digest_map[key],
                                                      re_pb2.ActionResult)
            if action_result is not None and self._blobs_still_exist(action_result):
                # Refresh LRU position on a successful hit.
                self._digest_map.move_to_end(key)
                return action_result
            # Stale entry: the result blob or a referenced blob vanished.
            del self._digest_map[key]
        return None

    def put_action_result(self, action_digest, action_result):
        """Add the given ActionResult to the cache for the given Action
        digest, evicting least-recently-used entries to stay within bounds.
        """
        # A non-positive bound disables caching. (The original only handled
        # exactly 0; a negative bound made the eviction loop below call
        # popitem() on an empty OrderedDict, raising KeyError.)
        if self._max_cached_actions <= 0:
            return

        while len(self._digest_map) >= self._max_cached_actions:
            self._digest_map.popitem(last=False)

        key = (action_digest.hash, action_digest.size_bytes)
        action_result_digest = self._storage.put_message(action_result)
        self._digest_map[key] = action_result_digest

    def _blobs_still_exist(self, action_result):
        """Return True if all the CAS blobs referenced by the given
        ActionResult are present in CAS.
        """
        blobs_needed = []

        for output_file in action_result.output_files:
            blobs_needed.append(output_file.digest)

        for output_directory in action_result.output_directories:
            blobs_needed.append(output_directory.tree_digest)
            tree = self._storage.get_message(output_directory.tree_digest,
                                             re_pb2.Tree)
            if tree is None:
                return False
            for file_node in tree.root.files:
                blobs_needed.append(file_node.digest)
            for child in tree.children:
                for file_node in child.files:
                    blobs_needed.append(file_node.digest)

        # stdout/stderr only need a CAS blob when not inlined in the result.
        if action_result.stdout_digest.hash and not action_result.stdout_raw:
            blobs_needed.append(action_result.stdout_digest)
        if action_result.stderr_digest.hash and not action_result.stderr_raw:
            blobs_needed.append(action_result.stderr_digest)

        missing = self._storage.missing_blobs(blobs_needed)
        return len(missing) == 0
| ... | ... | @@ -33,6 +33,7 @@ from buildgrid._protos.google.longrunning import operations_pb2_grpc | 
| 33 | 33 |  | 
| 34 | 34 |  from .cas.bytestream_service import ByteStreamService
 | 
| 35 | 35 |  from .cas.content_addressable_storage_service import ContentAddressableStorageService
 | 
| 36 | +from .execution.action_cache_service import ActionCacheService
 | |
| 36 | 37 |  from .execution.execution_service import ExecutionService
 | 
| 37 | 38 |  from .execution.operations_service import OperationsService
 | 
| 38 | 39 |  from .execution.execution_instance import ExecutionInstance
 | 
| ... | ... | @@ -42,11 +43,11 @@ from .worker.bots_interface import BotsInterface | 
| 42 | 43 |  | 
| 43 | 44 |  class BuildGridServer(object):
 | 
| 44 | 45 |  | 
| 45 | -    def __init__(self, port = '50051', max_workers = 10, cas_storage = None):
 | |
| 46 | +    def __init__(self, port = '50051', max_workers = 10, cas_storage = None, action_cache = None, allow_update_action_result = True):
 | |
| 46 | 47 |          port = '[::]:{0}'.format(port)
 | 
| 47 | -        scheduler = Scheduler()
 | |
| 48 | +        scheduler = Scheduler(action_cache)
 | |
| 48 | 49 |          bots_interface = BotsInterface(scheduler)
 | 
| 49 | -        execution_instance = ExecutionInstance(scheduler)
 | |
| 50 | +        execution_instance = ExecutionInstance(scheduler, cas_storage)
 | |
| 50 | 51 |  | 
| 51 | 52 |          self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 | 
| 52 | 53 |          self._server.add_insecure_port(port)
 | 
| ... | ... | @@ -63,6 +64,12 @@ class BuildGridServer(object): | 
| 63 | 64 |                                                                                        self._server)
 | 
| 64 | 65 |              bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(cas_storage),
 | 
| 65 | 66 |                                                                   self._server)
 | 
| 67 | +        if action_cache is not None:
 | |
| 68 | +            action_cache_service = ActionCacheService(action_cache,
 | |
| 69 | +                                                      allow_update_action_result)
 | |
| 70 | +            remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
 | |
| 71 | +                                                                        self._server)
 | |
| 72 | + | |
| 66 | 73 |  | 
| 67 | 74 |      async def start(self):
 | 
| 68 | 75 |          self._server.start()
 | 
| ... | ... | @@ -24,6 +24,7 @@ The abstract base class for storage providers. | 
| 24 | 24 |  | 
| 25 | 25 |  import abc
 | 
| 26 | 26 |  | 
| 27 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
 | |
| 27 | 28 |  from buildgrid._protos.google.rpc.status_pb2 import Status
 | 
| 28 | 29 |  from buildgrid._protos.google.rpc import code_pb2
 | 
| 29 | 30 |  | 
| ... | ... | @@ -96,3 +97,23 @@ class StorageABC(abc.ABC): | 
| 96 | 97 |                  else:
 | 
| 97 | 98 |                      result.append(Status(code=code_pb2.OK))
 | 
| 98 | 99 |          return result
 | 
| 100 | + | |
def put_message(self, message):
    """Serialize *message*, store the bytes in CAS, and return their Digest."""
    serialized = message.SerializeToString()
    message_digest = Digest(hash=HASH(serialized).hexdigest(),
                            size_bytes=len(serialized))
    write_session = self.begin_write(message_digest)
    write_session.write(serialized)
    self.commit_write(message_digest, write_session)
    return message_digest
def get_message(self, digest, message_type):
    """Retrieve the Protobuf message with the given digest and type from
    CAS. If the blob is not present, returns None.

    digest -- Digest of the serialized message blob.
    message_type -- Protobuf message class used to deserialize the blob.
    """
    message_blob = self.get_blob(digest)
    if message_blob is None:
        return None
    try:
        # Read first, close unconditionally: the original leaked the blob
        # handle whenever read() or FromString() raised.
        data = message_blob.read()
    finally:
        message_blob.close()
    return message_type.FromString(data)
| ... | ... | @@ -19,7 +19,7 @@ | 
| 19 | 19 |  ActionCacheService
 | 
| 20 | 20 |  ==================
 | 
| 21 | 21 |  | 
| 22 | -Action Cache currently not implemented.
 | |
| 22 | +Allows clients to manually query/update the action cache.
 | |
| 23 | 23 |  """
 | 
| 24 | 24 |  | 
| 25 | 25 |  import logging
 | 
| ... | ... | @@ -29,14 +29,21 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p | 
| 29 | 29 |  | 
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    """gRPC servicer exposing the action cache to clients.

    GetActionResult answers cache lookups; UpdateActionResult stores new
    results unless updates were disabled at construction time.
    """

    def __init__(self, action_cache, allow_updates=True):
        self._action_cache = action_cache
        self._allow_updates = allow_updates
        self.logger = logging.getLogger(__name__)

    def GetActionResult(self, request, context):
        """Return the cached result for the requested action digest."""
        cached = self._action_cache.get_action_result(request.action_digest)
        if cached is not None:
            return cached
        context.set_code(grpc.StatusCode.NOT_FOUND)
        return remote_execution_pb2.ActionResult()

    def UpdateActionResult(self, request, context):
        """Store the supplied result under the given action digest, if allowed."""
        if self._allow_updates:
            self._action_cache.put_action_result(request.action_digest,
                                                 request.action_result)
            return request.action_result
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        return remote_execution_pb2.ActionResult()
| ... | ... | @@ -24,14 +24,17 @@ An instance of the Remote Execution Server. | 
| 24 | 24 |  import uuid
 | 
| 25 | 25 |  import logging
 | 
| 26 | 26 |  | 
| 27 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | |
| 28 | + | |
| 27 | 29 |  from ._exceptions import InvalidArgumentError
 | 
| 28 | 30 |  | 
| 29 | 31 |  from ..job import Job, ExecuteStage
 | 
| 30 | 32 |  | 
| 31 | 33 |  class ExecutionInstance():
 | 
| 32 | 34 |  | 
| 33 | -    def __init__(self, scheduler):
 | |
| 35 | +    def __init__(self, scheduler, storage = None):
 | |
| 34 | 36 |          self.logger = logging.getLogger(__name__)
 | 
| 37 | +        self._storage = storage
 | |
| 35 | 38 |          self._scheduler = scheduler
 | 
| 36 | 39 |  | 
| 37 | 40 |      def execute(self, action_digest, skip_cache_lookup, message_queue=None):
 | 
| ... | ... | @@ -39,13 +42,17 @@ class ExecutionInstance(): | 
| 39 | 42 |          Queues an action and creates an Operation instance to be associated with
 | 
| 40 | 43 |          this action.
 | 
| 41 | 44 |          """
 | 
| 42 | -        job = Job(action_digest, message_queue)
 | |
| 45 | + | |
| 46 | +        do_not_cache = False
 | |
| 47 | +        if self._storage is not None:
 | |
| 48 | +            action = self._storage.get_message(action_digest, Action)
 | |
| 49 | +            if action is not None:
 | |
| 50 | +                do_not_cache = action.do_not_cache
 | |
| 51 | + | |
| 52 | +        job = Job(action_digest, do_not_cache, message_queue)
 | |
| 43 | 53 |          self.logger.info("Operation name: {}".format(job.name))
 | 
| 44 | 54 |  | 
| 45 | -        if not skip_cache_lookup:
 | |
| 46 | -            raise NotImplementedError("ActionCache not implemented")
 | |
| 47 | -        else:
 | |
| 48 | -            self._scheduler.append_job(job)
 | |
| 55 | +        self._scheduler.append_job(job, skip_cache_lookup)
 | |
| 49 | 56 |  | 
| 50 | 57 |          return job.get_operation()
 | 
| 51 | 58 |  | 
| ... | ... | @@ -18,8 +18,6 @@ | 
| 18 | 18 |  import logging
 | 
| 19 | 19 |  import uuid
 | 
| 20 | 20 |  | 
| 21 | -import buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2
 | |
| 22 | - | |
| 23 | 21 |  from enum import Enum
 | 
| 24 | 22 |  | 
| 25 | 23 |  from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata, ExecuteResponse
 | 
| ... | ... | @@ -51,12 +49,14 @@ class LeaseState(Enum): | 
| 51 | 49 |  | 
| 52 | 50 |  class Job():
 | 
| 53 | 51 |  | 
| 54 | -    def __init__(self, action_digest, message_queue=None):
 | |
| 52 | +    def __init__(self, action_digest, do_not_cache=False, message_queue=None):
 | |
| 55 | 53 |          self.lease = None
 | 
| 56 | 54 |          self.logger = logging.getLogger(__name__)
 | 
| 57 | 55 |          self.result = None
 | 
| 56 | +        self.result_cached = False
 | |
| 58 | 57 |  | 
| 59 | 58 |          self._action_digest = action_digest
 | 
| 59 | +        self._do_not_cache = do_not_cache
 | |
| 60 | 60 |          self._execute_stage = ExecuteStage.UNKNOWN
 | 
| 61 | 61 |          self._n_tries = 0
 | 
| 62 | 62 |          self._name = str(uuid.uuid4())
 | 
| ... | ... | @@ -70,6 +70,14 @@ class Job(): | 
| 70 | 70 |      def name(self):
 | 
| 71 | 71 |          return self._name
 | 
| 72 | 72 |  | 
@property
def action_digest(self):
    """Digest of the Action this job was created to execute (read-only)."""
    return self._action_digest

@property
def do_not_cache(self):
    """True if this job's result must not be stored in the action cache."""
    return self._do_not_cache
 | |
| 80 | + | |
| 73 | 81 |      def check_job_finished(self):
 | 
| 74 | 82 |          if not self._operation_update_queues:
 | 
| 75 | 83 |              return self._operation.done
 | 
| ... | ... | @@ -88,6 +96,7 @@ class Job(): | 
| 88 | 96 |              self._operation.done = True
 | 
| 89 | 97 |              response = ExecuteResponse()
 | 
| 90 | 98 |              self.result.Unpack(response.result)
 | 
| 99 | +            response.cached_result = self.result_cached
 | |
| 91 | 100 |              self._operation.response.CopyFrom(self._pack_any(response))
 | 
| 92 | 101 |  | 
| 93 | 102 |          return self._operation
 | 
| ... | ... | @@ -95,6 +104,7 @@ class Job(): | 
def get_operation_meta(self):
    """Build ExecuteOperationMetadata reflecting the current stage and digest."""
    metadata = ExecuteOperationMetadata()
    metadata.stage = self._execute_stage.value
    metadata.action_digest.CopyFrom(self._action_digest)
    return metadata
| 100 | 110 |  | 
| ... | ... | @@ -23,7 +23,9 @@ Schedules jobs. | 
| 23 | 23 |  | 
| 24 | 24 |  from collections import deque
 | 
| 25 | 25 |  | 
| 26 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
 | |
| 26 | 27 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 28 | +from google.protobuf import any_pb2
 | |
| 27 | 29 |  | 
| 28 | 30 |  from .job import ExecuteStage, LeaseState
 | 
| 29 | 31 |  | 
| ... | ... | @@ -31,7 +33,8 @@ class Scheduler(): | 
| 31 | 33 |  | 
| 32 | 34 |      MAX_N_TRIES = 5
 | 
| 33 | 35 |  | 
def __init__(self, action_cache=None):
    """Create a scheduler, optionally backed by an action cache."""
    self.action_cache = action_cache
    # name -> Job for every job known to the scheduler.
    self.jobs = {}
    # FIFO of jobs waiting to be leased to a worker.
    self.queue = deque()
 | 
| 37 | 40 |  | 
| ... | ... | @@ -44,36 +47,42 @@ class Scheduler(): | 
| 44 | 47 |          if job.check_job_finished():
 | 
| 45 | 48 |              del self.jobs[name]
 | 
| 46 | 49 |  | 
def append_job(self, job, skip_cache_lookup=False):
    """Register *job*: serve it straight from the action cache when possible,
    otherwise queue it for a worker.
    """
    self.jobs[job.name] = job

    use_cache = self.action_cache is not None and not skip_cache_lookup
    if use_cache:
        hit = self.action_cache.get_action_result(job.action_digest)
        if hit is not None:
            packed = any_pb2.Any()
            packed.Pack(hit)
            job.result = packed
            job.result_cached = True
            job.update_execute_stage(ExecuteStage.COMPLETED)
            return

    self.queue.append(job)
    job.update_execute_stage(ExecuteStage.QUEUED)
| 51 | 63 |  | 
def retry_job(self, name):
    """Re-queue the named job, or mark it completed once retries are exhausted.

    Unknown names are ignored. The previous guard, ``if job in
    self.jobs[name]:``, referenced ``job`` before assignment (NameError on
    every call) and inverted the intended "is this job tracked?" check.
    """
    job = self.jobs.get(name)
    if job is None:
        return

    if job.n_tries >= self.MAX_N_TRIES:
        # TODO: Decide what to do with these jobs
        job.update_execute_stage(ExecuteStage.COMPLETED)
        # TODO: Mark these jobs as done
    else:
        job.update_execute_stage(ExecuteStage.QUEUED)
        job.n_tries += 1
        self.queue.appendleft(job)

    self.jobs[name] = job
| 71 | 76 |  | 
def job_complete(self, name, result):
    """Record *result* for the named job, mark it completed, and cache the
    result unless the job opted out or no cache is configured.
    """
    finished = self.jobs[name]
    finished.result = result
    finished.update_execute_stage(ExecuteStage.COMPLETED)
    self.jobs[name] = finished

    cacheable = self.action_cache is not None and not finished.do_not_cache
    if cacheable:
        unpacked = ActionResult()
        result.Unpack(unpacked)
        self.action_cache.put_action_result(finished.action_digest, unpacked)
| 77 | 86 |  | 
| 78 | 87 |      def get_operations(self):
 | 
| 79 | 88 |          response = operations_pb2.ListOperationsResponse()
 | 
| ... | ... | @@ -81,48 +90,13 @@ class Scheduler(): | 
| 81 | 90 |              response.operations.extend([v.get_operation()])
 | 
| 82 | 91 |          return response
 | 
| 83 | 92 |  | 
def update_job_lease_state(self, name, state):
    """Set the lease state of the job registered under *name*."""
    tracked_job = self.jobs.get(name)
    tracked_job.lease.state = state
    self.jobs[name] = tracked_job

def get_job_lease(self, name):
    """Return the lease held by the job registered under *name*.

    Raises KeyError if no such job is tracked.
    """
    tracked_job = self.jobs[name]
    return tracked_job.lease
| 127 | 101 |      def cancel_session(self, name):
 | 
| 128 | 102 |          job = self.jobs[name]
 | 
| ... | ... | @@ -130,3 +104,12 @@ class Scheduler(): | 
| 130 | 104 |          if state == LeaseState.PENDING.value or \
 | 
| 131 | 105 |             state == LeaseState.ACTIVE.value:
 | 
| 132 | 106 |              self.retry_job(name)
 | 
| 107 | + | |
def create_leases(self):
    """Drain the queue, moving each job to EXECUTING and yielding its fresh
    PENDING lease.
    """
    while self.queue:
        ready = self.queue.popleft()
        ready.update_execute_stage(ExecuteStage.EXECUTING)
        ready.lease = ready.create_lease()
        ready.lease.state = LeaseState.PENDING.value
        self.jobs[ready.name] = ready
        yield ready.lease
| ... | ... | @@ -35,6 +35,7 @@ class BotsInterface(): | 
| 35 | 35 |          self.logger = logging.getLogger(__name__)
 | 
| 36 | 36 |  | 
| 37 | 37 |          self._bot_ids = {}
 | 
| 38 | +        self._bot_sessions = {}
 | |
| 38 | 39 |          self._scheduler = scheduler
 | 
| 39 | 40 |  | 
| 40 | 41 |      def create_bot_session(self, parent, bot_session):
 | 
| ... | ... | @@ -59,7 +60,12 @@ class BotsInterface(): | 
| 59 | 60 |          bot_session.name = name
 | 
| 60 | 61 |  | 
| 61 | 62 |          self._bot_ids[name] = bot_id
 | 
| 63 | +        self._bot_sessions[name] = bot_session
 | |
| 62 | 64 |          self.logger.info("Created bot session name={} with bot_id={}".format(name, bot_id))
 | 
| 65 | + | |
| 66 | +        for lease in self._scheduler.create_leases():
 | |
| 67 | +            bot_session.leases.extend([lease])
 | |
| 68 | + | |
| 63 | 69 |          return bot_session
 | 
| 64 | 70 |  | 
| 65 | 71 |      def update_bot_session(self, name, bot_session):
 | 
| ... | ... | @@ -69,13 +75,66 @@ class BotsInterface(): | 
| 69 | 75 |          self.logger.debug("Updating bot session name={}".format(name))
 | 
| 70 | 76 |          self._check_bot_ids(bot_session.bot_id, name)
 | 
| 71 | 77 |  | 
| 72 | -        leases = [self._scheduler.update_lease(lease) for lease in bot_session.leases]
 | |
| 78 | +        server_session = self._bot_sessions[name]
 | |
| 79 | + | |
| 80 | +        leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
 | |
| 73 | 81 |  | 
| 74 | 82 |          del bot_session.leases[:]
 | 
| 75 | 83 |          bot_session.leases.extend(leases)
 | 
| 76 | 84 |  | 
| 85 | +        for lease in self._scheduler.create_leases():
 | |
| 86 | +            bot_session.leases.extend([lease])
 | |
| 87 | + | |
| 88 | +        self._bot_sessions[name] = bot_session
 | |
| 77 | 89 |          return bot_session
 | 
| 78 | 90 |  | 
def check_states(self, client_lease):
    """Edge detector for lease states.

    Reconcile a client-reported lease against the server's copy. Returns
    the lease to hand back to the client, or None when the lease is
    finished and should be dropped from the session.

    Raises InvalidArgumentError for leases unknown to the scheduler and
    OutofSyncError when client and server disagree in an unexpected way.
    """
    # TODO: Handle cancelled states
    try:
        server_lease = self._scheduler.get_job_lease(client_lease.id)
    except KeyError:
        raise InvalidArgumentError("Lease not found on server: {}".format(client_lease))

    server_state = LeaseState(server_lease.state)
    client_state = LeaseState(client_lease.state)

    if server_state == LeaseState.PENDING:

        if client_state == LeaseState.ACTIVE:
            self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
        elif client_state == LeaseState.COMPLETED:
            # TODO: Lease was rejected
            raise NotImplementedError("'Not Accepted' is unsupported")
        else:
            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))

    elif server_state == LeaseState.ACTIVE:

        if client_state == LeaseState.ACTIVE:
            pass

        elif client_state == LeaseState.COMPLETED:
            self._scheduler.job_complete(client_lease.id, client_lease.result)
            self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
            # Completed leases are dropped from the session.
            return None

        else:
            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))

    elif server_state == LeaseState.COMPLETED:
        raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))

    elif server_state == LeaseState.CANCELLED:
        raise NotImplementedError("Cancelled states not supported yet")

    else:
        # Should never get here. (Message fixed: previously read
        # "State now allowed", a typo for "not allowed".)
        raise OutofSyncError("State not allowed: {}".format(server_state))

    return client_lease
| 137 | + | |
| 79 | 138 |      def _check_bot_ids(self, bot_id, name = None):
 | 
| 80 | 139 |          """ Checks the ID and the name of the bot.
 | 
| 81 | 140 |          """
 | 
| ... | ... | @@ -103,7 +162,11 @@ class BotsInterface(): | 
| 103 | 162 |              raise InvalidArgumentError("Bot id does not exist: {}".format(name))
 | 
| 104 | 163 |  | 
| 105 | 164 |          self.logger.debug("Attempting to close {} with name: {}".format(bot_id, name))
 | 
| 106 | -        self._scheduler.retry_job(name)
 | |
| 165 | +        for lease in self._bot_sessions[name].leases:
 | |
| 166 | +            if lease.state != LeaseState.COMPLETED.value:
 | |
| 167 | +                # TODO: Be wary here, may need to handle rejected leases in future
 | |
| 168 | +                self._scheduler.retry_job(lease.id)
 | |
| 169 | + | |
| 107 | 170 |          self.logger.debug("Closing bot session: {}".format(name))
 | 
| 108 | 171 |          self._bot_ids.pop(name)
 | 
| 109 | 172 |          self.logger.info("Closed bot {} with name: {}".format(bot_id, name)) | 
| ... | ... | @@ -43,7 +43,6 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 43 | 43 |              self.logger.error(e)
 | 
| 44 | 44 |              context.set_details(str(e))
 | 
| 45 | 45 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 46 | -            return bots_pb2.BotSession()
 | |
| 47 | 46 |  | 
| 48 | 47 |      def UpdateBotSession(self, request, context):
 | 
| 49 | 48 |          try:
 | 
| ... | ... | @@ -53,13 +52,16 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 53 | 52 |              self.logger.error(e)
 | 
| 54 | 53 |              context.set_details(str(e))
 | 
| 55 | 54 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 56 | -            return bots_pb2.BotSession()
 | |
| 57 | 55 |  | 
| 58 | 56 |          except OutofSyncError as e:
 | 
| 59 | 57 |              self.logger.error(e)
 | 
| 60 | 58 |              context.set_details(str(e))
 | 
| 61 | 59 |              context.set_code(grpc.StatusCode.DATA_LOSS)
 | 
| 62 | -            return bots_pb2.BotSession()
 | |
| 60 | + | |
| 61 | +        except NotImplementedError as e:
 | |
| 62 | +            self.logger.error(e)
 | |
| 63 | +            context.set_details(str(e))
 | |
| 64 | +            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 63 | 65 |  | 
| 64 | 66 |      def PostBotEventTemp(self, request, context):
 | 
| 65 | 67 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | +#
 | |
| 15 | +# Authors:
 | |
| 16 | +#        Carter Sande <csande bloomberg net>
 | |
| 17 | + | |
| 18 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 19 | +import pytest
 | |
| 20 | + | |
| 21 | +from buildgrid.server import action_cache
 | |
| 22 | +from buildgrid.server.cas.storage import lru_memory_cache
 | |
| 23 | + | |
@pytest.fixture
def cas():
    """A fresh 1 MiB in-memory CAS for each test."""
    return lru_memory_cache.LRUMemoryCache(1024 * 1024)
| 27 | + | |
def test_null_action_cache(cas):
    """A zero-capacity cache silently drops everything it is given."""
    cache = action_cache.ActionCache(cas, 0)

    digest = remote_execution_pb2.Digest(hash='alpha', size_bytes=4)
    result = remote_execution_pb2.ActionResult()

    cache.put_action_result(digest, result)
    assert cache.get_action_result(digest) is None
| 36 | + | |
def test_action_cache_expiry(cas):
    """Least-recently-used entries are evicted once capacity is reached."""
    cache = action_cache.ActionCache(cas, 2)

    digests = [remote_execution_pb2.Digest(hash=name, size_bytes=4)
               for name in ('alpha', 'bravo', 'charlie')]
    dummy_result = remote_execution_pb2.ActionResult()

    cache.put_action_result(digests[0], dummy_result)
    cache.put_action_result(digests[1], dummy_result)

    # Touch the first digest so the second becomes least recently used...
    assert cache.get_action_result(digests[0]) is not None
    # ...then insert the third, evicting the second.
    cache.put_action_result(digests[2], dummy_result)

    assert cache.get_action_result(digests[0]) is not None
    assert cache.get_action_result(digests[1]) is None
    assert cache.get_action_result(digests[2]) is not None
| 56 | + | |
def test_action_cache_checks_cas(cas):
    """Cached results are served only while every referenced blob exists in CAS."""
    cache = action_cache.ActionCache(cas, 50)

    digest_backed = remote_execution_pb2.Digest(hash='alpha', size_bytes=4)
    digest_bad_tree = remote_execution_pb2.Digest(hash='bravo', size_bytes=4)
    digest_bad_stdout = remote_execution_pb2.Digest(hash='charlie', size_bytes=4)

    # Build a tree whose file digests genuinely exist in CAS.
    sample_digest = cas.put_message(remote_execution_pb2.Command(arguments=["sample"]))
    tree = remote_execution_pb2.Tree()
    tree.root.files.add().digest.CopyFrom(sample_digest)
    tree.children.add().files.add().digest.CopyFrom(sample_digest)
    tree_digest = cas.put_message(tree)

    # An ActionResult whose references are all real.
    backed_result = remote_execution_pb2.ActionResult()
    backed_result.output_directories.add().tree_digest.CopyFrom(tree_digest)
    backed_result.output_files.add().digest.CopyFrom(sample_digest)
    backed_result.stdout_digest.CopyFrom(sample_digest)
    backed_result.stderr_digest.CopyFrom(sample_digest)
    cache.put_action_result(digest_backed, backed_result)

    # ActionResults that point at blobs CAS has never seen.
    bad_tree_result = remote_execution_pb2.ActionResult()
    bad_tree_result.output_directories.add().tree_digest.hash = "nonexistent"
    bad_tree_result.output_directories[0].tree_digest.size_bytes = 8
    cache.put_action_result(digest_bad_tree, bad_tree_result)

    bad_stdout_result = remote_execution_pb2.ActionResult()
    bad_stdout_result.stdout_digest.hash = "nonexistent"
    bad_stdout_result.stdout_digest.size_bytes = 8
    cache.put_action_result(digest_bad_stdout, bad_stdout_result)

    # Only the fully-backed result is served; the others are dropped.
    fetched = cache.get_action_result(digest_backed)
    assert fetched.output_directories[0].tree_digest.hash == tree_digest.hash
    assert cache.get_action_result(digest_bad_tree) is None
    assert cache.get_action_result(digest_bad_stdout) is None
| ... | ... | @@ -23,45 +23,47 @@ from unittest import mock | 
| 23 | 23 |  from grpc._server import _Context
 | 
| 24 | 24 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 25 | 25 |  | 
| 26 | -from buildgrid.server import scheduler
 | |
| 27 | -from buildgrid.server.execution import execution_instance, action_cache_service
 | |
| 26 | +from buildgrid.server import action_cache
 | |
| 27 | +from buildgrid.server.cas.storage import lru_memory_cache
 | |
| 28 | +from buildgrid.server.execution import action_cache_service
 | |
| 28 | 29 |  | 
| 29 | 30 |  # Can mock this
 | 
| 30 | 31 |  @pytest.fixture
 | 
| 31 | 32 |  def context():
 | 
| 32 | 33 |      yield mock.MagicMock(spec = _Context)
 | 
| 33 | 34 |  | 
| 34 | -# Requests to make
 | |
| 35 | 35 |  @pytest.fixture
 | 
| 36 | -def execute_request():
 | |
| 37 | -    action = remote_execution_pb2.Action()
 | |
| 38 | -    action.command_digest.hash = 'zhora'
 | |
| 39 | - | |
| 40 | -    yield remote_execution_pb2.ExecuteRequest(instance_name = '',
 | |
| 41 | -                                              action = action,
 | |
| 42 | -                                              skip_cache_lookup = True)
 | |
| 36 | +def cas():
 | |
| 37 | +    yield lru_memory_cache.LRUMemoryCache(1024 * 1024)
 | |
| 43 | 38 |  | 
| 44 | 39 |  @pytest.fixture
 | 
| 45 | -def schedule():
 | |
| 46 | -    yield scheduler.Scheduler()
 | |
| 40 | +def cache(cas):
 | |
| 41 | +    yield action_cache.ActionCache(cas, 50)
 | |
| 47 | 42 |  | 
| 48 | -@pytest.fixture
 | |
| 49 | -def execution(schedule):
 | |
| 50 | -    yield execution_instance.ExecutionInstance(schedule)
 | |
| 43 | +def test_simple_action_result(cache, context):
 | |
| 44 | +    service = action_cache_service.ActionCacheService(cache)
 | |
| 45 | +    action_digest = remote_execution_pb2.Digest(hash='sample', size_bytes=4)
 | |
| 51 | 46 |  | 
| 52 | -# Instance to test
 | |
| 53 | -@pytest.fixture
 | |
| 54 | -def instance(execution):
 | |
| 55 | -    yield action_cache_service.ActionCacheService(execution)
 | |
| 47 | +    # Check that before adding the ActionResult, attempting to fetch it fails
 | |
| 48 | +    request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
 | |
| 49 | +    service.GetActionResult(request, context)
 | |
| 50 | +    context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
 | |
| 56 | 51 |  | 
| 57 | -def test_get_action_result(instance, context):
 | |
| 58 | -    request = remote_execution_pb2.GetActionResultRequest()
 | |
| 59 | -    instance.GetActionResult(request, context)
 | |
| 52 | +    # Add an ActionResult to the cache
 | |
| 53 | +    action_result = remote_execution_pb2.ActionResult(stdout_raw=b'example output')
 | |
| 54 | +    request = remote_execution_pb2.UpdateActionResultRequest(action_digest=action_digest,
 | |
| 55 | +                                                             action_result=action_result)
 | |
| 56 | +    service.UpdateActionResult(request, context)
 | |
| 60 | 57 |  | 
| 61 | -    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 58 | +    # Check that fetching it now works
 | |
| 59 | +    request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
 | |
| 60 | +    fetched_result = service.GetActionResult(request, context)
 | |
| 61 | +    assert fetched_result.stdout_raw == action_result.stdout_raw
 | |
| 62 | + | |
| 63 | +def test_disabled_update_action_result(cache, context):
 | |
| 64 | +    service = action_cache_service.ActionCacheService(cache, False)
 | |
| 62 | 65 |  | 
| 63 | -def test_update_action_result(instance, context):
 | |
| 64 | 66 |      request = remote_execution_pb2.UpdateActionResultRequest()
 | 
| 65 | -    instance.UpdateActionResult(request, context)
 | |
| 67 | +    service.UpdateActionResult(request, context)
 | |
| 66 | 68 |  | 
| 67 | 69 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +import grpc
 | |
| 16 | +import pytest
 | |
| 17 | +import uuid
 | |
| 18 | + | |
| 19 | +from unittest import mock
 | |
| 20 | + | |
| 21 | +from buildgrid.bot import bot_session, bot_interface
 | |
| 22 | + | |
| 23 | +@pytest.mark.parametrize("docker_value", ["True", "False"])
 | |
| 24 | +@pytest.mark.parametrize("os_value", ["nexus7", "nexus8"])
 | |
| 25 | +def test_create_device(docker_value, os_value):
 | |
| 26 | +    properties = {'docker' : docker_value, 'os' : os_value}
 | |
| 27 | +    device = bot_session.Device(properties)
 | |
| 28 | + | |
| 29 | +    assert uuid.UUID(device.name, version=4)
 | |
| 30 | +    assert properties == device.properties
 | |
| 31 | + | |
| 32 | +def test_create_device_key_fail():
 | |
| 33 | +    properties = {'voight' : 'kampff'}
 | |
| 34 | + | |
| 35 | +    with pytest.raises(KeyError):
 | |
| 36 | +        device = bot_session.Device(properties)
 | |
| 37 | + | |
| 38 | +def test_create_device_value_fail():
 | |
| 39 | +    properties = {'docker' :  True}
 | |
| 40 | + | |
| 41 | +    with pytest.raises(ValueError):
 | |
| 42 | +        device = bot_session.Device(properties)
 | |
| 43 | + | |
| 44 | +def test_create_worker():
 | |
| 45 | +    properties = {'pool' : 'swim'}
 | |
| 46 | +    configs = {'DockerImage' : 'Windows'}
 | |
| 47 | +    worker = bot_session.Worker(properties, configs)
 | |
| 48 | + | |
| 49 | +    assert properties == worker.properties
 | |
| 50 | +    assert configs == worker.configs
 | |
| 51 | + | |
| 52 | +    device = bot_session.Device()
 | |
| 53 | +    worker.add_device(device)
 | |
| 54 | + | |
| 55 | +    assert worker._devices[0] == device
 | |
| 56 | + | |
| 57 | +def test_create_worker_key_fail():
 | |
| 58 | +    properties = {'voight' : 'kampff'}
 | |
| 59 | +    configs = {'voight' : 'kampff'}
 | |
| 60 | + | |
| 61 | +    with pytest.raises(KeyError):
 | |
| 62 | +        bot_session.Worker(properties)
 | |
| 63 | +    with pytest.raises(KeyError):
 | |
| 64 | +        bot_session.Worker(configs) | 
| ... | ... | @@ -36,6 +36,12 @@ from buildgrid.server.worker import bots_interface, bots_service | 
| 36 | 36 |  def context():
 | 
| 37 | 37 |      yield mock.MagicMock(spec = _Context)
 | 
| 38 | 38 |  | 
| 39 | +@pytest.fixture
 | |
| 40 | +def action_job():
 | |
| 41 | +    action_digest = remote_execution_pb2.Digest()
 | |
| 42 | +    j = job.Job(action_digest, None)
 | |
| 43 | +    yield j
 | |
| 44 | + | |
| 39 | 45 |  @pytest.fixture
 | 
| 40 | 46 |  def bot_session():
 | 
| 41 | 47 |      bot = bots_pb2.BotSession()
 | 
| ... | ... | @@ -101,7 +107,6 @@ def test_update_bot_session_zombie(bot_session, context, instance): | 
| 101 | 107 |  | 
| 102 | 108 |      response = instance.UpdateBotSession(request, context)
 | 
| 103 | 109 |  | 
| 104 | -    assert isinstance(response, bots_pb2.BotSession)
 | |
| 105 | 110 |      context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 106 | 111 |  | 
| 107 | 112 |  def test_update_bot_session_bot_id_fail(bot_session, context, instance):
 | 
| ... | ... | @@ -113,62 +118,121 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance): | 
| 113 | 118 |  | 
| 114 | 119 |      context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 115 | 120 |  | 
| 116 | -@pytest.mark.parametrize("number_of_leases", [1, 3, 500])
 | |
| 117 | -def test_update_leases(number_of_leases, bot_session, context, instance):
 | |
| 118 | -    leases = [bots_pb2.Lease() for x in range(number_of_leases)]
 | |
| 119 | -    bot_session.leases.extend(leases)
 | |
| 121 | +@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
 | |
| 122 | +def test_number_of_leases(number_of_jobs, bot_session, context, instance):
 | |
| 120 | 123 |      request = bots_pb2.CreateBotSessionRequest(parent='',
 | 
| 121 | 124 |                                                 bot_session=bot_session)
 | 
| 122 | -    # Simulated the severed binding between client and server
 | |
| 123 | -    bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 124 | - | |
| 125 | -    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | |
| 126 | -                                               bot_session=bot)
 | |
| 125 | +    # Inject work
 | |
| 126 | +    for n in range(0, number_of_jobs):
 | |
| 127 | +        action_digest = remote_execution_pb2.Digest()
 | |
| 128 | +        instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 127 | 129 |  | 
| 128 | -    response = instance.UpdateBotSession(request, context)
 | |
| 130 | +    response = instance.CreateBotSession(request, context)
 | |
| 129 | 131 |  | 
| 132 | +    assert len(response.leases) == number_of_jobs
 | |
| 130 | 133 |      assert isinstance(response, bots_pb2.BotSession)
 | 
| 131 | -    assert len(response.leases) == len(bot.leases)
 | |
| 132 | -    assert bot == response
 | |
| 133 | 134 |  | 
| 134 | 135 |  def test_update_leases_with_work(bot_session, context, instance):
 | 
| 135 | -    leases = [bots_pb2.Lease() for x in range(2)]
 | |
| 136 | -    bot_session.leases.extend(leases)
 | |
| 136 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 137 | +                                               bot_session=bot_session)
 | |
| 138 | +    # Inject work
 | |
| 139 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 140 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 141 | + | |
| 142 | +    response = instance.CreateBotSession(request, context)
 | |
| 143 | + | |
| 144 | +    assert len(response.leases) == 1
 | |
| 145 | +    response_action = remote_execution_pb2.Digest()
 | |
| 146 | +    response.leases[0].payload.Unpack(response_action)
 | |
| 147 | + | |
| 148 | +    assert isinstance(response, bots_pb2.BotSession)
 | |
| 149 | +    assert response.leases[0].state == LeaseState.PENDING.value
 | |
| 150 | +    assert uuid.UUID(response.leases[0].id, version=4)
 | |
| 151 | +    assert response_action == action_digest
 | |
| 152 | + | |
| 153 | +def test_update_leases_work_complete(bot_session, context, instance):
 | |
| 154 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 155 | +                                               bot_session=bot_session)
 | |
| 156 | +    # Create bot session
 | |
| 157 | +    # Simulated the severed binding between client and server
 | |
| 158 | +    response = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 159 | + | |
| 160 | +    # Inject work
 | |
| 161 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 162 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 163 | + | |
| 164 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 165 | +                                               bot_session=response)
 | |
| 166 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 167 | + | |
| 168 | +    assert response.leases[0].state == LeaseState.PENDING.value
 | |
| 169 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 170 | + | |
| 171 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 172 | +                                               bot_session=response)
 | |
| 173 | + | |
| 174 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 175 | + | |
| 176 | +    response.leases[0].state = LeaseState.COMPLETED.value
 | |
| 177 | + | |
| 178 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 179 | +                                               bot_session=response)
 | |
| 180 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 181 | + | |
| 182 | +    assert len(response.leases) == 0
 | |
| 183 | + | |
| 184 | +def test_work_rejected_by_bot(bot_session, context, instance):
 | |
| 185 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 186 | +                                               bot_session=bot_session)
 | |
| 187 | +    # Inject work
 | |
| 188 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 189 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 190 | + | |
| 191 | +    # Simulated the severed binding between client and server
 | |
| 192 | +    response = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 193 | + | |
| 194 | +    # Reject work
 | |
| 195 | +    assert response.leases[0].state == LeaseState.PENDING.value
 | |
| 196 | +    response.leases[0].state = LeaseState.COMPLETED.value
 | |
| 197 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 198 | +                                               bot_session=response)
 | |
| 199 | + | |
| 200 | +    response = instance.UpdateBotSession(request, context)
 | |
| 201 | + | |
| 202 | +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 137 | 203 |  | 
| 138 | -    # Inject some work to be done
 | |
| 139 | -    action = remote_execution_pb2.Action()
 | |
| 140 | -    action.command_digest.hash = 'rick'
 | |
| 141 | -    instance._instance._scheduler.append_job(job.Job(action))
 | |
| 142 | 204 |  | 
| 205 | +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
 | |
| 206 | +def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
 | |
| 143 | 207 |      request = bots_pb2.CreateBotSessionRequest(parent='',
 | 
| 144 | 208 |                                                 bot_session=bot_session)
 | 
| 209 | +    # Inject work
 | |
| 210 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 211 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 145 | 212 |      # Simulated the severed binding between client and server
 | 
| 146 | 213 |      bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| 147 | 214 |  | 
| 148 | 215 |      request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | 
| 149 | 216 |                                                 bot_session=bot)
 | 
| 150 | 217 |  | 
| 151 | -    response = instance.UpdateBotSession(request, context)
 | |
| 152 | -    response_action = remote_execution_pb2.Action()
 | |
| 153 | -    _unpack_any(response.leases[0].payload, response_action)
 | |
| 218 | +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 154 | 219 |  | 
| 155 | -    assert isinstance(response, bots_pb2.BotSession)
 | |
| 156 | -    assert response.leases[0].state == LeaseState.PENDING.value
 | |
| 157 | -    assert response.leases[1].state == LeaseState.LEASE_STATE_UNSPECIFIED.value
 | |
| 158 | -    assert uuid.UUID(response.leases[0].id, version=4)
 | |
| 159 | -    assert response_action == action
 | |
| 220 | +    response.leases[0].state = state.value
 | |
| 160 | 221 |  | 
| 161 | -def test_update_leases_work_complete(bot_session, context, instance):
 | |
| 162 | -    leases = [bots_pb2.Lease() for x in range(2)]
 | |
| 163 | -    bot_session.leases.extend(leases)
 | |
| 222 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 223 | +                                               bot_session=response)
 | |
| 224 | + | |
| 225 | +    response = instance.UpdateBotSession(request, context)
 | |
| 164 | 226 |  | 
| 165 | -    # Inject some work to be done
 | |
| 166 | -    action = remote_execution_pb2.Action()
 | |
| 167 | -    action.command_digest.hash = 'rick'
 | |
| 168 | -    instance._instance._scheduler.append_job(job.Job(action))
 | |
| 227 | +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
 | |
| 169 | 228 |  | 
| 229 | +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
 | |
| 230 | +def test_work_out_of_sync_from_active(state, bot_session, context, instance):
 | |
| 170 | 231 |      request = bots_pb2.CreateBotSessionRequest(parent='',
 | 
| 171 | 232 |                                                 bot_session=bot_session)
 | 
| 233 | +    # Inject work
 | |
| 234 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 235 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 172 | 236 |      # Simulated the severed binding between client and server
 | 
| 173 | 237 |      bot = copy.deepcopy(instance.CreateBotSession(request, context))
 | 
| 174 | 238 |  | 
| ... | ... | @@ -177,26 +241,75 @@ def test_update_leases_work_complete(bot_session, context, instance): | 
| 177 | 241 |  | 
| 178 | 242 |      response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | 
| 179 | 243 |  | 
| 180 | -    operation_name = response.leases[0].id
 | |
| 244 | +    response = instance.UpdateBotSession(request, context)
 | |
| 181 | 245 |  | 
| 182 | -    assert response.leases[0].state == LeaseState.PENDING.value
 | |
| 183 | -    response.leases[0].state = LeaseState.COMPLETED.value
 | |
| 246 | +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 247 | + | |
| 248 | +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
 | |
| 249 | +def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
 | |
| 250 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 251 | +                                               bot_session=bot_session)
 | |
| 252 | +    # Inject work
 | |
| 253 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 254 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 255 | +    # Simulated the severed binding between client and server
 | |
| 256 | +    response = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 257 | + | |
| 258 | +    response.leases[0].state = state.value
 | |
| 259 | + | |
| 260 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 261 | +                                               bot_session=response)
 | |
| 262 | + | |
| 263 | +    response = instance.UpdateBotSession(request, context)
 | |
| 264 | + | |
| 265 | +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
 | |
| 266 | + | |
| 267 | +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
 | |
| 268 | +def test_work_out_of_sync_from_active(state, bot_session, context, instance):
 | |
| 269 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 270 | +                                               bot_session=bot_session)
 | |
| 271 | +    # Inject work
 | |
| 272 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 273 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 274 | +    # Simulated the severed binding between client and server
 | |
| 275 | +    response = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 276 | + | |
| 277 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 278 | + | |
| 279 | +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 280 | +                                                             bot_session=response))
 | |
| 281 | + | |
| 282 | +    response = instance.UpdateBotSession(request, context)
 | |
| 283 | + | |
| 284 | +    response.leases[0].state = state.value
 | |
| 184 | 285 |  | 
| 185 | 286 |      request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | 
| 186 | 287 |                                                 bot_session=response)
 | 
| 288 | + | |
| 289 | +    response = instance.UpdateBotSession(request, context)
 | |
| 290 | + | |
| 291 | +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
 | |
| 292 | + | |
| 293 | +def test_work_active_to_active(bot_session, context, instance):
 | |
| 294 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 295 | +                                               bot_session=bot_session)
 | |
| 296 | +    # Inject work
 | |
| 297 | +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
 | |
| 298 | +    instance._instance._scheduler.append_job(job.Job(action_digest))
 | |
| 187 | 299 |      # Simulated the severed binding between client and server
 | 
| 188 | -    response = copy.deepcopy(instance.UpdateBotSession(request, context))
 | |
| 189 | -    assert isinstance(response, bots_pb2.BotSession)
 | |
| 190 | -    assert instance._instance._scheduler.jobs[operation_name]._execute_stage == ExecuteStage.COMPLETED
 | |
| 300 | +    response = copy.deepcopy(instance.CreateBotSession(request, context))
 | |
| 301 | + | |
| 302 | +    response.leases[0].state = LeaseState.ACTIVE.value
 | |
| 303 | + | |
| 304 | +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
 | |
| 305 | +                                               bot_session=response)
 | |
| 306 | + | |
| 307 | +    response = instance.UpdateBotSession(request, context)
 | |
| 308 | + | |
| 309 | +    assert response.leases[0].state == LeaseState.ACTIVE.value
 | |
| 191 | 310 |  | 
| 192 | 311 |  def test_post_bot_event_temp(context, instance):
 | 
| 193 | 312 |      request = bots_pb2.PostBotEventTempRequest()
 | 
| 194 | 313 |      instance.PostBotEventTemp(request, context)
 | 
| 195 | 314 |  | 
| 196 | 315 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED) | 
| 197 | - | |
| 198 | -def _unpack_any(unpack_from, to):
 | |
| 199 | -    any = any_pb2.Any()
 | |
| 200 | -    any.CopyFrom(unpack_from)
 | |
| 201 | -    any.Unpack(to)
 | |
| 202 | -    return to | 
| ... | ... | @@ -26,7 +26,8 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p | 
| 26 | 26 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 27 | 27 |  from google.protobuf import any_pb2
 | 
| 28 | 28 |  | 
| 29 | -from buildgrid.server import scheduler, job
 | |
| 29 | +from buildgrid.server import action_cache, scheduler, job
 | |
| 30 | +from buildgrid.server.cas.storage import lru_memory_cache
 | |
| 30 | 31 |  from buildgrid.server.execution import execution_instance, execution_service
 | 
| 31 | 32 |  | 
| 32 | 33 |  @pytest.fixture
 | 
| ... | ... | @@ -34,13 +35,14 @@ def context(): | 
| 34 | 35 |      cxt = mock.MagicMock(spec = _Context)
 | 
| 35 | 36 |      yield cxt
 | 
| 36 | 37 |  | 
| 37 | -@pytest.fixture
 | |
| 38 | -def schedule():
 | |
| 39 | -    yield scheduler.Scheduler()
 | |
| 40 | - | |
| 41 | -@pytest.fixture
 | |
| 42 | -def execution(schedule):
 | |
| 43 | -    yield execution_instance.ExecutionInstance(schedule)
 | |
| 38 | +@pytest.fixture(params=["action-cache", "no-action-cache"])
 | |
| 39 | +def execution(request):
 | |
| 40 | +    if request.param == "action-cache":
 | |
| 41 | +        storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
 | |
| 42 | +        cache = action_cache.ActionCache(storage, 50)
 | |
| 43 | +        schedule = scheduler.Scheduler(cache)
 | |
| 44 | +        return execution_instance.ExecutionInstance(schedule, storage)
 | |
| 45 | +    return execution_instance.ExecutionInstance(scheduler.Scheduler())
 | |
| 44 | 46 |  | 
| 45 | 47 |  # Instance to test
 | 
| 46 | 48 |  @pytest.fixture
 | 
| ... | ... | @@ -56,17 +58,15 @@ def test_execute(skip_cache_lookup, instance, context): | 
| 56 | 58 |                                                    action_digest = action_digest,
 | 
| 57 | 59 |                                                    skip_cache_lookup = skip_cache_lookup)
 | 
| 58 | 60 |      response = instance.Execute(request, context)
 | 
| 59 | -    if skip_cache_lookup is False:
 | |
| 60 | -        [r for r in response]
 | |
| 61 | -        context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | |
| 62 | -    else:
 | |
| 63 | -        result = next(response)
 | |
| 64 | -        assert isinstance(result, operations_pb2.Operation)
 | |
| 65 | -        metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | |
| 66 | -        result.metadata.Unpack(metadata)
 | |
| 67 | -        assert metadata.stage == job.ExecuteStage.QUEUED.value
 | |
| 68 | -        assert uuid.UUID(result.name, version=4)
 | |
| 69 | -        assert result.done is False
 | |
| 61 | + | |
| 62 | +    result = next(response)
 | |
| 63 | +    assert isinstance(result, operations_pb2.Operation)
 | |
| 64 | +    metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | |
| 65 | +    result.metadata.Unpack(metadata)
 | |
| 66 | +    assert metadata.stage == job.ExecuteStage.QUEUED.value
 | |
| 67 | +    assert uuid.UUID(result.name, version=4)
 | |
| 68 | +    assert result.done is False
 | |
| 69 | + | |
| 70 | 70 |  """
 | 
| 71 | 71 |  def test_wait_execution(instance, context):
 | 
| 72 | 72 |      # TODO: Figure out why next(response) hangs on the .get()
 | 
