[Notes] [Git][BuildGrid/buildgrid][finn/48-cancellation-leases] Working commit.



Title: GitLab

finn pushed to branch finn/48-cancellation-leases at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • buildgrid/_app/bots/dummy.py
    ... ... @@ -13,14 +13,15 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +import asyncio
    
    16 17
     import random
    
    17 18
     import time
    
    19
    +import sys
    
    18 20
     
    
    19 21
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    20 22
     from buildgrid.utils import get_hostname
    
    21 23
     
    
    22
    -
    
    23
    -def work_dummy(context, lease):
    
    24
    +def work_dummy(lease, context, event):
    
    24 25
         """ Just returns lease after some random time
    
    25 26
         """
    
    26 27
         action_result = remote_execution_pb2.ActionResult()
    
    ... ... @@ -46,4 +47,10 @@ def work_dummy(context, lease):
    46 47
     
    
    47 48
         lease.result.Pack(action_result)
    
    48 49
     
    
    50
    +    while True:
    
    51
    +        if event.is_set():
    
    52
    +            return lease
    
    53
    +        print("^^")
    
    54
    +        time.sleep(2)
    
    55
    +
    
    49 56
         return lease

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -28,8 +28,11 @@ from urllib.parse import urlparse
    28 28
     import click
    
    29 29
     import grpc
    
    30 30
     
    
    31
    -from buildgrid.bot import bot, bot_interface
    
    32
    -from buildgrid.bot.bot_session import BotSession, Device, Worker
    
    31
    +from buildgrid.bot import bot, interface, session
    
    32
    +from buildgrid.bot.hardware.interface import HardwareInterface
    
    33
    +from buildgrid.bot.hardware.device import Device
    
    34
    +from buildgrid.bot.hardware.worker import Worker
    
    35
    +
    
    33 36
     
    
    34 37
     from ..bots import buildbox, dummy, host
    
    35 38
     from ..cli import pass_context
    
    ... ... @@ -123,13 +126,14 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    123 126
         context.logger = logging.getLogger(__name__)
    
    124 127
         context.logger.debug("Starting for remote {}".format(context.remote))
    
    125 128
     
    
    126
    -    interface = bot_interface.BotInterface(context.channel)
    
    129
    +    bot_interface = interface.BotInterface(context.channel)
    
    127 130
     
    
    128 131
         worker = Worker()
    
    129 132
         worker.add_device(Device())
    
    130 133
     
    
    131
    -    bot_session = BotSession(parent, interface)
    
    132
    -    bot_session.add_worker(worker)
    
    134
    +    hardware_interface = HardwareInterface(worker)
    
    135
    +
    
    136
    +    bot_session = session.BotSession(parent, bot_interface, hardware_interface)
    
    133 137
     
    
    134 138
         context.bot_session = bot_session
    
    135 139
     
    
    ... ... @@ -142,8 +146,7 @@ def run_dummy(context):
    142 146
         """
    
    143 147
         try:
    
    144 148
             b = bot.Bot(context.bot_session, context.update_period)
    
    145
    -        b.session(dummy.work_dummy,
    
    146
    -                  context)
    
    149
    +        b.session(dummy.work_dummy, context)
    
    147 150
         except KeyboardInterrupt:
    
    148 151
             pass
    
    149 152
     
    

  • buildgrid/bot/session.py
    ... ... @@ -101,6 +101,7 @@ class BotSession:
    101 101
     
    
    102 102
             closed_lease_ids = [x for x in self._tenant_manager.get_lease_ids() if x not in server_ids]
    
    103 103
             for lease_id in closed_lease_ids:
    
    104
    +            self._tenant_manager.cancel_tenancy(lease_id)
    
    104 105
                 self._tenant_manager.remove_tenant(lease_id)
    
    105 106
     
    
    106 107
         def get_pb2(self):
    

  • buildgrid/bot/tenant.py
    ... ... @@ -21,12 +21,16 @@ Handles leased and runs leased work.
    21 21
     
    
    22 22
     import asyncio
    
    23 23
     import logging
    
    24
    +import threading
    
    25
    +
    
    26
    +import grpc
    
    24 27
     
    
    25 28
     from functools import partial
    
    26 29
     
    
    27 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    28
    -
    
    31
    +from buildgrid._protos.google.rpc import code_pb2
    
    29 32
     from buildgrid._enums import LeaseState
    
    33
    +from buildgrid._exceptions import BotError
    
    30 34
     
    
    31 35
     
    
    32 36
     class Tenant:
    
    ... ... @@ -37,14 +41,28 @@ class Tenant:
    37 41
                 raise ValueError("Lease state not `PENDING`: {}".format(lease.state))
    
    38 42
     
    
    39 43
             self.logger = logging.getLogger(__name__)
    
    40
    -        self.lease_finished = False
    
    41 44
     
    
    42 45
             self._lease = lease
    
    43 46
     
    
    47
    +        self.__lease_cancelled = False
    
    48
    +        self.__tenant_completed = False
    
    49
    +
    
    44 50
         @property
    
    45 51
         def lease(self):
    
    46 52
             return self._lease
    
    47 53
     
    
    54
    +    @property
    
    55
    +    def tenant_completed(self):
    
    56
    +        return self.__tenant_completed
    
    57
    +
    
    58
    +    @property
    
    59
    +    def lease_cancelled(self):
    
    60
    +        return self.__lease_cancelled
    
    61
    +
    
    62
    +    def cancel_lease(self):
    
    63
    +        self.__lease_cancelled = True
    
    64
    +        self.update_lease_state(LeaseState.CANCELLED)
    
    65
    +
    
    48 66
         def get_lease_state(self):
    
    49 67
             return LeaseState(self._lease.state)
    
    50 68
     
    
    ... ... @@ -64,11 +82,15 @@ class Tenant:
    64 82
             loop = asyncio.get_event_loop()
    
    65 83
     
    
    66 84
             try:
    
    67
    -            lease = await loop.run_in_executor(executor, partial(work, context, self._lease))
    
    85
    +            event = threading.Event()
    
    86
    +            lease = await loop.run_in_executor(executor, partial(work, self._lease, context, event))
    
    68 87
                 self._lease.CopyFrom(lease)
    
    69 88
     
    
    70
    -        except asyncio.CancelledError as e:
    
    71
    -            self.logger.error("Task cancelled: [{}]".format(e))
    
    89
    +        except asyncio.CancelledError:
    
    90
    +            self.logger.error("Lease cancelled: [{}]".format(self._lease.id))
    
    91
    +            event.set()
    
    92
    +            # Propagate error to task wrapper
    
    93
    +            raise
    
    72 94
     
    
    73 95
             except grpc.RpcError as e:
    
    74 96
                 self.logger.error("RPC error thrown: [{}]".format(e))
    
    ... ... @@ -82,4 +104,5 @@ class Tenant:
    82 104
                 self.logger.error("Exception thrown: [{}]".format(e))
    
    83 105
                 lease.status.code = code_pb2.INTERNAL
    
    84 106
     
    
    107
    +        self.__tenant_completed = True
    
    85 108
             self.logger.debug("Work completed: [{}]".format(lease.id))

  • buildgrid/bot/tenantmanager.py
    ... ... @@ -50,14 +50,20 @@ class TenantManager:
    50 50
     
    
    51 51
         def remove_tenant(self, lease_id):
    
    52 52
             state = self.get_lease_state(lease_id)
    
    53
    -        if state == LeaseState.PENDING or state == LeaseState.ACTIVE:
    
    54
    -            self.logger.error("Attempting to remove a lease not finished."
    
    55
    -                              "Bot will not remove lease."
    
    56
    -                              "Lease: [{}]".format(self._tenants[lease_id].lease))
    
    57 53
     
    
    58
    -        else:
    
    59
    -            self._tenants.pop(lease_id)
    
    60
    -            self._tasks.pop(lease_id)
    
    54
    +        if not self._tenants[lease_id].lease_cancelled:
    
    55
    +            self.logger.error("Attempting to remove a lease not cancelled."
    
    56
    +                              "Bot will attempt to cancel lease."
    
    57
    +                              "Lease id=[{}]".format(lease_id))
    
    58
    +            self.cancel_tenancy(lease_id)
    
    59
    +
    
    60
    +        elif not self._tenants[lease_id].tenant_completed:
    
    61
    +            self.logger.debug("Lease cancelled but tenant not completed."
    
    62
    +                              "Lease=[{}]".format(self._tenants[lease_id].lease))
    
    63
    +
    
    64
    +        self.logger.debug("Removing tenant=[{}]".format(lease_id))
    
    65
    +        self._tenants.pop(lease_id)
    
    66
    +        self._tasks.pop(lease_id)
    
    61 67
     
    
    62 68
         def get_leases(self):
    
    63 69
             leases = []
    
    ... ... @@ -79,8 +85,9 @@ class TenantManager:
    79 85
             if status is not None:
    
    80 86
                 self._update_lease_status(lease_id, status)
    
    81 87
     
    
    82
    -        if self._tenants[lease_id].get_lease_state() != LeaseState.CANCELLED:
    
    83
    -            self._update_lease_state(lease_id, LeaseState.COMPLETED)
    
    88
    +        if task:
    
    89
    +            if not task.cancelled():
    
    90
    +                self._update_lease_state(lease_id, LeaseState.COMPLETED)
    
    84 91
     
    
    85 92
         def create_work(self, lease_id, work, context):
    
    86 93
             self._update_lease_state(lease_id, LeaseState.ACTIVE)
    
    ... ... @@ -92,8 +99,12 @@ class TenantManager:
    92 99
             self._tasks[lease_id] = task
    
    93 100
     
    
    94 101
         def cancel_tenancy(self, lease_id):
    
    95
    -        self._update_lease_state(LeaseState.CANCELLED)
    
    96
    -        self._tasks[lease_id].cancel()
    
    102
    +        if not self._tenants[lease_id].lease_cancelled:
    
    103
    +            self._tenants[lease_id].cancel_lease()
    
    104
    +            self._tasks[lease_id].cancel()
    
    105
    +
    
    106
    +    def tenant_completed(self, lease_id):
    
    107
    +        return self._tenants[lease_id].tenant_completed
    
    97 108
     
    
    98 109
         def _update_lease_state(self, lease_id, state):
    
    99 110
             self._tenants[lease_id].update_lease_state(state)
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -34,7 +34,7 @@ class BotsInterface:
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
     
    
    36 36
             self._bot_ids = {}
    
    37
    -        self._bot_sessions = {}
    
    37
    +        self._assigned_leases = {}
    
    38 38
             self._scheduler = scheduler
    
    39 39
     
    
    40 40
         def register_instance_with_server(self, instance_name, server):
    
    ... ... @@ -59,18 +59,15 @@ class BotsInterface:
    59 59
     
    
    60 60
             # Bot session name, selected by the server
    
    61 61
             name = "{}/{}".format(parent, str(uuid.uuid4()))
    
    62
    -
    
    63 62
             bot_session.name = name
    
    64 63
     
    
    65 64
             self._bot_ids[name] = bot_id
    
    66
    -        self._bot_sessions[name] = bot_session
    
    67 65
             self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
    
    68 66
     
    
    69
    -        # TODO: Send worker capabilities to the scheduler!
    
    70
    -        leases = self._scheduler.request_job_leases({})
    
    71
    -        if leases:
    
    72
    -            bot_session.leases.extend(leases)
    
    67
    +        # We want to keep a copy of lease ids we have assigned
    
    68
    +        self._assigned_leases[name] = set()
    
    73 69
     
    
    70
    +        self._request_leases(bot_session)
    
    74 71
             return bot_session
    
    75 72
     
    
    76 73
         def update_bot_session(self, name, bot_session):
    
    ... ... @@ -79,39 +76,53 @@ class BotsInterface:
    79 76
             """
    
    80 77
             self.logger.debug("Updating bot session name={}".format(name))
    
    81 78
             self._check_bot_ids(bot_session.bot_id, name)
    
    82
    -
    
    83
    -        leases = filter(None, [self._check_lease_state(lease) for lease in bot_session.leases])
    
    79
    +        self._check_assigned_leases(bot_session)
    
    84 80
     
    
    85 81
             for lease in bot_session.leases:
    
    86
    -            lease.Clear()
    
    87
    -
    
    88
    -        bot_session.leases.extend(leases)
    
    82
    +            checked_lease = self._check_lease_state(lease)
    
    83
    +            if not checked_lease:
    
    84
    +                # TODO: Make sure we don't need this
    
    85
    +                try:
    
    86
    +                    self._assigned_leases[name].remove(lease.id)
    
    87
    +                except KeyError:
    
    88
    +                    pass
    
    89
    +                lease.Clear()
    
    90
    +
    
    91
    +        self._request_leases(bot_session)
    
    92
    +        return bot_session
    
    89 93
     
    
    94
    +    def _request_leases(self, bot_session):
    
    90 95
             # TODO: Send worker capabilities to the scheduler!
    
    96
    +        # Only send one lease at a time currently.
    
    91 97
             if not bot_session.leases:
    
    92 98
                 leases = self._scheduler.request_job_leases({})
    
    93 99
                 if leases:
    
    100
    +                for lease in leases:
    
    101
    +                    self._assigned_leases[bot_session.name].add(lease.id)
    
    94 102
                     bot_session.leases.extend(leases)
    
    95 103
     
    
    96
    -        self._bot_sessions[name] = bot_session
    
    97
    -        return bot_session
    
    98
    -
    
    99 104
         def _check_lease_state(self, lease):
    
    105
    +        # careful here
    
    106
    +        # should store bot name in scheduler
    
    107
    +        lease_state = LeaseState(lease.state)
    
    108
    +
    
    109
    +        # Lease has replied with cancelled, remove
    
    110
    +        if lease_state == LeaseState.CANCELLED:
    
    111
    +            return None
    
    100 112
     
    
    101
    -        # Check for cancelled lease
    
    102
    -        if self._scheduler.get_lease_cancelled(lease.id):
    
    113
    +        try:
    
    114
    +            if self._scheduler.get_job_lease_cancelled(lease.id):
    
    115
    +                lease.state.CopyFrom(LeaseState.CANCELLED.value)
    
    116
    +                return lease
    
    117
    +        except KeyError:
    
    118
    +            # Job does not exist, remove from bot.
    
    103 119
                 return None
    
    104 120
     
    
    105
    -        # If not cancelled, update the status
    
    106 121
             self._scheduler.update_job_lease(lease)
    
    107 122
     
    
    108
    -        lease_state = LeaseState(lease.state)
    
    109 123
             if lease_state == LeaseState.COMPLETED:
    
    110 124
                 return None
    
    111 125
     
    
    112
    -        elif lease_state == LeaseState.CANCELLED:
    
    113
    -            return None
    
    114
    -
    
    115 126
             return lease
    
    116 127
     
    
    117 128
         def _check_bot_ids(self, bot_id, name=None):
    
    ... ... @@ -134,6 +145,19 @@ class BotsInterface:
    134 145
                             'Bot id already registered. ID sent: [{}].'
    
    135 146
                             'Id registered: [{}] with name: [{}]'.format(bot_id, _bot_id, _name))
    
    136 147
     
    
    148
    +    def _check_assigned_leases(self, bot_session):
    
    149
    +        session_lease_ids = []
    
    150
    +
    
    151
    +        for lease in bot_session.leases:
    
    152
    +            session_lease_ids.append(lease.id)
    
    153
    +
    
    154
    +        for lease_id in self._assigned_leases[bot_session.name]:
    
    155
    +            if lease_id not in session_lease_ids:
    
    156
    +                self.logger.error("Assigned lease id=[{}],"
    
    157
    +                                  " not found on bot with name=[{}] and id=[{}]."
    
    158
    +                                  " Retrying job".format(lease_id, bot_session.name, bot_session.bot_id))
    
    159
    +                self._scheduler.retry_job(lease_id)
    
    160
    +
    
    137 161
         def _close_bot_session(self, name):
    
    138 162
             """ Before removing the session, close any leases and
    
    139 163
             requeue with high priority.
    
    ... ... @@ -144,10 +168,9 @@ class BotsInterface:
    144 168
                 raise InvalidArgumentError("Bot id does not exist: [{}]".format(name))
    
    145 169
     
    
    146 170
             self.logger.debug("Attempting to close [{}] with name: [{}]".format(bot_id, name))
    
    147
    -        for lease in self._bot_sessions[name].leases:
    
    148
    -            if lease.state != LeaseState.COMPLETED.value:
    
    149
    -                # TODO: Be wary here, may need to handle rejected leases in future
    
    150
    -                self._scheduler.retry_job(lease.id)
    
    171
    +        for lease_id in self._assigned_leases[name]:
    
    172
    +            self._scheduler.retry_job(lease_id)
    
    173
    +        self._assigned_leases.pop(name)
    
    151 174
     
    
    152 175
             self.logger.debug("Closing bot session: [{}]".format(name))
    
    153 176
             self._bot_ids.pop(name)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]