[Notes] [Git][BuildGrid/buildgrid][finn/bot-refactor] 3 commits: Update CONTRIBUTING.rst



Title: GitLab

finnball pushed to branch finn/bot-refactor at BuildGrid / buildgrid

Commits:

15 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -30,7 +30,7 @@ before_script:
    30 30
       script:
    
    31 31
         - ${BGD} server start &
    
    32 32
         - sleep 1 # Allow server to boot
    
    33
    -    - ${BGD} bot --host=0.0.0.0 dummy &
    
    33
    +    - ${BGD} bot --host=0.0.0.0 --continuous dummy &
    
    34 34
         - ${BGD} execute --host=0.0.0.0 request --wait-for-completion
    
    35 35
     
    
    36 36
     tests-debian-stretch:
    

  • CONTRIBUTING.rst
    ... ... @@ -9,8 +9,23 @@ We welcome contributions in the form of bug fixes or feature additions / enhance
    9 9
     
    
    10 10
     Any major feature additions should be raised as a proposal on the `Mailing List <https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid/>`_ to be discussed, and then eventually followed up with an issue here on gitlab. We recommend that you propose the feature in advance of commencing work. We are also on irc, but do not have our own dedicated channel - you can find us on #buildstream on GIMPNet and #bazel on freenode.
    
    11 11
     
    
    12
    -The author of any patch is expected to take ownership of that code and is to support it for a reasonable
    
    13
    -time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced.
    
    12
    +The author of any patch is expected to take ownership of that code and is to support it for a reasonable time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced. More on this below in 'Granting Committer Access'.
    
    13
    +
    
    14
    +Granting Committer Access
    
    15
    +-------------------------
    
    16
    +
    
    17
    +We'll hand out commit access to anyone who has successfully landed a single patch to the code base. Please request this via irc or the mailing list.
    
    18
    +
    
    19
    +This of course relies on contributors being responsive and show willingness to address problems after landing branches there should not be any problems here.
    
    20
    +
    
    21
    +What we are expecting of committers here in general is basically to
    
    22
    +escalate the review in cases of uncertainty:
    
    23
    +
    
    24
    +* If the patch/branch is very trivial (obvious few line changes or typos etc), and you are confident of the change, there is no need for review.
    
    25
    +
    
    26
    +* If the patch/branch is non trivial, please obtain a review from another committer who is familiar with the area which the branch effects. An approval from someone who is not the patch author will be needed before any merge. 
    
    27
    +
    
    28
    +We don't have any detailed policy for "bad actors", but will of course handle things on a case by case basis - commit access should not result in commit wars or be used as a tool to subvert the project when disagreements arise, such incidents (if any) would surely lead to temporary suspension of commit rights.
    
    14 29
     
    
    15 30
     Patch Submissions
    
    16 31
     -----------------
    

  • app/commands/cmd_bot.py
    ... ... @@ -30,10 +30,12 @@ import os
    30 30
     import random
    
    31 31
     import subprocess
    
    32 32
     import tempfile
    
    33
    +import time
    
    33 34
     
    
    34 35
     from pathlib import Path, PurePath
    
    35 36
     
    
    36
    -from buildgrid.bot import bot
    
    37
    +from buildgrid.bot import bot, bot_interface
    
    38
    +from buildgrid.bot.bot_session import BotSession, Device, Worker
    
    37 39
     from buildgrid._exceptions import BotError
    
    38 40
     
    
    39 41
     from ..cli import pass_context
    
    ... ... @@ -43,18 +45,27 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
    43 45
     from google.protobuf import any_pb2
    
    44 46
     
    
    45 47
     @click.group(short_help = 'Create a bot client')
    
    48
    +@click.option('--continuous', is_flag=True)
    
    46 49
     @click.option('--parent', default='bgd_test')
    
    47
    -@click.option('--number-of-leases', default=1)
    
    48 50
     @click.option('--port', default='50051')
    
    49 51
     @click.option('--host', default='localhost')
    
    50 52
     @pass_context
    
    51
    -def cli(context, host, port, number_of_leases, parent):
    
    53
    +def cli(context, host, port, parent, continuous):
    
    54
    +    channel = grpc.insecure_channel('{}:{}'.format(host, port))
    
    55
    +    interface = bot_interface.BotInterface(channel)
    
    56
    +
    
    52 57
         context.logger = logging.getLogger(__name__)
    
    53 58
         context.logger.info("Starting on port {}".format(port))
    
    54 59
     
    
    55
    -    context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
    
    56
    -    context.number_of_leases = number_of_leases
    
    57
    -    context.parent = parent
    
    60
    +    context.continuous = continuous
    
    61
    +
    
    62
    +    worker = Worker()
    
    63
    +    worker.add_device(Device())
    
    64
    +
    
    65
    +    bot_session = BotSession(parent, interface)
    
    66
    +    bot_session.add_worker(worker)
    
    67
    +
    
    68
    +    context.bot_session = bot_session
    
    58 69
     
    
    59 70
     @cli.command('dummy', short_help='Create a dummy bot session')
    
    60 71
     @pass_context
    
    ... ... @@ -63,15 +74,11 @@ def dummy(context):
    63 74
         Simple dummy client. Creates a session, accepts leases, does fake work and
    
    64 75
         updates the server.
    
    65 76
         """
    
    66
    -
    
    67
    -    context.logger.info("Creating a bot session")
    
    68
    -
    
    69 77
         try:
    
    70
    -        bot.Bot(work=_work_dummy,
    
    71
    -                context=context,
    
    72
    -                channel=context.channel,
    
    73
    -                parent=context.parent,
    
    74
    -                number_of_leases=context.number_of_leases)
    
    78
    +        b = bot.Bot(context.bot_session)
    
    79
    +        b.session(_work_dummy,
    
    80
    +                  context,
    
    81
    +                  context.continuous)
    
    75 82
     
    
    76 83
         except KeyboardInterrupt:
    
    77 84
             pass
    
    ... ... @@ -85,7 +92,7 @@ def dummy(context):
    85 92
     @click.option('--port', show_default = True, default=11001)
    
    86 93
     @click.option('--remote', show_default = True, default='localhost')
    
    87 94
     @pass_context
    
    88
    -def _work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
    
    95
    +def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
    
    89 96
         """
    
    90 97
         Uses BuildBox to run commands.
    
    91 98
         """
    
    ... ... @@ -101,11 +108,14 @@ def _work_buildbox(context, remote, port, server_cert, client_key, client_cert,
    101 108
         context.fuse_dir = fuse_dir
    
    102 109
     
    
    103 110
         try:
    
    104
    -        bot.Bot(work=_work_buildbox,
    
    105
    -                context=context,
    
    106
    -                channel=context.channel,
    
    107
    -                parent=context.parent,
    
    108
    -                number_of_leases=context.number_of_leases)
    
    111
    +        b = bot.Bot(work=_work_buildbox,
    
    112
    +                    bot_session=context.bot_session,
    
    113
    +                    channel=context.channel,
    
    114
    +                    parent=context.parent)
    
    115
    +
    
    116
    +        b.session(context.parent,
    
    117
    +                  _work_buildbox,
    
    118
    +                  context)
    
    109 119
     
    
    110 120
         except KeyboardInterrupt:
    
    111 121
             pass
    

  • buildgrid/bot/bot.py
    ... ... @@ -23,163 +23,46 @@ Creates a bot session.
    23 23
     """
    
    24 24
     
    
    25 25
     import asyncio
    
    26
    -import inspect
    
    26
    +import collections
    
    27 27
     import logging
    
    28
    -import platform
    
    29
    -import queue
    
    30 28
     import time
    
    31 29
     
    
    32
    -from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
    
    33
    -
    
    34
    -from . import bot_interface
    
    30
    +from . import bot_interface, bot_session
    
    31
    +from .bot_session import BotStatus, LeaseState
    
    35 32
     from .._exceptions import BotError
    
    36 33
     
    
    37
    -class Bot(object):
    
    34
    +class Bot:
    
    38 35
         """
    
    39 36
         Creates a local BotSession.
    
    40 37
         """
    
    41 38
     
    
    42
    -    def __init__(self, work, context, channel, parent, number_of_leases):
    
    43
    -        if not inspect.iscoroutinefunction(work):
    
    44
    -            raise BotError("work function must be async")
    
    39
    +    UPDATE_PERIOD = 1
    
    45 40
     
    
    46
    -        self.interface = bot_interface.BotInterface(channel)
    
    41
    +    def __init__(self, bot_session):
    
    47 42
             self.logger = logging.getLogger(__name__)
    
    48 43
     
    
    49
    -        self._create_session(parent, number_of_leases)
    
    50
    -        self._work_queue = queue.Queue(maxsize = number_of_leases)
    
    51
    -
    
    52
    -        try:
    
    53
    -            while True:
    
    54
    -                ## TODO: Leases independently finish
    
    55
    -                ## Allow leases to queue finished work independently instead
    
    56
    -                ## of waiting for all to finish
    
    57
    -                futures = [self._do_work(work, context, lease) for lease in self._get_work()]
    
    58
    -                if futures:
    
    59
    -                    loop = asyncio.new_event_loop()
    
    60
    -                    leases_complete, _ = loop.run_until_complete(asyncio.wait(futures))
    
    61
    -                    work_complete = [(lease.result().id, lease.result(),) for lease in leases_complete]
    
    62
    -                    self._work_complete(work_complete)
    
    63
    -                    loop.close()
    
    64
    -                self._update_bot_session()
    
    65
    -                time.sleep(2)
    
    66
    -        except Exception as e:
    
    67
    -            self.logger.error(e)
    
    68
    -            raise BotError(e)
    
    69
    -
    
    70
    -    @property
    
    71
    -    def bot_session(self):
    
    72
    -        ## Read only, shouldn't have to set any of the variables in here
    
    73
    -        return self._bot_session
    
    74
    -
    
    75
    -    def close_session(self):
    
    76
    -        self.logger.warning("Session closing not yet implemented")
    
    77
    -
    
    78
    -    async def _do_work(self, work, context, lease):
    
    79
    -        """ Work is done here, work function should be asynchronous
    
    80
    -        """
    
    81
    -        self.logger.info("Work found: {}".format(lease.id))
    
    82
    -        lease = await work(context=context, lease=lease)
    
    83
    -        lease.state = bots_pb2.LeaseState.Value('COMPLETED')
    
    84
    -        self.logger.info("Work complete: {}".format(lease.id))
    
    85
    -        return lease
    
    86
    -
    
    87
    -    def _update_bot_session(self):
    
    88
    -        """ Should call the server periodically to inform the server the client
    
    89
    -        has not died.
    
    90
    -        """
    
    91
    -        self.logger.debug("Updating bot session")
    
    92
    -        self._bot_session = self.interface.update_bot_session(self._bot_session)
    
    93
    -        leases_update = ([self._update_lease(lease) for lease in self._bot_session.leases])
    
    94
    -        del self._bot_session.leases[:]
    
    95
    -        self._bot_session.leases.extend(leases_update)
    
    96
    -
    
    97
    -    def _get_work(self):
    
    98
    -        while not self._work_queue.empty():
    
    99
    -            yield self._work_queue.get()
    
    100
    -            self._work_queue.task_done()
    
    101
    -
    
    102
    -    def _work_complete(self, leases_complete):
    
    103
    -        """ Bot updates itself with any completed work.
    
    104
    -        """
    
    105
    -        # Should really improve this...
    
    106
    -        # Maybe add some call back function sentoff work...
    
    107
    -        leases_active = list(filter(self._lease_active, self._bot_session.leases))
    
    108
    -        leases_not_active = [lease for lease in self._bot_session.leases if not self._lease_active(lease)]
    
    109
    -        del self._bot_session.leases[:]
    
    110
    -        for lease in leases_active:
    
    111
    -            for lease_tuple in leases_complete:
    
    112
    -                if lease.id == lease_tuple[0]:
    
    113
    -                    leases_not_active.extend([lease_tuple[1]])
    
    114
    -        self._bot_session.leases.extend(leases_not_active)
    
    115
    -
    
    116
    -    def _update_lease(self, lease):
    
    117
    -        """
    
    118
    -        State machine for any recieved updates to the leases.
    
    119
    -        """
    
    120
    -        if self._lease_pending(lease):
    
    121
    -            lease.state = bots_pb2.LeaseState.Value('ACTIVE')
    
    122
    -            self._work_queue.put(lease)
    
    123
    -            return lease
    
    124
    -
    
    125
    -        else:
    
    126
    -            return lease
    
    127
    -
    
    128
    -    def _create_session(self, parent, number_of_leases):
    
    129
    -        self.logger.debug("Creating bot session")
    
    130
    -        worker = self._create_worker()
    
    44
    +        self._bot_session = bot_session
    
    131 45
     
    
    132
    -        """ Unique bot ID within the farm used to identify this bot
    
    133
    -        Needs to be human readable.
    
    134
    -        All prior sessions with bot_id of same ID are invalidated.
    
    135
    -        If a bot attempts to update an invalid session, it must be rejected and
    
    136
    -        may be put in quarantine.
    
    137
    -        """
    
    138
    -        bot_id = '{}.{}'.format(parent, platform.node())
    
    46
    +    def session(self, work, context, continuous = False):
    
    47
    +        loop = asyncio.get_event_loop()
    
    139 48
     
    
    140
    -        leases = [bots_pb2.Lease() for x in range(number_of_leases)]
    
    49
    +        self._bot_session.create_bot_session(work, context)
    
    141 50
     
    
    142
    -        bot_session = bots_pb2.BotSession(worker = worker,
    
    143
    -                                          status = bots_pb2.BotStatus.Value('OK'),
    
    144
    -                                          leases = leases,
    
    145
    -                                          bot_id = bot_id)
    
    146
    -        self._bot_session = self.interface.create_bot_session(parent, bot_session)
    
    147
    -        self.logger.info("Name: {}, Id: {}".format(self._bot_session.name,
    
    148
    -                                                      self._bot_session.bot_id))
    
    149
    -
    
    150
    -    def _create_worker(self):
    
    151
    -        devices = self._create_devices()
    
    152
    -
    
    153
    -        # Contains a list of devices and the connections between them.
    
    154
    -        worker = worker_pb2.Worker(devices = devices)
    
    155
    -
    
    156
    -        """ Keys supported:
    
    157
    -        *pool
    
    158
    -        """
    
    159
    -        worker.Property.key = "pool"
    
    160
    -        worker.Property.value = "all"
    
    161
    -
    
    162
    -        return worker
    
    163
    -
    
    164
    -    def _create_devices(self):
    
    165
    -        """ Creates devices available to the worker
    
    166
    -        The first device is know as the Primary Device - the revice which
    
    167
    -        is running a bit and responsible to actually executing commands.
    
    168
    -        All other devices are known as Attatched Devices and must be controlled
    
    169
    -        by the Primary Device.
    
    170
    -        """
    
    171
    -
    
    172
    -        devices = []
    
    173
    -
    
    174
    -        for i in range(0, 1): # Append one device for now
    
    175
    -            dev = worker_pb2.Device()
    
    176
    -
    
    177
    -            devices.append(dev)
    
    178
    -
    
    179
    -        return devices
    
    180
    -
    
    181
    -    def _lease_pending(self, lease):
    
    182
    -        return lease.state == bots_pb2.LeaseState.Value('PENDING')
    
    183
    -
    
    184
    -    def _lease_active(self, lease):
    
    185
    -        return lease.state == bots_pb2.LeaseState.Value('ACTIVE')
    51
    +        try:
    
    52
    +            task = asyncio.ensure_future(self._update_bot_session())
    
    53
    +            loop.run_forever()
    
    54
    +
    
    55
    +        except KeyboardInterrupt:
    
    56
    +            pass
    
    57
    +
    
    58
    +        finally:
    
    59
    +            task.cancel()
    
    60
    +            loop.close()
    
    61
    +
    
    62
    +    async def _update_bot_session(self):
    
    63
    +        while True:
    
    64
    +            """ Calls the server periodically to inform the server the client
    
    65
    +            has not died.
    
    66
    +            """
    
    67
    +            self._bot_session.update_bot_session()
    
    68
    +            await asyncio.sleep(self.UPDATE_PERIOD)

  • buildgrid/bot/bot_interface.py
    ... ... @@ -29,7 +29,7 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bo
    29 29
     
    
    30 30
     from .._exceptions import BotError
    
    31 31
     
    
    32
    -class BotInterface(object):
    
    32
    +class BotInterface:
    
    33 33
         """ Interface handles calls to the server.
    
    34 34
         """
    
    35 35
     
    
    ... ... @@ -39,22 +39,12 @@ class BotInterface(object):
    39 39
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    40 40
     
    
    41 41
         def create_bot_session(self, parent, bot_session):
    
    42
    -        try:
    
    43
    -            request = bots_pb2.CreateBotSessionRequest(parent = parent,
    
    44
    -                                                       bot_session = bot_session)
    
    45
    -            return self._stub.CreateBotSession(request)
    
    46
    -
    
    47
    -        except Exception as e:
    
    48
    -            self.logger.error(e)
    
    49
    -            raise BotError(e)
    
    42
    +        request = bots_pb2.CreateBotSessionRequest(parent = parent,
    
    43
    +                                                   bot_session = bot_session)
    
    44
    +        return self._stub.CreateBotSession(request)
    
    50 45
     
    
    51 46
         def update_bot_session(self, bot_session, update_mask = None):
    
    52
    -        try:
    
    53
    -            request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
    
    54
    -                                                       bot_session = bot_session,
    
    55
    -                                                       update_mask = update_mask)
    
    56
    -            return self._stub.UpdateBotSession(request)
    
    57
    -
    
    58
    -        except Exception as e:
    
    59
    -            self.logger.error(e)
    
    60
    -            raise BotError(e)
    47
    +        request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
    
    48
    +                                                   bot_session = bot_session,
    
    49
    +                                                   update_mask = update_mask)
    
    50
    +        return self._stub.UpdateBotSession(request)

  • buildgrid/bot/bot_session.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +"""
    
    16
    +Bot Session
    
    17
    +====
    
    18
    +
    
    19
    +Allows connections
    
    20
    +"""
    
    21
    +import asyncio
    
    22
    +import logging
    
    23
    +import platform
    
    24
    +import uuid
    
    25
    +
    
    26
    +from enum import Enum
    
    27
    +
    
    28
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
    
    29
    +
    
    30
    +class BotStatus(Enum):
    
    31
    +    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    32
    +    OK                     = bots_pb2.BotStatus.Value('OK')
    
    33
    +    UNHEALTHY              = bots_pb2.BotStatus.Value('UNHEALTHY');
    
    34
    +    HOST_REBOOTING         = bots_pb2.BotStatus.Value('HOST_REBOOTING')
    
    35
    +    BOT_TERMINATING        = bots_pb2.BotStatus.Value('BOT_TERMINATING')
    
    36
    +
    
    37
    +class LeaseState(Enum):
    
    38
    +    LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
    
    39
    +    PENDING                 = bots_pb2.LeaseState.Value('PENDING')
    
    40
    +    ACTIVE                  = bots_pb2.LeaseState.Value('ACTIVE')
    
    41
    +    COMPLETED               = bots_pb2.LeaseState.Value('COMPLETED')
    
    42
    +    CANCELLED               = bots_pb2.LeaseState.Value('CANCELLED')
    
    43
    +
    
    44
    +
    
    45
    +class BotSession:
    
    46
    +    def __init__(self, parent, interface):
    
    47
    +        """ Unique bot ID within the farm used to identify this bot
    
    48
    +        Needs to be human readable.
    
    49
    +        All prior sessions with bot_id of same ID are invalidated.
    
    50
    +        If a bot attempts to update an invalid session, it must be rejected and
    
    51
    +        may be put in quarantine.
    
    52
    +        """
    
    53
    +
    
    54
    +        self.logger = logging.getLogger(__name__)
    
    55
    +
    
    56
    +        self._bot_id = '{}.{}'.format(parent, platform.node())
    
    57
    +        self._interface = interface
    
    58
    +        self._leases = {}
    
    59
    +        self._name = None
    
    60
    +        self._parent = parent
    
    61
    +        self._status = BotStatus.OK.value
    
    62
    +        self._work = None
    
    63
    +        self._worker = None
    
    64
    +
    
    65
    +    @property
    
    66
    +    def bot_id(self):
    
    67
    +        return self._bot_id
    
    68
    +
    
    69
    +    def add_worker(self, worker):
    
    70
    +        self._worker = worker
    
    71
    +
    
    72
    +    def create_bot_session(self, work, context=None):
    
    73
    +        self.logger.debug("Creating bot session")
    
    74
    +        self._work = work
    
    75
    +        self._context = context
    
    76
    +
    
    77
    +        session = self._interface.create_bot_session(self._parent, self.get_pb2())
    
    78
    +        self._name = session.name
    
    79
    +        self.logger.info("Created bot session with name: {}".format(self._name))
    
    80
    +
    
    81
    +    def update_bot_session(self):
    
    82
    +        session = self._interface.update_bot_session(self.get_pb2())
    
    83
    +        for lease in session.leases:
    
    84
    +            self._update_lease_from_server(lease)
    
    85
    +
    
    86
    +    def get_pb2(self):
    
    87
    +        leases = list(self._leases.values())
    
    88
    +        if not leases:
    
    89
    +            leases = None
    
    90
    +
    
    91
    +        return bots_pb2.BotSession(worker=self._worker.get_pb2(),
    
    92
    +                                   status=self._status,
    
    93
    +                                   leases=leases,
    
    94
    +                                   bot_id=self._bot_id,
    
    95
    +                                   name = self._name)
    
    96
    +
    
    97
    +    def lease_completed(self, lease):
    
    98
    +        lease.state = LeaseState.COMPLETED.value
    
    99
    +        self._leases[lease.id] = lease
    
    100
    +
    
    101
    +    def _update_lease_from_server(self, lease):
    
    102
    +        """
    
    103
    +        State machine for any recieved updates to the leases.
    
    104
    +        """
    
    105
    +        ## TODO: Compare with previous state of lease
    
    106
    +        lease_bot = self._leases.get(lease.id)
    
    107
    +
    
    108
    +        if lease.state == LeaseState.PENDING.value:
    
    109
    +            lease.state = LeaseState.ACTIVE.value
    
    110
    +            asyncio.ensure_future(self.create_work(lease))
    
    111
    +            self._leases[lease.id] = lease
    
    112
    +
    
    113
    +        elif lease.state == LeaseState.COMPLETED.value and \
    
    114
    +           lease_bot.state == LeaseState.COMPLETED.value:
    
    115
    +            del self._leases[lease.id]
    
    116
    +
    
    117
    +    async def create_work(self, lease):
    
    118
    +        self.logger.debug("Work created: {}".format(lease.id))
    
    119
    +        lease = await self._work(self._context, lease)
    
    120
    +        self.logger.debug("Work complete: {}".format(lease.id))
    
    121
    +        self.lease_completed(lease)
    
    122
    +
    
    123
    +class Worker:
    
    124
    +    def __init__(self, properties=None, configs=None):
    
    125
    +        self.properties = {}
    
    126
    +        self._configs = {}
    
    127
    +        self._devices = []
    
    128
    +
    
    129
    +        if properties:
    
    130
    +            for k, v in properties.items():
    
    131
    +                if k == 'pool':
    
    132
    +                    self.properties[k] = v
    
    133
    +                else:
    
    134
    +                    raise KeyError('Key not supported: {}'.format(k))
    
    135
    +
    
    136
    +        if configs:
    
    137
    +            for k, v in configs.items():
    
    138
    +                if k == 'DockerImage':
    
    139
    +                    self.properties[k] = v
    
    140
    +                else:
    
    141
    +                    raise KeyError('Key not supported: {}'.format(k))
    
    142
    +
    
    143
    +    def add_device(self, device):
    
    144
    +        self._devices.append(device)
    
    145
    +
    
    146
    +    def get_pb2(self):
    
    147
    +        devices = [device.get_pb2() for device in self._devices]
    
    148
    +        worker = worker_pb2.Worker(devices=devices)
    
    149
    +        property_message = worker_pb2.Worker.Property()
    
    150
    +        for k, v in self.properties.items():
    
    151
    +            property_message.key = k
    
    152
    +            property_message.value = v
    
    153
    +            worker.properties.extend([property_message])
    
    154
    +
    
    155
    +        config_message = worker_pb2.Worker.Config()
    
    156
    +        for k, v in self.properties.items():
    
    157
    +            property_message.key = k
    
    158
    +            property_message.value = v
    
    159
    +            worker.configs.extend([config_message])
    
    160
    +
    
    161
    +        return worker
    
    162
    +
    
    163
    +class Device:
    
    164
    +    def __init__(self, properties=None):
    
    165
    +        """ Creates devices available to the worker
    
    166
    +        The first device is know as the Primary Device - the revice which
    
    167
    +        is running a bit and responsible to actually executing commands.
    
    168
    +        All other devices are known as Attatched Devices and must be controlled
    
    169
    +        by the Primary Device.
    
    170
    +        """
    
    171
    +
    
    172
    +        self._name = str(uuid.uuid4())
    
    173
    +        self._properties = {}
    
    174
    +
    
    175
    +        if properties:
    
    176
    +            for k, v in properties.items():
    
    177
    +                if k == 'os':
    
    178
    +                    self._properties[k] = v
    
    179
    +
    
    180
    +                elif k == 'docker':
    
    181
    +                    if v not in ('True', 'False'):
    
    182
    +                        raise ValueError('Value not supported: {}'.format(v))
    
    183
    +                    self._properties[k] = v
    
    184
    +
    
    185
    +                else:
    
    186
    +                    raise KeyError('Key not supported: {}'.format(k))
    
    187
    +
    
    188
    +    def get_pb2(self):
    
    189
    +        device = worker_pb2.Device(handle=self._name)
    
    190
    +        property_message = worker_pb2.Device.Property()
    
    191
    +        for k, v in self._properties.items():
    
    192
    +            property_message.key = k
    
    193
    +            property_message.value = v
    
    194
    +            device.properties.extend([property_message])
    
    195
    +        return device

  • buildgrid/server/execution/execution_instance.py
    ... ... @@ -34,12 +34,12 @@ class ExecutionInstance():
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
             self._scheduler = scheduler
    
    36 36
     
    
    37
    -    def execute(self, action_digest, skip_cache_lookup):
    
    37
    +    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38 38
             """ Sends a job for execution.
    
    39 39
             Queues an action and creates an Operation instance to be associated with
    
    40 40
             this action.
    
    41 41
             """
    
    42
    -        job = Job(action_digest)
    
    42
    +        job = Job(action_digest, message_queue)
    
    43 43
             self.logger.info("Operation name: {}".format(job.name))
    
    44 44
     
    
    45 45
             if not skip_cache_lookup:
    
    ... ... @@ -70,3 +70,15 @@ class ExecutionInstance():
    70 70
         def cancel_operation(self, name):
    
    71 71
             # TODO: Cancel leases
    
    72 72
             raise NotImplementedError("Cancelled operations not supported")
    
    73
    +
    
    74
    +    def register_message_client(self, name, queue):
    
    75
    +        try:
    
    76
    +            self._scheduler.register_client(name, queue)
    
    77
    +        except KeyError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))
    
    79
    +
    
    80
    +    def unregister_message_client(self, name, queue):
    
    81
    +        try:
    
    82
    +            self._scheduler.unregister_client(name, queue)
    
    83
    +        except KeyError:
    
    84
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -22,10 +22,9 @@ ExecutionService
    22 22
     Serves remote execution requests.
    
    23 23
     """
    
    24 24
     
    
    25
    -import copy
    
    26 25
     import grpc
    
    27 26
     import logging
    
    28
    -import time
    
    27
    +import queue
    
    29 28
     
    
    30 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    31 30
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    ... ... @@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError
    35 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    36 35
     
    
    37 36
         def __init__(self, instance):
    
    38
    -        self._instance = instance
    
    39 37
             self.logger = logging.getLogger(__name__)
    
    38
    +        self._instance = instance
    
    40 39
     
    
    41 40
         def Execute(self, request, context):
    
    42 41
             # Ignore request.instance_name for now
    
    43 42
             # Have only one instance
    
    44 43
             try:
    
    44
    +            message_queue = queue.Queue()
    
    45 45
                 operation = self._instance.execute(request.action_digest,
    
    46
    -                                               request.skip_cache_lookup)
    
    46
    +                                               request.skip_cache_lookup,
    
    47
    +                                               message_queue)
    
    47 48
     
    
    48
    -            yield from self._stream_operation_updates(operation.name)
    
    49
    +            remove_client = lambda : self._remove_client(operation.name, message_queue)
    
    50
    +            context.add_callback(remove_client)
    
    51
    +
    
    52
    +            yield from self._stream_operation_updates(message_queue,
    
    53
    +                                                      operation.name)
    
    49 54
     
    
    50 55
             except InvalidArgumentError as e:
    
    51 56
                 self.logger.error(e)
    
    ... ... @@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    59 64
     
    
    60 65
         def WaitExecution(self, request, context):
    
    61 66
             try:
    
    62
    -            yield from self._stream_operation_updates(request.name)
    
    67
    +            message_queue = queue.Queue()
    
    68
    +            operation_name = request.name
    
    69
    +
    
    70
    +            self._instance.register_message_client(operation_name, message_queue)
    
    71
    +
    
    72
    +            remove_client = lambda : self._remove_client(operation_name, message_queue)
    
    73
    +            context.add_callback(remove_client)
    
    74
    +
    
    75
    +            yield from self._stream_operation_updates(message_queue,
    
    76
    +                                                      operation_name)
    
    63 77
     
    
    64 78
             except InvalidArgumentError as e:
    
    65 79
                 self.logger.error(e)
    
    66 80
                 context.set_details(str(e))
    
    67 81
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68 82
     
    
    69
    -    def _stream_operation_updates(self, name):
    
    70
    -        stream_previous = None
    
    71
    -        while True:
    
    72
    -            stream = self._instance.get_operation(name)
    
    73
    -            if stream != stream_previous:
    
    74
    -                yield stream
    
    75
    -                if stream.done == True: break
    
    76
    -                stream_previous = copy.deepcopy(stream)
    
    77
    -            time.sleep(1)
    83
    +    def _remove_client(self, operation_name, message_queue):
    
    84
    +        self._instance.unregister_message_client(operation_name, message_queue)
    
    85
    +
    
    86
    +    def _stream_operation_updates(self, message_queue, operation_name):
    
    87
    +        operation = message_queue.get()
    
    88
    +        while not operation.done:
    
    89
    +            yield operation
    
    90
    +            operation = message_queue.get()
    
    91
    +        yield operation

  • buildgrid/server/job.py
    ... ... @@ -51,21 +51,39 @@ class LeaseState(Enum):
    51 51
     
    
    52 52
     class Job():
    
    53 53
     
    
    54
    -    def __init__(self, action):
    
    55
    -        self.action = action
    
    56
    -        self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
    
    57
    -        self.execute_stage = ExecuteStage.UNKNOWN
    
    54
    +    def __init__(self, action_digest, message_queue=None):
    
    58 55
             self.lease = None
    
    59 56
             self.logger = logging.getLogger(__name__)
    
    60
    -        self.name = str(uuid.uuid4())
    
    61 57
             self.result = None
    
    62 58
     
    
    59
    +        self._action_digest = action_digest
    
    60
    +        self._execute_stage = ExecuteStage.UNKNOWN
    
    63 61
             self._n_tries = 0
    
    64
    -        self._operation = operations_pb2.Operation(name = self.name)
    
    62
    +        self._name = str(uuid.uuid4())
    
    63
    +        self._operation = operations_pb2.Operation(name = self._name)
    
    64
    +        self._operation_update_queues = []
    
    65
    +
    
    66
    +        if message_queue is not None:
    
    67
    +            self.register_client(message_queue)
    
    68
    +
    
    69
    +    @property
    
    70
    +    def name(self):
    
    71
    +        return self._name
    
    72
    +
    
    73
    +    def check_job_finished(self):
    
    74
    +        if not self._operation_update_queues:
    
    75
    +            return self._operation.done
    
    76
    +        return False
    
    77
    +
    
    78
    +    def register_client(self, queue):
    
    79
    +        self._operation_update_queues.append(queue)
    
    80
    +        queue.put(self.get_operation())
    
    81
    +
    
    82
    +    def unregister_client(self, queue):
    
    83
    +        self._operation_update_queues.remove(queue)
    
    65 84
     
    
    66 85
         def get_operation(self):
    
    67 86
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    68
    -
    
    69 87
             if self.result is not None:
    
    70 88
                 self._operation.done = True
    
    71 89
                 response = ExecuteResponse()
    
    ... ... @@ -76,15 +94,15 @@ class Job():
    76 94
     
    
    77 95
         def get_operation_meta(self):
    
    78 96
             meta = ExecuteOperationMetadata()
    
    79
    -        meta.stage = self.execute_stage.value
    
    97
    +        meta.stage = self._execute_stage.value
    
    80 98
     
    
    81 99
             return meta
    
    82 100
     
    
    83 101
         def create_lease(self):
    
    84
    -        action = self._pack_any(self.action)
    
    102
    +        action_digest = self._pack_any(self._action_digest)
    
    85 103
     
    
    86 104
             lease = bots_pb2.Lease(id = self.name,
    
    87
    -                               payload = action,
    
    105
    +                               payload = action_digest,
    
    88 106
                                    state = LeaseState.PENDING.value)
    
    89 107
             self.lease = lease
    
    90 108
             return lease
    
    ... ... @@ -92,6 +110,11 @@ class Job():
    92 110
         def get_operations(self):
    
    93 111
             return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
    
    94 112
     
    
    113
    +    def update_execute_stage(self, stage):
    
    114
    +        self._execute_stage = stage
    
    115
    +        for queue in self._operation_update_queues:
    
    116
    +            queue.put(self.get_operation())
    
    117
    +
    
    95 118
         def _pack_any(self, pack):
    
    96 119
             any = any_pb2.Any()
    
    97 120
             any.Pack(pack)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -35,36 +35,39 @@ class Scheduler():
    35 35
             self.jobs = {}
    
    36 36
             self.queue = deque()
    
    37 37
     
    
    38
    +    def register_client(self, name, queue):
    
    39
    +        self.jobs[name].register_client(queue)
    
    40
    +
    
    41
    +    def unregister_client(self, name, queue):
    
    42
    +        job = self.jobs[name]
    
    43
    +        job.unregister_client(queue)
    
    44
    +        if job.check_job_finished():
    
    45
    +            del self.jobs[name]
    
    46
    +
    
    38 47
         def append_job(self, job):
    
    39
    -        job.execute_stage = ExecuteStage.QUEUED
    
    48
    +        job.update_execute_stage(ExecuteStage.QUEUED)
    
    40 49
             self.jobs[job.name] = job
    
    41 50
             self.queue.append(job)
    
    42 51
     
    
    43 52
         def retry_job(self, name):
    
    44
    -        job = self.jobs[name]
    
    45
    -
    
    46
    -        if job.n_tries >= self.MAX_N_TRIES:
    
    47
    -            # TODO: Decide what to do with these jobs
    
    48
    -            job.execute_stage = ExecuteStage.COMPLETED
    
    49
    -        else:
    
    50
    -            job.execute_stage = ExecuteStage.QUEUED
    
    51
    -            job.n_tries += 1
    
    52
    -            self.queue.appendleft(job)
    
    53
    +        job = self.jobs.get(name)
    
    53 54
     
    
    54
    -        self.jobs[name] = job
    
    55
    +        if job is not None:
    
    56
    +            if job.n_tries >= self.MAX_N_TRIES:
    
    57
    +                # TODO: Decide what to do with these jobs
    
    58
    +                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    59
    +                # TODO: Mark these jobs as done
    
    60
    +            else:
    
    61
    +                job.update_execute_stage(ExecuteStage.QUEUED)
    
    62
    +                job.n_tries += 1
    
    63
    +                self.queue.appendleft(job)
    
    55 64
     
    
    56
    -    def create_job(self):
    
    57
    -        if len(self.queue) > 0:
    
    58
    -            job = self.queue.popleft()
    
    59
    -            job.execute_stage = ExecuteStage.EXECUTING
    
    60
    -            self.jobs[job.name] = job
    
    61
    -            return job
    
    62
    -        return None
    
    65
    +            self.jobs[name] = job
    
    63 66
     
    
    64 67
         def job_complete(self, name, result):
    
    65 68
             job = self.jobs[name]
    
    66
    -        job.execute_stage = ExecuteStage.COMPLETED
    
    67 69
             job.result = result
    
    70
    +        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    68 71
             self.jobs[name] = job
    
    69 72
     
    
    70 73
         def get_operations(self):
    
    ... ... @@ -73,48 +76,13 @@ class Scheduler():
    73 76
                 response.operations.extend([v.get_operation()])
    
    74 77
             return response
    
    75 78
     
    
    76
    -    def update_lease(self, lease):
    
    77
    -        name = lease.id
    
    79
    +    def update_job_lease_state(self, name, state):
    
    78 80
             job = self.jobs.get(name)
    
    79
    -        state = lease.state
    
    80
    -
    
    81
    -        if state   == LeaseState.LEASE_STATE_UNSPECIFIED.value:
    
    82
    -            create_job = self.create_job()
    
    83
    -            if create_job is None:
    
    84
    -                # No job? Return lease.
    
    85
    -                return lease
    
    86
    -            else:
    
    87
    -                job = create_job
    
    88
    -                job.lease = job.create_lease()
    
    89
    -
    
    90
    -        elif state == LeaseState.PENDING.value:
    
    91
    -            job.lease = lease
    
    92
    -
    
    93
    -        elif state == LeaseState.ACTIVE.value:
    
    94
    -            job.lease = lease
    
    95
    -
    
    96
    -        elif state == LeaseState.COMPLETED.value:
    
    97
    -            self.job_complete(job.name, lease.result)
    
    98
    -
    
    99
    -            create_job = self.create_job()
    
    100
    -            if create_job is None:
    
    101
    -                # Docs say not to use this state though if job has
    
    102
    -                # completed and no more jobs, then use this state to stop
    
    103
    -                # job being processed again
    
    104
    -                job.lease = lease
    
    105
    -                job.lease.state = LeaseState.LEASE_STATE_UNSPECIFIED.value
    
    106
    -            else:
    
    107
    -                job = create_job
    
    108
    -                job.lease = job.create_lease()
    
    109
    -
    
    110
    -        elif state == LeaseState.CANCELLED.value:
    
    111
    -            job.lease = lease
    
    112
    -
    
    113
    -        else:
    
    114
    -            raise Exception("Unknown state: {}".format(state))
    
    115
    -
    
    81
    +        job.lease.state = state
    
    116 82
             self.jobs[name] = job
    
    117
    -        return job.lease
    
    83
    +
    
    84
    +    def get_job_lease(self, name):
    
    85
    +        return self.jobs[name].lease
    
    118 86
     
    
    119 87
         def cancel_session(self, name):
    
    120 88
             job = self.jobs[name]
    
    ... ... @@ -122,3 +90,12 @@ class Scheduler():
    122 90
             if state == LeaseState.PENDING.value or \
    
    123 91
                state == LeaseState.ACTIVE.value:
    
    124 92
                 self.retry_job(name)
    
    93
    +
    
    94
    +    def create_leases(self):
    
    95
    +        while len(self.queue) > 0:
    
    96
    +            job = self.queue.popleft()
    
    97
    +            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    98
    +            job.lease = job.create_lease()
    
    99
    +            job.lease.state = LeaseState.PENDING.value
    
    100
    +            self.jobs[job.name] = job
    
    101
    +            yield job.lease

  • buildgrid/server/worker/bots_interface.py
    ... ... @@ -35,6 +35,7 @@ class BotsInterface():
    35 35
             self.logger = logging.getLogger(__name__)
    
    36 36
     
    
    37 37
             self._bot_ids = {}
    
    38
    +        self._bot_sessions = {}
    
    38 39
             self._scheduler = scheduler
    
    39 40
     
    
    40 41
         def create_bot_session(self, parent, bot_session):
    
    ... ... @@ -59,6 +60,7 @@ class BotsInterface():
    59 60
             bot_session.name = name
    
    60 61
     
    
    61 62
             self._bot_ids[name] = bot_id
    
    63
    +        self._bot_sessions[name] = bot_session
    
    62 64
             self.logger.info("Created bot session name={} with bot_id={}".format(name, bot_id))
    
    63 65
             return bot_session
    
    64 66
     
    
    ... ... @@ -69,13 +71,63 @@ class BotsInterface():
    69 71
             self.logger.debug("Updating bot session name={}".format(name))
    
    70 72
             self._check_bot_ids(bot_session.bot_id, name)
    
    71 73
     
    
    72
    -        leases = [self._scheduler.update_lease(lease) for lease in bot_session.leases]
    
    74
    +        server_session = self._bot_sessions[name]
    
    75
    +
    
    76
    +        leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
    
    73 77
     
    
    74 78
             del bot_session.leases[:]
    
    75 79
             bot_session.leases.extend(leases)
    
    76 80
     
    
    81
    +        for lease in self._scheduler.create_leases():
    
    82
    +            bot_session.leases.extend([lease])
    
    83
    +
    
    84
    +        self._bot_sessions[name] = bot_session
    
    77 85
             return bot_session
    
    78 86
     
    
    87
    +    def check_states(self, client_lease):
    
    88
    +        """ Edge detector for states
    
    89
    +        """
    
    90
    +        ## TODO: Handle cancelled states
    
    91
    +        server_lease = self._scheduler.get_job_lease(client_lease.id)
    
    92
    +        server_state = LeaseState(server_lease.state)
    
    93
    +        client_state = LeaseState(client_lease.state)
    
    94
    +
    
    95
    +        if server_state == LeaseState.PENDING:
    
    96
    +
    
    97
    +            if client_state == LeaseState.ACTIVE:
    
    98
    +                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    99
    +            elif client_state == LeaseState.COMPLETED:
    
    100
    +                # TODO: Lease was rejected
    
    101
    +                raise NotImplementedError("'Not Accepted' is unsupported")
    
    102
    +            else:
    
    103
    +                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    104
    +
    
    105
    +        elif server_state == LeaseState.ACTIVE:
    
    106
    +
    
    107
    +            if client_state == LeaseState.ACTIVE:
    
    108
    +                pass
    
    109
    +
    
    110
    +            elif client_state == LeaseState.COMPLETED:
    
    111
    +                self._scheduler.job_complete(client_lease.id, client_lease.result)
    
    112
    +                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    113
    +                return None
    
    114
    +
    
    115
    +            else:
    
    116
    +                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    117
    +
    
    118
    +        elif server_state == LeaseState.COMPLETED:
    
    119
    +            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    120
    +
    
    121
    +        elif server_state == LeaseState.CANCELLED:
    
    122
    +            raise NotImplementedError("Cancelled states not supported yet")
    
    123
    +
    
    124
    +        else:
    
    125
    +            # Sould never get here
    
    126
    +            raise OutofSyncError("State now allowed: {}".format(server_state))
    
    127
    +
    
    128
    +        return client_lease
    
    129
    +
    
    130
    +
    
    79 131
         def _check_bot_ids(self, bot_id, name = None):
    
    80 132
             """ Checks the ID and the name of the bot.
    
    81 133
             """
    
    ... ... @@ -103,7 +155,11 @@ class BotsInterface():
    103 155
                 raise InvalidArgumentError("Bot id does not exist: {}".format(name))
    
    104 156
     
    
    105 157
             self.logger.debug("Attempting to close {} with name: {}".format(bot_id, name))
    
    106
    -        self._scheduler.retry_job(name)
    
    158
    +        for lease in self._bot_sessions[name].leases:
    
    159
    +            if lease.state != LeaseState.COMPLETED.value:
    
    160
    +                # TODO: Be wary here, may need to handle rejected leases in future
    
    161
    +                self._scheduler.retry_job(lease.id)
    
    162
    +
    
    107 163
             self.logger.debug("Closing bot session: {}".format(name))
    
    108 164
             self._bot_ids.pop(name)
    
    109 165
             self.logger.info("Closed bot {} with name: {}".format(bot_id, name))

  • buildgrid/server/worker/bots_service.py
    ... ... @@ -43,7 +43,6 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    43 43
                 self.logger.error(e)
    
    44 44
                 context.set_details(str(e))
    
    45 45
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    46
    -            return bots_pb2.BotSession()
    
    47 46
     
    
    48 47
         def UpdateBotSession(self, request, context):
    
    49 48
             try:
    
    ... ... @@ -53,13 +52,16 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    53 52
                 self.logger.error(e)
    
    54 53
                 context.set_details(str(e))
    
    55 54
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    56
    -            return bots_pb2.BotSession()
    
    57 55
     
    
    58 56
             except OutofSyncError as e:
    
    59 57
                 self.logger.error(e)
    
    60 58
                 context.set_details(str(e))
    
    61 59
                 context.set_code(grpc.StatusCode.DATA_LOSS)
    
    62
    -            return bots_pb2.BotSession()
    
    60
    +
    
    61
    +        except NotImplementedError as e:
    
    62
    +            self.logger.error(e)
    
    63
    +            context.set_details(str(e))
    
    64
    +            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    63 65
     
    
    64 66
         def PostBotEventTemp(self, request, context):
    
    65 67
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)

  • tests/integration/bot_interface.py
    1
    +# Copyright (C) 2018 Codethink Limited
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +#
    
    15
    +# Authors:
    
    16
    +#        Finn Ball <finn ball codethink co uk>
    
    17
    +
    
    18
    +import copy
    
    19
    +import grpc
    
    20
    +import logging
    
    21
    +import mock
    
    22
    +import pytest
    
    23
    +import uuid
    
    24
    +
    
    25
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
    
    26
    +
    
    27
    +from buildgrid.bot import bot, bot_interface
    
    28
    +
    
    29
    +async def _work_dummy(context, lease):
    
    30
    +    return lease
    
    31
    +
    
    32
    +class ContextMock():
    
    33
    +    def __init__(self):
    
    34
    +        self.logger = logging.getLogger(__name__)
    
    35
    +
    
    36
    +# GRPC context
    
    37
    +@pytest.fixture
    
    38
    +def context():
    
    39
    +    yield ContextMock()
    
    40
    +
    
    41
    +# GRPC context
    
    42
    +@pytest.fixture
    
    43
    +def channel():
    
    44
    +    yield mock.MagicMock(spec = grpc.insecure_channel)
    
    45
    +
    
    46
    +@mock.patch.object(bot.bot_interface, 'bots_pb2', autospec = True)
    
    47
    +@mock.patch.object(bot.bot_interface, 'bots_pb2_grpc', autospec = True)
    
    48
    +def test_me(mock_pb2, mock_pb2_grpc, channel, context):
    
    49
    +    pass

  • tests/integration/bots_service.py
    ... ... @@ -36,6 +36,12 @@ from buildgrid.server.worker import bots_interface, bots_service
    36 36
     def context():
    
    37 37
         yield mock.MagicMock(spec = _Context)
    
    38 38
     
    
    39
    +@pytest.fixture
    
    40
    +def action_job():
    
    41
    +    action_digest = remote_execution_pb2.Digest()
    
    42
    +    j = job.Job(action_digest, None)
    
    43
    +    yield j
    
    44
    +
    
    39 45
     @pytest.fixture
    
    40 46
     def bot_session():
    
    41 47
         bot = bots_pb2.BotSession()
    
    ... ... @@ -101,7 +107,6 @@ def test_update_bot_session_zombie(bot_session, context, instance):
    101 107
     
    
    102 108
         response = instance.UpdateBotSession(request, context)
    
    103 109
     
    
    104
    -    assert isinstance(response, bots_pb2.BotSession)
    
    105 110
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    106 111
     
    
    107 112
     def test_update_bot_session_bot_id_fail(bot_session, context, instance):
    
    ... ... @@ -113,35 +118,33 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance):
    113 118
     
    
    114 119
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    115 120
     
    
    116
    -@pytest.mark.parametrize("number_of_leases", [1, 3, 500])
    
    117
    -def test_update_leases(number_of_leases, bot_session, context, instance):
    
    118
    -    leases = [bots_pb2.Lease() for x in range(number_of_leases)]
    
    119
    -    bot_session.leases.extend(leases)
    
    121
    +@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
    
    122
    +def test_number_of_leases(number_of_jobs, bot_session, context, instance):
    
    120 123
         request = bots_pb2.CreateBotSessionRequest(parent='',
    
    121 124
                                                    bot_session=bot_session)
    
    125
    +    # Inject work
    
    126
    +    for n in range(0, number_of_jobs):
    
    127
    +        action_digest = remote_execution_pb2.Digest()
    
    128
    +        instance._instance._scheduler.append_job(job.Job(action_digest))
    
    122 129
         # Simulated the severed binding between client and server
    
    123 130
         bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    124 131
     
    
    132
    +    # Creation of bot session should not create leases
    
    133
    +    assert len(bot.leases) == 0
    
    134
    +
    
    125 135
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    126 136
                                                    bot_session=bot)
    
    127 137
     
    
    128 138
         response = instance.UpdateBotSession(request, context)
    
    129 139
     
    
    130
    -    assert isinstance(response, bots_pb2.BotSession)
    
    131
    -    assert len(response.leases) == len(bot.leases)
    
    132
    -    assert bot == response
    
    140
    +    assert len(response.leases) == number_of_jobs
    
    133 141
     
    
    134 142
     def test_update_leases_with_work(bot_session, context, instance):
    
    135
    -    leases = [bots_pb2.Lease() for x in range(2)]
    
    136
    -    bot_session.leases.extend(leases)
    
    137
    -
    
    138
    -    # Inject some work to be done
    
    139
    -    action = remote_execution_pb2.Action()
    
    140
    -    action.command_digest.hash = 'rick'
    
    141
    -    instance._instance._scheduler.append_job(job.Job(action))
    
    142
    -
    
    143 143
         request = bots_pb2.CreateBotSessionRequest(parent='',
    
    144 144
                                                    bot_session=bot_session)
    
    145
    +    # Inject work
    
    146
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    147
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    145 148
         # Simulated the severed binding between client and server
    
    146 149
         bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    147 150
     
    
    ... ... @@ -149,26 +152,50 @@ def test_update_leases_with_work(bot_session, context, instance):
    149 152
                                                    bot_session=bot)
    
    150 153
     
    
    151 154
         response = instance.UpdateBotSession(request, context)
    
    152
    -    response_action = remote_execution_pb2.Action()
    
    153
    -    _unpack_any(response.leases[0].payload, response_action)
    
    154 155
     
    
    156
    +    response_action = remote_execution_pb2.Digest()
    
    157
    +    response.leases[0].payload.Unpack(response_action)
    
    155 158
         assert isinstance(response, bots_pb2.BotSession)
    
    156 159
         assert response.leases[0].state == LeaseState.PENDING.value
    
    157
    -    assert response.leases[1].state == LeaseState.LEASE_STATE_UNSPECIFIED.value
    
    158 160
         assert uuid.UUID(response.leases[0].id, version=4)
    
    159
    -    assert response_action == action
    
    161
    +    assert response_action == action_digest
    
    160 162
     
    
    161 163
     def test_update_leases_work_complete(bot_session, context, instance):
    
    162
    -    leases = [bots_pb2.Lease() for x in range(2)]
    
    163
    -    bot_session.leases.extend(leases)
    
    164
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    165
    +                                               bot_session=bot_session)
    
    166
    +    # Inject work
    
    167
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    168
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    169
    +    # Simulated the severed binding between client and server
    
    170
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    171
    +
    
    172
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    173
    +                                               bot_session=bot)
    
    174
    +
    
    175
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    176
    +
    
    177
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    178
    +
    
    179
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    180
    +                                               bot_session=response)
    
    181
    +
    
    182
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    183
    +
    
    184
    +    response.leases[0].state = LeaseState.COMPLETED.value
    
    185
    +
    
    186
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    187
    +                                               bot_session=response)
    
    164 188
     
    
    165
    -    # Inject some work to be done
    
    166
    -    action = remote_execution_pb2.Action()
    
    167
    -    action.command_digest.hash = 'rick'
    
    168
    -    instance._instance._scheduler.append_job(job.Job(action))
    
    189
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    190
    +
    
    191
    +    assert len(response.leases) == 0
    
    169 192
     
    
    193
    +def test_work_rejected_by_bot(bot_session, context, instance):
    
    170 194
         request = bots_pb2.CreateBotSessionRequest(parent='',
    
    171 195
                                                    bot_session=bot_session)
    
    196
    +    # Inject work
    
    197
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    198
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    172 199
         # Simulated the severed binding between client and server
    
    173 200
         bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    174 201
     
    
    ... ... @@ -177,26 +204,125 @@ def test_update_leases_work_complete(bot_session, context, instance):
    177 204
     
    
    178 205
         response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    179 206
     
    
    180
    -    operation_name = response.leases[0].id
    
    207
    +    response.leases[0].state = LeaseState.COMPLETED.value
    
    208
    +
    
    209
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    210
    +                                               bot_session=response)
    
    211
    +
    
    212
    +    response = instance.UpdateBotSession(request, context)
    
    213
    +
    
    214
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    215
    +
    
    216
    +def test_work_rejected_by_bot(bot_session, context, instance):
    
    217
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    218
    +                                               bot_session=bot_session)
    
    219
    +    # Inject work
    
    220
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    221
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    222
    +    # Simulated the severed binding between client and server
    
    223
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    224
    +
    
    225
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    226
    +                                               bot_session=bot)
    
    227
    +
    
    228
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    181 229
     
    
    182
    -    assert response.leases[0].state == LeaseState.PENDING.value
    
    183 230
         response.leases[0].state = LeaseState.COMPLETED.value
    
    184 231
     
    
    185 232
         request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    186 233
                                                    bot_session=response)
    
    234
    +
    
    235
    +    response = instance.UpdateBotSession(request, context)
    
    236
    +
    
    237
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    238
    +
    
    239
    +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
    
    240
    +def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
    
    241
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    242
    +                                               bot_session=bot_session)
    
    243
    +    # Inject work
    
    244
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    245
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    246
    +    # Simulated the severed binding between client and server
    
    247
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    248
    +
    
    249
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    250
    +                                               bot_session=bot)
    
    251
    +
    
    252
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    253
    +
    
    254
    +    response.leases[0].state = state.value
    
    255
    +
    
    256
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    257
    +                                               bot_session=response)
    
    258
    +
    
    259
    +    response = instance.UpdateBotSession(request, context)
    
    260
    +
    
    261
    +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
    
    262
    +
    
    263
    +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
    
    264
    +def test_work_out_of_sync_from_active(state, bot_session, context, instance):
    
    265
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    266
    +                                               bot_session=bot_session)
    
    267
    +    # Inject work
    
    268
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    269
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    187 270
         # Simulated the severed binding between client and server
    
    271
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    272
    +
    
    273
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    274
    +                                               bot_session=bot)
    
    275
    +
    
    188 276
         response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    189
    -    assert isinstance(response, bots_pb2.BotSession)
    
    190
    -    assert instance._instance._scheduler.jobs[operation_name].execute_stage == ExecuteStage.COMPLETED
    
    277
    +
    
    278
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    279
    +
    
    280
    +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    281
    +                                                             bot_session=response))
    
    282
    +
    
    283
    +    response = instance.UpdateBotSession(request, context)
    
    284
    +
    
    285
    +    response.leases[0].state = state.value
    
    286
    +
    
    287
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    288
    +                                               bot_session=response)
    
    289
    +
    
    290
    +    response = instance.UpdateBotSession(request, context)
    
    291
    +
    
    292
    +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
    
    293
    +
    
    294
    +def test_work_active_to_active(bot_session, context, instance):
    
    295
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    296
    +                                               bot_session=bot_session)
    
    297
    +    # Inject work
    
    298
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    299
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    300
    +    # Simulated the severed binding between client and server
    
    301
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    302
    +
    
    303
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    304
    +                                               bot_session=bot)
    
    305
    +
    
    306
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    307
    +
    
    308
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    309
    +
    
    310
    +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    311
    +                                                             bot_session=response))
    
    312
    +
    
    313
    +    response = instance.UpdateBotSession(request, context)
    
    314
    +
    
    315
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    316
    +
    
    317
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    318
    +                                               bot_session=response)
    
    319
    +
    
    320
    +    response = instance.UpdateBotSession(request, context)
    
    321
    +
    
    322
    +    assert response.leases[0].state == LeaseState.ACTIVE.value
    
    191 323
     
    
    192 324
     def test_post_bot_event_temp(context, instance):
    
    193 325
         request = bots_pb2.PostBotEventTempRequest()
    
    194 326
         instance.PostBotEventTemp(request, context)
    
    195 327
     
    
    196 328
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    197
    -
    
    198
    -def _unpack_any(unpack_from, to):
    
    199
    -    any = any_pb2.Any()
    
    200
    -    any.CopyFrom(unpack_from)
    
    201
    -    any.Unpack(to)
    
    202
    -    return to

  • tests/integration/execution_service.py
    ... ... @@ -67,19 +67,25 @@ def test_execute(skip_cache_lookup, instance, context):
    67 67
             assert metadata.stage == job.ExecuteStage.QUEUED.value
    
    68 68
             assert uuid.UUID(result.name, version=4)
    
    69 69
             assert result.done is False
    
    70
    -
    
    70
    +"""
    
    71 71
     def test_wait_execution(instance, context):
    
    72
    +    # TODO: Figure out why next(response) hangs on the .get()
    
    73
    +    # method when running in pytest.
    
    72 74
         action_digest = remote_execution_pb2.Digest()
    
    73 75
         action_digest.hash = 'zhora'
    
    74 76
     
    
    75
    -    execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
    
    76
    -                                                            action_digest = action_digest,
    
    77
    -                                                            skip_cache_lookup = True)
    
    78
    -    execution_response = next(instance.Execute(execution_request, context))
    
    77
    +    j = job.Job(action_digest, None)
    
    78
    +    j._operation.done = True
    
    79
    +
    
    80
    +    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    79 81
     
    
    82
    +    instance._instance._scheduler.jobs[j.name] = j
    
    80 83
     
    
    81
    -    request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
    
    84
    +    action_result_any = any_pb2.Any()
    
    85
    +    action_result = remote_execution_pb2.ActionResult()
    
    86
    +    action_result_any.Pack(action_result)
    
    82 87
     
    
    83
    -    response = next(instance.WaitExecution(request, context))
    
    88
    +    instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
    
    84 89
     
    
    85
    -    assert response == execution_response
    90
    +    response = instance.WaitExecution(request, context)
    
    91
    +"""



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]