[Notes] [Git][BuildGrid/buildgrid][mablanch/132-gather-state-metrics] 3 commits: execution/service.py: Expose client counts



Title: GitLab

Martin Blanchard pushed to branch mablanch/132-gather-state-metrics at BuildGrid / buildgrid

Commits:

3 changed files:

Changes:

  • buildgrid/server/bots/service.py
    ... ... @@ -23,8 +23,9 @@ import logging
    23 23
     
    
    24 24
     import grpc
    
    25 25
     
    
    26
    -from google.protobuf.empty_pb2 import Empty
    
    26
    +from google.protobuf import empty_pb2, timestamp_pb2
    
    27 27
     
    
    28
    +from buildgrid._enums import BotStatus
    
    28 29
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    29 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    30 31
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    ... ... @@ -34,20 +35,56 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    34 35
     
    
    35 36
         def __init__(self, server):
    
    36 37
             self.logger = logging.getLogger(__name__)
    
    38
    +        self.__bots_by_status = {}
    
    39
    +        self.__bots_by_instance = {}
    
    40
    +        self.__bots = {}
    
    37 41
     
    
    38 42
             self._instances = {}
    
    39 43
     
    
    40 44
             bots_pb2_grpc.add_BotsServicer_to_server(self, server)
    
    41 45
     
    
    42
    -    def add_instance(self, name, instance):
    
    43
    -        self._instances[name] = instance
    
    46
    +        self.__bots_by_status[BotStatus.OK] = set()
    
    47
    +        self.__bots_by_status[BotStatus.UNHEALTHY] = set()
    
    48
    +        self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
    
    49
    +        self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
    
    50
    +
    
    51
    +    @property
    
    52
    +    def n_bots(self):
    
    53
    +        return len(self.__bots)
    
    54
    +
    
    55
    +    @property
    
    56
    +    def n_bots_for_instance(self, instance_name):
    
    57
    +        return self.__bots_by_instance[instance_name]
    
    58
    +
    
    59
    +    @property
    
    60
    +    def n_bots_for_status(self, bot_status):
    
    61
    +        return len(self.__bots_by_status[bot_status])
    
    62
    +
    
    63
    +    # --- Public API ---
    
    64
    +
    
    65
    +    def add_instance(self, instance_name, instance):
    
    66
    +        self.__bots_by_instance[instance_name] = 0
    
    67
    +        self._instances[instance_name] = instance
    
    68
    +
    
    69
    +    # --- Public API: Servicer ---
    
    44 70
     
    
    45 71
         def CreateBotSession(self, request, context):
    
    72
    +        instance_name = request.parent
    
    73
    +        bot_status = BotStatus(request.bot_session.status)
    
    74
    +        bot_id = request.bot_session.bot_id
    
    75
    +
    
    46 76
             try:
    
    47
    -            parent = request.parent
    
    48
    -            instance = self._get_instance(request.parent)
    
    49
    -            return instance.create_bot_session(parent,
    
    50
    -                                               request.bot_session)
    
    77
    +            instance = self._get_instance(instance_name)
    
    78
    +            bot_session = instance.create_bot_session(instance_name,
    
    79
    +                                                      request.bot_session)
    
    80
    +            now = timestamp_pb2.Timestamp()
    
    81
    +            now.GetCurrentTime()
    
    82
    +
    
    83
    +            self.__bots[bot_id] = now
    
    84
    +            self.__bots_by_instance[instance_name] += 1
    
    85
    +            self.__bots_by_status[bot_status].add(bot_id)
    
    86
    +
    
    87
    +            return bot_session
    
    51 88
     
    
    52 89
             except InvalidArgumentError as e:
    
    53 90
                 self.logger.error(e)
    
    ... ... @@ -57,15 +94,27 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    57 94
             return bots_pb2.BotSession()
    
    58 95
     
    
    59 96
         def UpdateBotSession(self, request, context):
    
    97
    +        names = request.name.split("/")
    
    98
    +        bot_status = BotStatus(request.bot_session.status)
    
    99
    +        bot_id = request.bot_session.bot_id
    
    100
    +
    
    60 101
             try:
    
    61
    -            names = request.name.split("/")
    
    62
    -            # Operation name should be in format:
    
    63
    -            # {instance/name}/{uuid}
    
    64
    -            instance_name = ''.join(names[0:-1])
    
    102
    +            instance_name = '/'.join(names[:-1])
    
    65 103
     
    
    66 104
                 instance = self._get_instance(instance_name)
    
    67
    -            return instance.update_bot_session(request.name,
    
    68
    -                                               request.bot_session)
    
    105
    +            bot_session = instance.update_bot_session(request.name,
    
    106
    +                                                      request.bot_session)
    
    107
    +
    
    108
    +            self.__bots[bot_id].GetCurrentTime()
    
    109
    +            if bot_id not in self.__bots_by_status[bot_status]:
    
    110
    +                self.__bots_by_status[BotStatus.OK].discard(bot_id)
    
    111
    +                self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
    
    112
    +                self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
    
    113
    +                self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
    
    114
    +
    
    115
    +                self.__bots_by_status[bot_status].add(bot_id)
    
    116
    +
    
    117
    +            return bot_session
    
    69 118
     
    
    70 119
             except InvalidArgumentError as e:
    
    71 120
                 self.logger.error(e)
    
    ... ... @@ -86,7 +135,10 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    86 135
     
    
    87 136
         def PostBotEventTemp(self, request, context):
    
    88 137
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    89
    -        return Empty()
    
    138
    +
    
    139
    +        return empty_pb2.Empty()
    
    140
    +
    
    141
    +    # --- Private API ---
    
    90 142
     
    
    91 143
         def _get_instance(self, name):
    
    92 144
             try:
    

  • buildgrid/server/execution/service.py
    ... ... @@ -35,25 +35,52 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    35 35
     
    
    36 36
         def __init__(self, server):
    
    37 37
             self.logger = logging.getLogger(__name__)
    
    38
    +        self.__peers_by_instance = {}
    
    39
    +        self.__peers = {}
    
    40
    +
    
    38 41
             self._instances = {}
    
    42
    +
    
    39 43
             remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
    
    40 44
     
    
    41
    -    def add_instance(self, name, instance):
    
    42
    -        self._instances[name] = instance
    
    45
    +    @property
    
    46
    +    def n_clients(self):
    
    47
    +        return len(self.__peers)
    
    48
    +
    
    49
    +    @property
    
    50
    +    def n_clients_for_instance(self, instance_name):
    
    51
    +        return len(self.__peers_by_instance[instance_name])
    
    52
    +
    
    53
    +    # --- Public API ---
    
    54
    +
    
    55
    +    def add_instance(self, instance_name, instance):
    
    56
    +        self.__peers_by_instance[instance_name] = set()
    
    57
    +        self._instances[instance_name] = instance
    
    58
    +
    
    59
    +    # --- Public API: Servicer ---
    
    43 60
     
    
    44 61
         def Execute(self, request, context):
    
    62
    +        instance_name = request.instance_name
    
    63
    +        message_queue = queue.Queue()
    
    64
    +        peer = context.peer()
    
    65
    +
    
    45 66
             try:
    
    46
    -            message_queue = queue.Queue()
    
    47
    -            instance = self._get_instance(request.instance_name)
    
    67
    +            instance = self._get_instance(instance_name)
    
    68
    +
    
    48 69
                 operation = instance.execute(request.action_digest,
    
    49 70
                                              request.skip_cache_lookup,
    
    50 71
                                              message_queue)
    
    51 72
     
    
    52
    -            context.add_callback(partial(instance.unregister_message_client,
    
    53
    -                                         operation.name, message_queue))
    
    73
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    74
    +                                         peer, instance_name, operation.name, message_queue))
    
    54 75
     
    
    55
    -            instanced_op_name = "{}/{}".format(request.instance_name,
    
    56
    -                                               operation.name)
    
    76
    +            if peer in self.__peers:
    
    77
    +                self.__peers[peer] += 1
    
    78
    +            else:
    
    79
    +                self.__peers[peer] = 1
    
    80
    +
    
    81
    +            self.__peers_by_instance[instance_name].add(peer)
    
    82
    +
    
    83
    +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    57 84
     
    
    58 85
                 self.logger.info("Operation name: [{}]".format(instanced_op_name))
    
    59 86
     
    
    ... ... @@ -77,21 +104,23 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    77 104
                 yield operations_pb2.Operation()
    
    78 105
     
    
    79 106
         def WaitExecution(self, request, context):
    
    107
    +        names = request.name.split("/")
    
    108
    +        message_queue = queue.Queue()
    
    109
    +        peer = context.peer()
    
    110
    +
    
    80 111
             try:
    
    81
    -            names = request.name.split("/")
    
    112
    +            instance_name = '/'.join(names[:-1])
    
    113
    +            operation_name = names[-1]
    
    82 114
     
    
    83
    -            # Operation name should be in format:
    
    84
    -            # {instance/name}/{operation_id}
    
    85
    -            instance_name = ''.join(names[0:-1])
    
    115
    +            if instance_name != request.instance_name:
    
    116
    +                raise InvalidArgumentError()
    
    86 117
     
    
    87
    -            message_queue = queue.Queue()
    
    88
    -            operation_name = names[-1]
    
    89 118
                 instance = self._get_instance(instance_name)
    
    90 119
     
    
    91 120
                 instance.register_message_client(operation_name, message_queue)
    
    92 121
     
    
    93
    -            context.add_callback(partial(instance.unregister_message_client,
    
    94
    -                                         operation_name, message_queue))
    
    122
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    123
    +                                         peer, instance_name, operation_name, message_queue))
    
    95 124
     
    
    96 125
                 for operation in instance.stream_operation_updates(message_queue,
    
    97 126
                                                                    operation_name):
    
    ... ... @@ -106,6 +135,18 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    106 135
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    107 136
                 yield operations_pb2.Operation()
    
    108 137
     
    
    138
    +    # --- Private API ---
    
    139
    +
    
    140
    +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    141
    +        instance = self._get_instance(instance_name)
    
    142
    +
    
    143
    +        instance.unregister_message_client(job_name, message_queue)
    
    144
    +
    
    145
    +        if self.__peers[peer] > 1:
    
    146
    +            self.__peers[peer] -= 1
    
    147
    +        else:
    
    148
    +            del self.__peers[peer]
    
    149
    +
    
    109 150
         def _get_instance(self, name):
    
    110 151
             try:
    
    111 152
                 return self._instances[name]
    

  • buildgrid/server/scheduler.py
    ... ... @@ -21,9 +21,10 @@ Schedules jobs.
    21 21
     
    
    22 22
     from collections import deque
    
    23 23
     
    
    24
    -from buildgrid._exceptions import NotFoundError
    
    24
    +from google.protobuf import duration_pb2
    
    25 25
     
    
    26
    -from .job import OperationStage, LeaseState
    
    26
    +from buildgrid._enums import LeaseState, OperationStage
    
    27
    +from buildgrid._exceptions import NotFoundError
    
    27 28
     
    
    28 29
     
    
    29 30
     class Scheduler:
    
    ... ... @@ -31,10 +32,53 @@ class Scheduler:
    31 32
         MAX_N_TRIES = 5
    
    32 33
     
    
    33 34
         def __init__(self, action_cache=None):
    
    35
    +        self.__queue_times_by_priority = {}
    
    36
    +        self.__queue_time = duration_pb2.Duration()
    
    37
    +        self.__retries_by_error = {}
    
    38
    +        self.__retries_count = 0
    
    39
    +
    
    34 40
             self._action_cache = action_cache
    
    35 41
             self.jobs = {}
    
    36 42
             self.queue = deque()
    
    37 43
     
    
    44
    +    @property
    
    45
    +    def n_jobs(self):
    
    46
    +        return len(self.jobs)
    
    47
    +
    
    48
    +    @property
    
    49
    +    def n_operations(self):
    
    50
    +        return len(self.jobs)
    
    51
    +
    
    52
    +    @property
    
    53
    +    def n_operations_by_stage(self):
    
    54
    +        return len(self.jobs)
    
    55
    +
    
    56
    +    @property
    
    57
    +    def n_leases(self):
    
    58
    +        return len(self.jobs)
    
    59
    +
    
    60
    +    @property
    
    61
    +    def n_leases_by_state(self):
    
    62
    +        return len(self.jobs)
    
    63
    +
    
    64
    +    @property
    
    65
    +    def n_retries(self):
    
    66
    +        return self.__retries_count
    
    67
    +
    
    68
    +    @property
    
    69
    +    def n_retries_for_error(self, error_type):
    
    70
    +        return self.__retries_by_error[error_type]
    
    71
    +
    
    72
    +    @property
    
    73
    +    def am_queue_time(self):
    
    74
    +        return self.__average_queue_time
    
    75
    +
    
    76
    +    @property
    
    77
    +    def am_queue_time_for_priority(self, priority_level):
    
    78
    +        return self.__queue_times_by_priority[priority_level]
    
    79
    +
    
    80
    +    # --- Public API ---
    
    81
    +
    
    38 82
         def register_client(self, job_name, queue):
    
    39 83
             self.jobs[job_name].register_client(queue)
    
    40 84
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]