[Notes] [Git][BuildGrid/buildgrid][725-job-cancellation-on-remote-builds] 3 commits: setup.py: Pin moto dependency and require < 1.3.7



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildGrid / buildgrid

Commits: (the list of the 3 pushed commits was not preserved in this archived copy)

10 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -52,7 +52,8 @@ from ..cli import pass_context
    52 52
                   help="Public CAS client certificate for TLS (PEM-encoded)")
    
    53 53
     @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    54 54
                   help="Public CAS server certificate for TLS (PEM-encoded)")
    
    55
    -@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
    
    55
    +# TODO change default to 30
    
    56
    +@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
    
    56 57
                   help="Time period for bot updates to the server in seconds.")
    
    57 58
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    
    58 59
                   help="Targeted farm resource.")
    
    ... ... @@ -64,7 +65,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    64 65
     
    
    65 66
         context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
    
    66 67
         context.remote_url = remote
    
    67
    -    context.update_period = update_period
    
    68 68
         context.parent = parent
    
    69 69
     
    
    70 70
         if url.scheme == 'http':
    
    ... ... @@ -123,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    123 123
         context.logger = logging.getLogger(__name__)
    
    124 124
         context.logger.debug("Starting for remote {}".format(context.remote))
    
    125 125
     
    
    126
    -    interface = bot_interface.BotInterface(context.channel)
    
    126
    +    interface = bot_interface.BotInterface(context.channel, update_period)
    
    127 127
     
    
    128 128
         worker = Worker()
    
    129 129
         worker.add_device(Device())
    
    ... ... @@ -141,7 +141,7 @@ def run_dummy(context):
    141 141
         Creates a session, accepts leases, does fake work and updates the server.
    
    142 142
         """
    
    143 143
         try:
    
    144
    -        b = bot.Bot(context.bot_session, context.update_period)
    
    144
    +        b = bot.Bot(context.bot_session)
    
    145 145
             b.session(dummy.work_dummy,
    
    146 146
                       context)
    
    147 147
         except KeyboardInterrupt:
    
    ... ... @@ -156,7 +156,7 @@ def run_host_tools(context):
    156 156
         result back to CAS.
    
    157 157
         """
    
    158 158
         try:
    
    159
    -        b = bot.Bot(context.bot_session, context.update_period)
    
    159
    +        b = bot.Bot(context.bot_session)
    
    160 160
             b.session(host.work_host_tools,
    
    161 161
                       context)
    
    162 162
         except KeyboardInterrupt:
    
    ... ... @@ -177,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir):
    177 177
         context.fuse_dir = fuse_dir
    
    178 178
     
    
    179 179
         try:
    
    180
    -        b = bot.Bot(context.bot_session, context.update_period)
    
    180
    +        b = bot.Bot(context.bot_session)
    
    181 181
             b.session(buildbox.work_buildbox,
    
    182 182
                       context)
    
    183 183
         except KeyboardInterrupt:
    

  • buildgrid/bot/bot.py
    ... ... @@ -29,19 +29,16 @@ class Bot:
    29 29
         Creates a local BotSession.
    
    30 30
         """
    
    31 31
     
    
    32
    -    def __init__(self, bot_session, update_period=1):
    
    32
    +    def __init__(self, bot_session):
    
    33 33
             self.logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._bot_session = bot_session
    
    36
    -        self._update_period = update_period
    
    37 36
     
    
    38 37
         def session(self, work, context):
    
    39 38
             loop = asyncio.get_event_loop()
    
    40 39
     
    
    41
    -        self._bot_session.create_bot_session(work, context)
    
    42
    -
    
    43 40
             try:
    
    44
    -            task = asyncio.ensure_future(self._update_bot_session())
    
    41
    +            task = asyncio.ensure_future(self._bot_session.run(work, context))
    
    45 42
                 loop.run_forever()
    
    46 43
             except KeyboardInterrupt:
    
    47 44
                 pass
    
    ... ... @@ -49,10 +46,19 @@ class Bot:
    49 46
                 task.cancel()
    
    50 47
                 loop.close()
    
    51 48
     
    
    52
    -    async def _update_bot_session(self):
    
    49
    +    async def _run_bot_session(self, work, context):
    
    53 50
             """
    
    54 51
             Calls the server periodically to inform the server the client has not died.
    
    55 52
             """
    
    56 53
             while True:
    
    57
    -            self._bot_session.update_bot_session()
    
    58
    -            await asyncio.sleep(self._update_period)
    54
    +            if self._bot_session.connected is False:
    
    55
    +                self._bot_session.create_bot_session(work, context)
    
    56
    +            else:
    
    57
    +                self._bot_session.update_bot_session()
    
    58
    +
    
    59
    +            if self._bot_session._futures:
    
    60
    +                await asyncio.wait(self._bot_session._futures.values(),
    
    61
    +                                   timeout=30,
    
    62
    +                                   return_when=asyncio.FIRST_COMPLETED)
    
    63
    +            elif self._bot_session.connected is False:
    
    64
    +                await asyncio.sleep(30)

  • buildgrid/bot/bot_interface.py
    ... ... @@ -21,8 +21,10 @@ Interface to grpc
    21 21
     """
    
    22 22
     
    
    23 23
     import logging
    
    24
    +import grpc
    
    24 25
     
    
    25 26
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
    
    27
    +from ..settings import INTERVAL_BUFFER
    
    26 28
     
    
    27 29
     
    
    28 30
     class BotInterface:
    
    ... ... @@ -30,18 +32,27 @@ class BotInterface:
    30 32
         Interface handles calls to the server.
    
    31 33
         """
    
    32 34
     
    
    33
    -    def __init__(self, channel):
    
    35
    +    def __init__(self, channel, interval):
    
    34 36
             self.logger = logging.getLogger(__name__)
    
    35 37
             self.logger.info(channel)
    
    36 38
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    39
    +        self.interval = interval
    
    37 40
     
    
    38 41
         def create_bot_session(self, parent, bot_session):
    
    39 42
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    40 43
                                                        bot_session=bot_session)
    
    41
    -        return self._stub.CreateBotSession(request)
    
    44
    +        return self._bot_call(self._stub.CreateBotSession, request)
    
    42 45
     
    
    43 46
         def update_bot_session(self, bot_session, update_mask=None):
    
    44 47
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    45 48
                                                        bot_session=bot_session,
    
    46 49
                                                        update_mask=update_mask)
    
    47
    -        return self._stub.UpdateBotSession(request)
    50
    +        return self._bot_call(self._stub.UpdateBotSession, request)
    
    51
    +
    
    52
    +    def _bot_call(self, call, request):
    
    53
    +        try:
    
    54
    +            return call(request, timeout=self.interval + INTERVAL_BUFFER)
    
    55
    +        except grpc.RpcError as e:
    
    56
    +            if e.code() in grpc.StatusCode:
    
    57
    +                self.logger.warning("Server responded with error: {}".format(e.code()))
    
    58
    +                return None

  • buildgrid/bot/bot_session.py
    ... ... @@ -49,7 +49,9 @@ class BotSession:
    49 49
             self._bot_id = '{}.{}'.format(parent, platform.node())
    
    50 50
             self._context = None
    
    51 51
             self._interface = interface
    
    52
    +        self.connected = False
    
    52 53
             self._leases = {}
    
    54
    +        self._futures = {}
    
    53 55
             self._name = None
    
    54 56
             self._parent = parent
    
    55 57
             self._status = BotStatus.OK.value
    
    ... ... @@ -63,12 +65,31 @@ class BotSession:
    63 65
         def add_worker(self, worker):
    
    64 66
             self._worker = worker
    
    65 67
     
    
    68
    +    async def run(self, work, context=None):
    
    69
    +        self.logger.info("Starting bot session runner")
    
    70
    +        while True:
    
    71
    +            if self.connected is False:
    
    72
    +                self.create_bot_session(work, context)
    
    73
    +            else:
    
    74
    +                self.update_bot_session()
    
    75
    +
    
    76
    +            if self._futures:
    
    77
    +                await asyncio.wait(self._futures.values(),
    
    78
    +                                   timeout=self._interface.interval,
    
    79
    +                                   return_when=asyncio.FIRST_COMPLETED)
    
    80
    +            elif self.connected is False:
    
    81
    +                await asyncio.sleep(self._interface.interval)
    
    82
    +
    
    66 83
         def create_bot_session(self, work, context=None):
    
    67 84
             self.logger.debug("Creating bot session")
    
    68 85
             self._work = work
    
    69 86
             self._context = context
    
    70 87
     
    
    71 88
             session = self._interface.create_bot_session(self._parent, self.get_pb2())
    
    89
    +        if session is None:
    
    90
    +            self.connected = False
    
    91
    +            return
    
    92
    +        self.connected = True
    
    72 93
             self._name = session.name
    
    73 94
     
    
    74 95
             self.logger.info("Created bot session with name: [{}]".format(self._name))
    
    ... ... @@ -79,6 +100,10 @@ class BotSession:
    79 100
         def update_bot_session(self):
    
    80 101
             self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
    
    81 102
             session = self._interface.update_bot_session(self.get_pb2())
    
    103
    +        if session is None:
    
    104
    +            self.connected = False
    
    105
    +            return
    
    106
    +        self.connected = True
    
    82 107
             for k, v in list(self._leases.items()):
    
    83 108
                 if v.state == LeaseState.COMPLETED.value:
    
    84 109
                     del self._leases[k]
    
    ... ... @@ -110,7 +135,7 @@ class BotSession:
    110 135
                 lease.state = LeaseState.ACTIVE.value
    
    111 136
                 self._leases[lease.id] = lease
    
    112 137
                 self.update_bot_session()
    
    113
    -            asyncio.ensure_future(self.create_work(lease))
    
    138
    +            self._futures[lease.id] = asyncio.ensure_future(self.create_work(lease))
    
    114 139
     
    
    115 140
         async def create_work(self, lease):
    
    116 141
             self.logger.debug("Work created: [{}]".format(lease.id))
    
    ... ... @@ -133,6 +158,7 @@ class BotSession:
    133 158
     
    
    134 159
             self.logger.debug("Work complete: [{}]".format(lease.id))
    
    135 160
             self.lease_completed(lease)
    
    161
    +        del self._futures[lease.id]
    
    136 162
     
    
    137 163
     
    
    138 164
     class Worker:
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -26,6 +26,7 @@ import uuid
    26 26
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    27 27
     
    
    28 28
     from ..job import LeaseState
    
    29
    +from ...settings import INTERVAL_BUFFER
    
    29 30
     
    
    30 31
     
    
    31 32
     class BotsInterface:
    
    ... ... @@ -73,7 +74,7 @@ class BotsInterface:
    73 74
     
    
    74 75
             return bot_session
    
    75 76
     
    
    76
    -    def update_bot_session(self, name, bot_session):
    
    77
    +    def update_bot_session(self, name, bot_session, deadline=None):
    
    77 78
             """ Client updates the server. Any changes in state to the Lease should be
    
    78 79
             registered server side. Assigns available leases with work.
    
    79 80
             """
    
    ... ... @@ -87,7 +88,9 @@ class BotsInterface:
    87 88
     
    
    88 89
             # TODO: Send worker capabilities to the scheduler!
    
    89 90
             if not bot_session.leases:
    
    90
    -            leases = self._scheduler.request_job_leases({})
    
    91
    +            leases = self._scheduler.request_job_leases(
    
    92
    +                {}, block=True if deadline else None,
    
    93
    +                timeout=deadline - INTERVAL_BUFFER if deadline else None)
    
    91 94
                 if leases:
    
    92 95
                     bot_session.leases.extend(leases)
    
    93 96
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -64,8 +64,10 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    64 64
                 instance_name = ''.join(names[0:-1])
    
    65 65
     
    
    66 66
                 instance = self._get_instance(instance_name)
    
    67
    +            # server side context time_remaining is maxint - unix time
    
    67 68
                 return instance.update_bot_session(request.name,
    
    68
    -                                               request.bot_session)
    
    69
    +                                               request.bot_session,
    
    70
    +                                               context.time_remaining())
    
    69 71
     
    
    70 72
             except InvalidArgumentError as e:
    
    71 73
                 self.logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -19,7 +19,7 @@ Scheduler
    19 19
     Schedules jobs.
    
    20 20
     """
    
    21 21
     
    
    22
    -from collections import deque
    
    22
    +from queue import Queue, Empty
    
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    ... ... @@ -33,7 +33,7 @@ class Scheduler:
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35 35
             self.jobs = {}
    
    36
    -        self.queue = deque()
    
    36
    +        self.queue = Queue()
    
    37 37
     
    
    38 38
         def register_client(self, job_name, queue):
    
    39 39
             self.jobs[job_name].register_client(queue)
    
    ... ... @@ -53,7 +53,7 @@ class Scheduler:
    53 53
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    54 54
                 except NotFoundError:
    
    55 55
                     operation_stage = OperationStage.QUEUED
    
    56
    -                self.queue.append(job)
    
    56
    +                self.queue.put(job)
    
    57 57
     
    
    58 58
                 else:
    
    59 59
                     job.set_cached_result(action_result)
    
    ... ... @@ -61,7 +61,7 @@ class Scheduler:
    61 61
     
    
    62 62
             else:
    
    63 63
                 operation_stage = OperationStage.QUEUED
    
    64
    -            self.queue.append(job)
    
    64
    +            self.queue.put(job)
    
    65 65
     
    
    66 66
             job.update_operation_stage(operation_stage)
    
    67 67
     
    
    ... ... @@ -74,12 +74,12 @@ class Scheduler:
    74 74
                     # TODO: Mark these jobs as done
    
    75 75
                 else:
    
    76 76
                     job.update_operation_stage(OperationStage.QUEUED)
    
    77
    -                self.queue.appendleft(job)
    
    77
    +                self.queue.queue.appendleft(job)
    
    78 78
     
    
    79 79
         def list_jobs(self):
    
    80 80
             return self.jobs.values()
    
    81 81
     
    
    82
    -    def request_job_leases(self, worker_capabilities):
    
    82
    +    def request_job_leases(self, worker_capabilities, block=False, timeout=None):
    
    83 83
             """Generates a list of the highest priority leases to be run.
    
    84 84
     
    
    85 85
             Args:
    
    ... ... @@ -87,10 +87,13 @@ class Scheduler:
    87 87
                     worker properties, configuration and state at the time of the
    
    88 88
                     request.
    
    89 89
             """
    
    90
    -        if not self.queue:
    
    90
    +        if not block and self.queue.empty():
    
    91 91
                 return []
    
    92 92
     
    
    93
    -        job = self.queue.popleft()
    
    93
    +        try:
    
    94
    +            job = self.queue.get(block, timeout)
    
    95
    +        except Empty:
    
    96
    +            return []
    
    94 97
             # For now, one lease at a time:
    
    95 98
             lease = job.create_lease()
    
    96 99
     
    

  • buildgrid/settings.py
    ... ... @@ -4,3 +4,6 @@ import hashlib
    4 4
     # The hash function that CAS uses
    
    5 5
     HASH = hashlib.sha256
    
    6 6
     HASH_LENGTH = HASH().digest_size * 2
    
    7
    +
    
    8
    +# time in seconds to pad timeouts
    
    9
    +INTERVAL_BUFFER = 5

  • setup.py
    ... ... @@ -87,7 +87,7 @@ def get_cmdclass():
    87 87
     
    
    88 88
     tests_require = [
    
    89 89
         'coverage >= 4.5.0',
    
    90
    -    'moto',
    
    90
    +    'moto < 1.3.7',
    
    91 91
         'pep8',
    
    92 92
         'psutil',
    
    93 93
         'pytest >= 3.8.0',
    

  • tests/integration/bots_service.py
    ... ... @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server)
    39 39
     # GRPC context
    
    40 40
     @pytest.fixture
    
    41 41
     def context():
    
    42
    -    yield mock.MagicMock(spec=_Context)
    
    42
    +    context_mock = mock.MagicMock(spec=_Context)
    
    43
    +    context_mock.time_remaining.return_value = None
    
    44
    +    yield context_mock
    
    43 45
     
    
    44 46
     
    
    45 47
     @pytest.fixture
    
    ... ... @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance):
    91 93
     
    
    92 94
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    93 95
                                                    bot_session=bot)
    
    94
    -
    
    95 96
         response = instance.UpdateBotSession(request, context)
    
    96 97
     
    
    97 98
         assert isinstance(response, bots_pb2.BotSession)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]