[Notes] [Git][BuildGrid/buildgrid][mablanch/132-gather-state-metrics] 3 commits: setup.py: Introduce janus dependency (Python >= 3.5.3)



Title: GitLab

Martin Blanchard pushed to branch mablanch/132-gather-state-metrics at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • .pylintrc
    ... ... @@ -461,6 +461,7 @@ known-third-party=boto3,
    461 461
                       enchant,
    
    462 462
                       google,
    
    463 463
                       grpc,
    
    464
    +                  janus,
    
    464 465
                       moto,
    
    465 466
                       yaml
    
    466 467
     
    

  • buildgrid/server/instance.py
    ... ... @@ -15,12 +15,16 @@
    15 15
     
    
    16 16
     import asyncio
    
    17 17
     from concurrent import futures
    
    18
    +from datetime import timedelta
    
    18 19
     import logging
    
    19 20
     import os
    
    20 21
     import signal
    
    22
    +import time
    
    21 23
     
    
    22 24
     import grpc
    
    23 25
     
    
    26
    +from buildgrid._enums import MetricRecordDomain, MetricRecordType
    
    27
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    24 28
     from buildgrid.server.actioncache.service import ActionCacheService
    
    25 29
     from buildgrid.server.bots.service import BotsService
    
    26 30
     from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    ... ... @@ -28,6 +32,7 @@ from buildgrid.server.execution.service import ExecutionService
    28 32
     from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    29 33
     from buildgrid.server.operations.service import OperationsService
    
    30 34
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    35
    +from buildgrid.settings import MONITORING_PERIOD
    
    31 36
     
    
    32 37
     
    
    33 38
     class BuildGridServer:
    
    ... ... @@ -55,6 +60,8 @@ class BuildGridServer:
    55 60
             self.__main_loop = asyncio.get_event_loop()
    
    56 61
             self.__monitoring_bus = None
    
    57 62
     
    
    63
    +        self.__state_monitoring_task = None
    
    64
    +
    
    58 65
             self._execution_service = None
    
    59 66
             self._bots_service = None
    
    60 67
             self._operations_service = None
    
    ... ... @@ -63,6 +70,9 @@ class BuildGridServer:
    63 70
             self._cas_service = None
    
    64 71
             self._bytestream_service = None
    
    65 72
     
    
    73
    +        self._schedulers = {}
    
    74
    +        self._instances = set()
    
    75
    +
    
    66 76
             self._is_instrumented = monitor
    
    67 77
     
    
    68 78
             if self._is_instrumented:
    
    ... ... @@ -79,6 +89,10 @@ class BuildGridServer:
    79 89
             if self._is_instrumented:
    
    80 90
                 self.__monitoring_bus.start()
    
    81 91
     
    
    92
    +            self.__state_monitoring_task = asyncio.ensure_future(
    
    93
    +                self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    94
    +                loop=self.__main_loop)
    
    95
    +
    
    82 96
             self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
    
    83 97
     
    
    84 98
             self.__main_loop.run_forever()
    
    ... ... @@ -86,6 +100,9 @@ class BuildGridServer:
    86 100
         def stop(self):
    
    87 101
             """Stops the BuildGrid server."""
    
    88 102
             if self._is_instrumented:
    
    103
    +            if self.__state_monitoring_task is not None:
    
    104
    +                self.__state_monitoring_task.cancel()
    
    105
    +
    
    89 106
                 self.__monitoring_bus.stop()
    
    90 107
     
    
    91 108
             self.__main_loop.stop()
    
    ... ... @@ -130,6 +147,9 @@ class BuildGridServer:
    130 147
     
    
    131 148
             self._execution_service.add_instance(instance_name, instance)
    
    132 149
     
    
    150
    +        self._schedulers[instance_name] = instance.scheduler
    
    151
    +        self._instances.add(instance_name)
    
    152
    +
    
    133 153
         def add_bots_interface(self, instance, instance_name):
    
    134 154
             """Adds a :obj:`BotsInterface` to the service.
    
    135 155
     
    
    ... ... @@ -145,6 +165,8 @@ class BuildGridServer:
    145 165
     
    
    146 166
             self._bots_service.add_instance(instance_name, instance)
    
    147 167
     
    
    168
    +        self._instances.add(instance_name)
    
    169
    +
    
    148 170
         def add_operations_instance(self, instance, instance_name):
    
    149 171
             """Adds an :obj:`OperationsInstance` to the service.
    
    150 172
     
    
    ... ... @@ -220,3 +242,140 @@ class BuildGridServer:
    220 242
         @property
    
    221 243
         def is_instrumented(self):
    
    222 244
             return self._is_instrumented
    
    245
    +
    
    246
    +    # --- Private API ---
    
    247
    +
    
    248
    +    async def _state_monitoring_worker(self, period=1.0):
    
    249
    +        """Periodically publishes state metrics to the monitoring bus."""
    
    250
    +        async def __state_monitoring_worker():
    
    251
    +            # Emit total clients count record:
    
    252
    +            _, record = self._query_n_clients()
    
    253
    +            await self.__monitoring_bus.send_record(record)
    
    254
    +
    
    255
    +            # Emit total bots count record:
    
    256
    +            _, record = self._query_n_bots()
    
    257
    +            await self.__monitoring_bus.send_record(record)
    
    258
    +
    
    259
    +            queue_times = []
    
    260
    +            # Emits records by instance:
    
    261
    +            for instance_name in self._instances:
    
    262
    +                # Emit instance clients count record:
    
    263
    +                _, record = self._query_n_clients_for_instance(instance_name)
    
    264
    +                await self.__monitoring_bus.send_record(record)
    
    265
    +
    
    266
    +                # Emit instance bots count record:
    
    267
    +                _, record = self._query_n_bots_for_instance(instance_name)
    
    268
    +                await self.__monitoring_bus.send_record(record)
    
    269
    +
    
    270
    +                # Emit instance average queue time record:
    
    271
    +                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
    
    272
    +                await self.__monitoring_bus.send_record(record)
    
    273
    +                if queue_time:
    
    274
    +                    queue_times.append(queue_time)
    
    275
    +
    
    276
    +            # Emit overall average queue time record:
    
    277
    +            if queue_times:
    
    278
    +                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
    
    279
    +            else:
    
    280
    +                am_queue_time = timedelta()
    
    281
    +            record = self._forge_timer_metric_record(
    
    282
    +                MetricRecordDomain.STATE,
    
    283
    +                'average-queue-time',
    
    284
    +                am_queue_time)
    
    285
    +
    
    286
    +            await self.__monitoring_bus.send_record(record)
    
    287
    +
    
    288
    +        try:
    
    289
    +            while True:
    
    290
    +                start = time.time()
    
    291
    +                await __state_monitoring_worker()
    
    292
    +
    
    293
    +                end = time.time()
    
    294
    +                await asyncio.sleep(period - (end - start))
    
    295
    +
    
    296
    +        except asyncio.CancelledError:
    
    297
    +            pass
    
    298
    +
    
    299
    +    def _forge_counter_metric_record(self, domain, name, count, metadata=None):
    
    300
    +        counter_record = monitoring_pb2.MetricRecord()
    
    301
    +
    
    302
    +        counter_record.creation_timestamp.GetCurrentTime()
    
    303
    +        counter_record.domain = domain.value
    
    304
    +        counter_record.type = MetricRecordType.COUNTER.value
    
    305
    +        counter_record.name = name
    
    306
    +        counter_record.count = count
    
    307
    +        if metadata is not None:
    
    308
    +            counter_record.metadata.update(metadata)
    
    309
    +
    
    310
    +        return counter_record
    
    311
    +
    
    312
    +    def _forge_timer_metric_record(self, domain, name, duration, metadata=None):
    
    313
    +        timer_record = monitoring_pb2.MetricRecord()
    
    314
    +
    
    315
    +        timer_record.creation_timestamp.GetCurrentTime()
    
    316
    +        timer_record.domain = domain.value
    
    317
    +        timer_record.type = MetricRecordType.TIMER.value
    
    318
    +        timer_record.name = name
    
    319
    +        timer_record.duration.FromTimedelta(duration)
    
    320
    +        if metadata is not None:
    
    321
    +            timer_record.metadata.update(metadata)
    
    322
    +
    
    323
    +        return timer_record
    
    324
    +
    
    325
    +    def _forge_gauge_metric_record(self, domain, name, value, metadata=None):
    
    326
    +        gauge_record = monitoring_pb2.MetricRecord()
    
    327
    +
    
    328
    +        gauge_record.creation_timestamp.GetCurrentTime()
    
    329
    +        gauge_record.domain = domain.value
    
    330
    +        gauge_record.type = MetricRecordType.GAUGE.value
    
    331
    +        gauge_record.name = name
    
    332
    +        gauge_record.value = value
    
    333
    +        if metadata is not None:
    
    334
    +            gauge_record.metadata.update(metadata)
    
    335
    +
    
    336
    +        return gauge_record
    
    337
    +
    
    338
    +    # --- Private API: Monitoring ---
    
    339
    +
    
    340
    +    def _query_n_clients(self):
    
    341
    +        """Queries the number of clients connected."""
    
    342
    +        n_clients = self._execution_service.query_n_clients()
    
    343
    +        gauge_record = self._forge_gauge_metric_record(
    
    344
    +            MetricRecordDomain.STATE, 'clients-count', n_clients)
    
    345
    +
    
    346
    +        return n_clients, gauge_record
    
    347
    +
    
    348
    +    def _query_n_clients_for_instance(self, instance_name):
    
    349
    +        """Queries the number of clients connected for a given instance"""
    
    350
    +        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    351
    +        gauge_record = self._forge_gauge_metric_record(
    
    352
    +            MetricRecordDomain.STATE, 'clients-count', n_clients,
    
    353
    +            metadata={'instance-name': instance_name or 'void'})
    
    354
    +
    
    355
    +        return n_clients, gauge_record
    
    356
    +
    
    357
    +    def _query_n_bots(self):
    
    358
    +        """Queries the number of bots connected."""
    
    359
    +        n_bots = self._bots_service.query_n_bots()
    
    360
    +        gauge_record = self._forge_gauge_metric_record(
    
    361
    +            MetricRecordDomain.STATE, 'bots-count', n_bots)
    
    362
    +
    
    363
    +        return n_bots, gauge_record
    
    364
    +
    
    365
    +    def _query_n_bots_for_instance(self, instance_name):
    
    366
    +        """Queries the number of bots connected for a given instance."""
    
    367
    +        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    368
    +        gauge_record = self._forge_gauge_metric_record(
    
    369
    +            MetricRecordDomain.STATE, 'bots-count', n_bots,
    
    370
    +            metadata={'instance-name': instance_name or 'void'})
    
    371
    +
    
    372
    +        return n_bots, gauge_record
    
    373
    +
    
    374
    +    def _query_am_queue_time_for_instance(self, instance_name):
    
    375
    +        """Queries the average job's queue time for a given instance."""
    
    376
    +        am_queue_time = self._schedulers[instance_name].query_am_queue_time()
    
    377
    +        timer_record = self._forge_timer_metric_record(
    
    378
    +            MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
    
    379
    +            metadata={'instance-name': instance_name or 'void'})
    
    380
    +
    
    381
    +        return am_queue_time, timer_record

  • buildgrid/settings.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    1 16
     import hashlib
    
    2 17
     
    
    3 18
     
    
    4
    -# The hash function that CAS uses
    
    19
    +# Hash function used for computing digests:
    
    5 20
     HASH = hashlib.sha256
    
    21
    +
    
    22
    +# Length in bytes of a hash string returned by HASH:
    
    6 23
     HASH_LENGTH = HASH().digest_size * 2
    
    24
    +
    
    25
    +# Period, in seconds, for the monitoring cycle:
    
    26
    +MONITORING_PERIOD = 5.0

  • setup.py
    ... ... @@ -112,13 +112,15 @@ setup(
    112 112
         license="Apache License, Version 2.0",
    
    113 113
         description="A remote execution service",
    
    114 114
         packages=find_packages(),
    
    115
    +    python_requires='>= 3.5.3',  # janus requirement
    
    115 116
         install_requires=[
    
    116
    -        'protobuf',
    
    117
    -        'grpcio',
    
    118
    -        'Click',
    
    119
    -        'PyYAML',
    
    120 117
             'boto3 < 1.8.0',
    
    121 118
             'botocore < 1.11.0',
    
    119
    +        'click',
    
    120
    +        'grpcio',
    
    121
    +        'janus',
    
    122
    +        'protobuf',
    
    123
    +        'pyyaml',
    
    122 124
         ],
    
    123 125
         entry_points={
    
    124 126
             'console_scripts': [
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]