[Notes] [Git][BuildGrid/buildgrid][raoul/126-bot-reconnects] 6 commits: job.py: Expose its ActionResult origin



Title: GitLab

Martin Blanchard pushed to branch raoul/126-bot-reconnects at BuildGrid / buildgrid

Commits:

7 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -141,13 +141,10 @@ def run_dummy(context):
    141 141
         """
    
    142 142
         Creates a session, accepts leases, does fake work and updates the server.
    
    143 143
         """
    
    144
    -    try:
    
    145
    -        bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    146
    -                                         dummy.work_dummy, context)
    
    147
    -        b = bot.Bot(bot_session, context.update_period)
    
    148
    -        b.session()
    
    149
    -    except KeyboardInterrupt:
    
    150
    -        pass
    
    144
    +    bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    145
    +                                     dummy.work_dummy, context, context.update_period)
    
    146
    +    b = bot.Bot(bot_session)
    
    147
    +    b.session()
    
    151 148
     
    
    152 149
     
    
    153 150
     @cli.command('host-tools', short_help="Runs commands using the host's tools.")
    
    ... ... @@ -157,13 +154,10 @@ def run_host_tools(context):
    157 154
         Downloads inputs from CAS, runs build commands using host-tools and uploads
    
    158 155
         result back to CAS.
    
    159 156
         """
    
    160
    -    try:
    
    161
    -        bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    162
    -                                         host.work_host_tools, context)
    
    163
    -        b = bot.Bot(bot_session, context.update_period)
    
    164
    -        b.session()
    
    165
    -    except KeyboardInterrupt:
    
    166
    -        pass
    
    157
    +    bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    158
    +                                     host.work_host_tools, context, context.update_period)
    
    159
    +    b = bot.Bot(bot_session)
    
    160
    +    b.session()
    
    167 161
     
    
    168 162
     
    
    169 163
     @cli.command('buildbox', short_help="Run commands using the BuildBox tool.")
    
    ... ... @@ -179,10 +173,7 @@ def run_buildbox(context, local_cas, fuse_dir):
    179 173
         context.local_cas = local_cas
    
    180 174
         context.fuse_dir = fuse_dir
    
    181 175
     
    
    182
    -    try:
    
    183
    -        bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    184
    -                                         buildbox.work_buildbox, context)
    
    185
    -        b = bot.Bot(bot_session, context.update_period)
    
    186
    -        b.session()
    
    187
    -    except KeyboardInterrupt:
    
    188
    -        pass
    
    176
    +    bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
    
    177
    +                                     buildbox.work_buildbox, context, context.update_period)
    
    178
    +    b = bot.Bot(bot_session)
    
    179
    +    b.session()

  • buildgrid/bot/bot.py
    ... ... @@ -20,13 +20,10 @@ import logging
    20 20
     class Bot:
    
    21 21
         """Creates a local BotSession."""
    
    22 22
     
    
    23
    -    def __init__(self, bot_session, update_period=1):
    
    24
    -        """
    
    25
    -        """
    
    23
    +    def __init__(self, bot_session):
    
    26 24
             self.__logger = logging.getLogger(__name__)
    
    27 25
     
    
    28 26
             self.__bot_session = bot_session
    
    29
    -        self.__update_period = update_period
    
    30 27
     
    
    31 28
             self.__loop = None
    
    32 29
     
    
    ... ... @@ -34,28 +31,16 @@ class Bot:
    34 31
             """Will create a session and periodically call the server."""
    
    35 32
     
    
    36 33
             self.__loop = asyncio.get_event_loop()
    
    37
    -        self.__bot_session.create_bot_session()
    
    38 34
     
    
    39 35
             try:
    
    40
    -            task = asyncio.ensure_future(self.__update_bot_session())
    
    36
    +            task = asyncio.ensure_future(self.__bot_session.run())
    
    41 37
                 self.__loop.run_until_complete(task)
    
    42
    -
    
    43 38
             except KeyboardInterrupt:
    
    44 39
                 pass
    
    45 40
     
    
    46 41
             self.__kill_everyone()
    
    47 42
             self.__logger.info("Bot shutdown.")
    
    48 43
     
    
    49
    -    async def __update_bot_session(self):
    
    50
    -        """Calls the server periodically to inform the server the client has not died."""
    
    51
    -        try:
    
    52
    -            while True:
    
    53
    -                self.__bot_session.update_bot_session()
    
    54
    -                await asyncio.sleep(self.__update_period)
    
    55
    -
    
    56
    -        except asyncio.CancelledError:
    
    57
    -            pass
    
    58
    -
    
    59 44
         def __kill_everyone(self):
    
    60 45
             """Cancels and waits for them to stop."""
    
    61 46
             self.__logger.info("Cancelling remaining tasks...")
    

  • buildgrid/bot/interface.py
    ... ... @@ -37,22 +37,25 @@ class BotInterface:
    37 37
             self._stub = bots_pb2_grpc.BotsStub(channel)
    
    38 38
     
    
    39 39
         def create_bot_session(self, parent, bot_session):
    
    40
    +        """ Create bot session request
    
    41
    +        Returns BotSession if correct else a grpc StatusCode
    
    42
    +        """
    
    40 43
             request = bots_pb2.CreateBotSessionRequest(parent=parent,
    
    41 44
                                                        bot_session=bot_session)
    
    42
    -        try:
    
    43
    -            return self._stub.CreateBotSession(request)
    
    44
    -
    
    45
    -        except grpc.RpcError as e:
    
    46
    -            self.__logger.error(e)
    
    47
    -            raise
    
    45
    +        return self._bot_call(self._stub.CreateBotSession, request)
    
    48 46
     
    
    49 47
         def update_bot_session(self, bot_session, update_mask=None):
    
    48
    +        """ Update bot session request
    
    49
    +        Returns BotSession if correct else a grpc StatusCode
    
    50
    +        """
    
    50 51
             request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
    
    51 52
                                                        bot_session=bot_session,
    
    52 53
                                                        update_mask=update_mask)
    
    53
    -        try:
    
    54
    -            return self._stub.UpdateBotSession(request)
    
    54
    +        return self._bot_call(self._stub.UpdateBotSession, request)
    
    55 55
     
    
    56
    +    def _bot_call(self, call, request):
    
    57
    +        try:
    
    58
    +            return call(request)
    
    56 59
             except grpc.RpcError as e:
    
    57
    -            self.__logger.error(e)
    
    58
    -            raise
    
    60
    +            self.__logger.error(e.code())
    
    61
    +            return e.code()

  • buildgrid/bot/session.py
    ... ... @@ -19,8 +19,10 @@ Bot Session
    19 19
     
    
    20 20
     Allows connections
    
    21 21
     """
    
    22
    +import asyncio
    
    22 23
     import logging
    
    23 24
     import platform
    
    25
    +import grpc
    
    24 26
     
    
    25 27
     from buildgrid._enums import BotStatus, LeaseState
    
    26 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    ... ... @@ -32,7 +34,8 @@ from .tenantmanager import TenantManager
    32 34
     
    
    33 35
     
    
    34 36
     class BotSession:
    
    35
    -    def __init__(self, parent, bots_interface, hardware_interface, work, context=None):
    
    37
    +    def __init__(self, parent, bots_interface, hardware_interface, work,
    
    38
    +                 context=None, update_period=1):
    
    36 39
             """ Unique bot ID within the farm used to identify this bot
    
    37 40
             Needs to be human readable.
    
    38 41
             All prior sessions with bot_id of same ID are invalidated.
    
    ... ... @@ -54,14 +57,37 @@ class BotSession:
    54 57
             self._work = work
    
    55 58
             self._context = context
    
    56 59
     
    
    60
    +        self.__connected = False
    
    61
    +        self.__update_period = update_period
    
    62
    +
    
    57 63
         @property
    
    58 64
         def bot_id(self):
    
    59 65
             return self.__bot_id
    
    60 66
     
    
    67
    +    @property
    
    68
    +    def connected(self):
    
    69
    +        return self.__connected
    
    70
    +
    
    71
    +    async def run(self):
    
    72
    +        try:
    
    73
    +            while True:
    
    74
    +                if not self.connected:
    
    75
    +                    self.create_bot_session()
    
    76
    +                else:
    
    77
    +                    self.update_bot_session()
    
    78
    +
    
    79
    +                await asyncio.sleep(self.__update_period)
    
    80
    +        except asyncio.CancelledError:
    
    81
    +            pass
    
    82
    +
    
    61 83
         def create_bot_session(self):
    
    62 84
             self.__logger.debug("Creating bot session")
    
    63 85
     
    
    64 86
             session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
    
    87
    +        if session in list(grpc.StatusCode):
    
    88
    +            self.__connected = False
    
    89
    +            return
    
    90
    +        self.__connected = True
    
    65 91
             self.__name = session.name
    
    66 92
     
    
    67 93
             self.__logger.info("Created bot session with name: [%s]", self.__name)
    
    ... ... @@ -73,6 +99,10 @@ class BotSession:
    73 99
             self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
    
    74 100
     
    
    75 101
             session = self._bots_interface.update_bot_session(self.get_pb2())
    
    102
    +        if session in list(grpc.StatusCode):
    
    103
    +            self.__connected = False
    
    104
    +            return
    
    105
    +        self.__connected = True
    
    76 106
             server_ids = []
    
    77 107
     
    
    78 108
             for lease in session.leases:
    

  • buildgrid/server/instance.py
    ... ... @@ -71,7 +71,10 @@ class BuildGridServer:
    71 71
             self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
    
    72 72
             self.__print_log_records = True
    
    73 73
     
    
    74
    +        self.__build_metadata_queues = None
    
    75
    +
    
    74 76
             self.__state_monitoring_task = None
    
    77
    +        self.__build_monitoring_tasks = None
    
    75 78
             self.__logging_task = None
    
    76 79
     
    
    77 80
             # We always want a capabilities service
    
    ... ... @@ -95,6 +98,8 @@ class BuildGridServer:
    95 98
                     self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    96 99
                     serialisation_format=MonitoringOutputFormat.JSON)
    
    97 100
     
    
    101
    +            self.__build_monitoring_tasks = []
    
    102
    +
    
    98 103
             # Setup the main logging handler:
    
    99 104
             root_logger = logging.getLogger()
    
    100 105
     
    
    ... ... @@ -119,6 +124,18 @@ class BuildGridServer:
    119 124
                     self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    120 125
                     loop=self.__main_loop)
    
    121 126
     
    
    127
    +            self.__build_monitoring_tasks.clear()
    
    128
    +            for instance_name, scheduler in self._schedulers.items():
    
    129
    +                if not scheduler.is_instrumented:
    
    130
    +                    continue
    
    131
    +
    
    132
    +                message_queue = janus.Queue(loop=self.__main_loop)
    
    133
    +                scheduler.register_build_metadata_watcher(message_queue.sync_q)
    
    134
    +
    
    135
    +                self.__build_monitoring_tasks.append(asyncio.ensure_future(
    
    136
    +                    self._build_monitoring_worker(instance_name, message_queue),
    
    137
    +                    loop=self.__main_loop))
    
    138
    +
    
    122 139
             self.__logging_task = asyncio.ensure_future(
    
    123 140
                 self._logging_worker(), loop=self.__main_loop)
    
    124 141
     
    
    ... ... @@ -132,6 +149,10 @@ class BuildGridServer:
    132 149
                 if self.__state_monitoring_task is not None:
    
    133 150
                     self.__state_monitoring_task.cancel()
    
    134 151
     
    
    152
    +            for build_monitoring_task in self.__build_monitoring_tasks:
    
    153
    +                build_monitoring_task.cancel()
    
    154
    +            self.__build_monitoring_tasks.clear()
    
    155
    +
    
    135 156
                 self.__monitoring_bus.stop()
    
    136 157
     
    
    137 158
             if self.__logging_task is not None:
    
    ... ... @@ -352,6 +373,60 @@ class BuildGridServer:
    352 373
     
    
    353 374
             return log_record
    
    354 375
     
    
    376
    +    async def _build_monitoring_worker(self, instance_name, message_queue):
    
    377
    +        """Publishes builds metadata to the monitoring bus."""
    
    378
    +        async def __build_monitoring_worker():
    
    379
    +            metadata, context = await message_queue.async_q.get()
    
    380
    +
    
    381
    +            context.update({'instance-name': instance_name or ''})
    
    382
    +
    
    383
    +            # Emit build inputs fetching time record:
    
    384
    +            fetch_start = metadata.input_fetch_start_timestamp.ToDatetime()
    
    385
    +            fetch_completed = metadata.input_fetch_completed_timestamp.ToDatetime()
    
    386
    +            input_fetch_time = fetch_completed - fetch_start
    
    387
    +            timer_record = self._forge_timer_metric_record(
    
    388
    +                MetricRecordDomain.BUILD, 'inputs-fetching-time', input_fetch_time,
    
    389
    +                metadata=context)
    
    390
    +
    
    391
    +            await self.__monitoring_bus.send_record(timer_record)
    
    392
    +
    
    393
    +            # Emit build execution time record:
    
    394
    +            execution_start = metadata.execution_start_timestamp.ToDatetime()
    
    395
    +            execution_completed = metadata.execution_completed_timestamp.ToDatetime()
    
    396
    +            execution_time = execution_completed - execution_start
    
    397
    +            timer_record = self._forge_timer_metric_record(
    
    398
    +                MetricRecordDomain.BUILD, 'execution-time', execution_time,
    
    399
    +                metadata=context)
    
    400
    +
    
    401
    +            await self.__monitoring_bus.send_record(timer_record)
    
    402
    +
    
    403
    +            # Emit build outputs uploading time record:
    
    404
    +            upload_start = metadata.output_upload_start_timestamp.ToDatetime()
    
    405
    +            upload_completed = metadata.output_upload_completed_timestamp.ToDatetime()
    
    406
    +            output_upload_time = upload_completed - upload_start
    
    407
    +            timer_record = self._forge_timer_metric_record(
    
    408
    +                MetricRecordDomain.BUILD, 'outputs-uploading-time', output_upload_time,
    
    409
    +                metadata=context)
    
    410
    +
    
    411
    +            await self.__monitoring_bus.send_record(timer_record)
    
    412
    +
    
    413
    +            # Emit total build handling time record:
    
    414
    +            queued = metadata.queued_timestamp.ToDatetime()
    
    415
    +            worker_completed = metadata.worker_completed_timestamp.ToDatetime()
    
    416
    +            total_handling_time = worker_completed - queued
    
    417
    +            timer_record = self._forge_timer_metric_record(
    
    418
    +                MetricRecordDomain.BUILD, 'total-handling-time', total_handling_time,
    
    419
    +                metadata=context)
    
    420
    +
    
    421
    +            await self.__monitoring_bus.send_record(timer_record)
    
    422
    +
    
    423
    +        try:
    
    424
    +            while True:
    
    425
    +                await __build_monitoring_worker()
    
    426
    +
    
    427
    +        except asyncio.CancelledError:
    
    428
    +            pass
    
    429
    +
    
    355 430
         async def _state_monitoring_worker(self, period=1.0):
    
    356 431
             """Periodically publishes state metrics to the monitoring bus."""
    
    357 432
             async def __state_monitoring_worker():
    
    ... ... @@ -463,7 +538,7 @@ class BuildGridServer:
    463 538
             n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    464 539
             gauge_record = self._forge_gauge_metric_record(
    
    465 540
                 MetricRecordDomain.STATE, 'clients-count', n_clients,
    
    466
    -            metadata={'instance-name': instance_name or 'void'})
    
    541
    +            metadata={'instance-name': instance_name or ''})
    
    467 542
     
    
    468 543
             return n_clients, gauge_record
    
    469 544
     
    
    ... ... @@ -480,7 +555,7 @@ class BuildGridServer:
    480 555
             n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    481 556
             gauge_record = self._forge_gauge_metric_record(
    
    482 557
                 MetricRecordDomain.STATE, 'bots-count', n_bots,
    
    483
    -            metadata={'instance-name': instance_name or 'void'})
    
    558
    +            metadata={'instance-name': instance_name or ''})
    
    484 559
     
    
    485 560
             return n_bots, gauge_record
    
    486 561
     
    
    ... ... @@ -498,6 +573,6 @@ class BuildGridServer:
    498 573
             am_queue_time = self._schedulers[instance_name].query_am_queue_time()
    
    499 574
             timer_record = self._forge_timer_metric_record(
    
    500 575
                 MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
    
    501
    -            metadata={'instance-name': instance_name or 'void'})
    
    576
    +            metadata={'instance-name': instance_name or ''})
    
    502 577
     
    
    503 578
             return am_queue_time, timer_record

  • buildgrid/server/job.py
    ... ... @@ -83,6 +83,13 @@ class Job:
    83 83
             else:
    
    84 84
                 return None
    
    85 85
     
    
    86
    +    @property
    
    87
    +    def holds_cached_action_result(self):
    
    88
    +        if self.__execute_response is not None:
    
    89
    +            return self.__execute_response.cached_result
    
    90
    +        else:
    
    91
    +            return False
    
    92
    +
    
    86 93
         @property
    
    87 94
         def operation(self):
    
    88 95
             return self._operation
    

  • buildgrid/server/scheduler.py
    ... ... @@ -34,6 +34,8 @@ class Scheduler:
    34 34
         def __init__(self, action_cache=None, monitor=False):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    +        self.__build_metadata_queues = None
    
    38
    +
    
    37 39
             self.__operations_by_stage = None
    
    38 40
             self.__leases_by_state = None
    
    39 41
             self.__queue_time_average = None
    
    ... ... @@ -46,6 +48,8 @@ class Scheduler:
    46 48
             self._is_instrumented = monitor
    
    47 49
     
    
    48 50
             if self._is_instrumented:
    
    51
    +            self.__build_metadata_queues = []
    
    52
    +
    
    49 53
                 self.__operations_by_stage = {}
    
    50 54
                 self.__leases_by_state = {}
    
    51 55
                 self.__queue_time_average = 0, timedelta()
    
    ... ... @@ -228,6 +232,10 @@ class Scheduler:
    228 232
         def is_instrumented(self):
    
    229 233
             return self._is_instrumented
    
    230 234
     
    
    235
    +    def register_build_metadata_watcher(self, message_queue):
    
    236
    +        if self.__build_metadata_queues is not None:
    
    237
    +            self.__build_metadata_queues.append(message_queue)
    
    238
    +
    
    231 239
         def query_n_jobs(self):
    
    232 240
             return len(self.jobs)
    
    233 241
     
    
    ... ... @@ -319,3 +327,12 @@ class Scheduler:
    319 327
                         average_time = average_time + ((queue_time - average_time) / average_order)
    
    320 328
     
    
    321 329
                     self.__queue_time_average = average_order, average_time
    
    330
    +
    
    331
    +                if not job.holds_cached_action_result:
    
    332
    +                    execution_metadata = job.action_result.execution_metadata
    
    333
    +                    context_metadata = {'job-is': job.name}
    
    334
    +
    
    335
    +                    message = (execution_metadata, context_metadata,)
    
    336
    +
    
    337
    +                    for message_queue in self.__build_metadata_queues:
    
    338
    +                        message_queue.put(message)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]