[Notes] [Git][BuildGrid/buildgrid][mablanch/132-gather-state-metrics] 19 commits: Add private .proto file for monitoring bus messages



Title: GitLab

Martin Blanchard pushed to branch mablanch/132-gather-state-metrics at BuildGrid / buildgrid

Commits:

25 changed files:

Changes:

  • .pylintrc
    ... ... @@ -185,6 +185,7 @@ ignore-on-opaque-inference=yes
    185 185
     # for classes with dynamically set attributes). This supports the use of
    
    186 186
     # qualified names.
    
    187 187
     ignored-classes=google.protobuf.any_pb2.Any,
    
    188
    +                google.protobuf.duration_pb2.Duration,
    
    188 189
                     google.protobuf.timestamp_pb2.Timestamp
    
    189 190
     
    
    190 191
     # List of module names for which member attributes should not be checked
    
    ... ... @@ -460,6 +461,7 @@ known-third-party=boto3,
    460 461
                       enchant,
    
    461 462
                       google,
    
    462 463
                       grpc,
    
    464
    +                  janus,
    
    463 465
                       moto,
    
    464 466
                       yaml
    
    465 467
     
    
    ... ... @@ -523,4 +525,4 @@ valid-metaclass-classmethod-first-arg=mcs
    523 525
     
    
    524 526
     # Exceptions that will emit a warning when being caught. Defaults to
    
    525 527
     # "Exception"
    
    526
    -overgeneral-exceptions=Exception
    528
    +overgeneral-exceptions=Exception
    \ No newline at end of file

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -20,7 +20,6 @@ Server command
    20 20
     Create a BuildGrid server.
    
    21 21
     """
    
    22 22
     
    
    23
    -import asyncio
    
    24 23
     import logging
    
    25 24
     import sys
    
    26 25
     
    
    ... ... @@ -52,18 +51,14 @@ def start(context, config):
    52 51
             click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
    
    53 52
             sys.exit(-1)
    
    54 53
     
    
    55
    -    loop = asyncio.get_event_loop()
    
    56 54
         try:
    
    57 55
             server.start()
    
    58
    -        loop.run_forever()
    
    59 56
     
    
    60 57
         except KeyboardInterrupt:
    
    61 58
             pass
    
    62 59
     
    
    63 60
         finally:
    
    64
    -        context.logger.info("Stopping server")
    
    65 61
             server.stop()
    
    66
    -        loop.close()
    
    67 62
     
    
    68 63
     
    
    69 64
     def _create_server_from_config(config):
    

  • buildgrid/_enums.py
    ... ... @@ -16,9 +16,13 @@
    16 16
     from enum import Enum
    
    17 17
     
    
    18 18
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    19 20
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    20 21
     
    
    21 22
     
    
    23
    +# RWAPI enumerations
    
    24
    +# From google/devtools/remoteworkers/v1test2/bots.proto:
    
    25
    +
    
    22 26
     class BotStatus(Enum):
    
    23 27
         # Initially unknown state.
    
    24 28
         BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    ... ... @@ -45,6 +49,9 @@ class LeaseState(Enum):
    45 49
         CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
    
    46 50
     
    
    47 51
     
    
    52
    +# REAPI enumerations
    
    53
    +# From build/bazel/remote/execution/v2/remote_execution.proto:
    
    54
    +
    
    48 55
     class OperationStage(Enum):
    
    49 56
         # Initially unknown stage.
    
    50 57
         UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
    
    ... ... @@ -56,3 +63,41 @@ class OperationStage(Enum):
    56 63
         EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
    
    57 64
         # Finished execution.
    
    58 65
         COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
    
    66
    +
    
    67
    +
    
    68
    +# Internal enumerations
    
    69
    +# From buildgrid/v2/monitoring.proto:
    
    70
    +
    
    71
    +class LogRecordLevel(Enum):
    
    72
    +    # Initially unknown level.
    
    73
    +    NOTSET = monitoring_pb2.LogRecord.Level.Value('NOTSET')
    
    74
    +    # Debug message severity level.
    
    75
    +    DEBUG = monitoring_pb2.LogRecord.Level.Value('DEBUG')
    
    76
    +    # Information message severity level.
    
    77
    +    INFO = monitoring_pb2.LogRecord.Level.Value('INFO')
    
    78
    +    # Warning message severity level.
    
    79
    +    WARNING = monitoring_pb2.LogRecord.Level.Value('WARNING')
    
    80
    +    # Error message severity level.
    
    81
    +    ERROR = monitoring_pb2.LogRecord.Level.Value('ERROR')
    
    82
    +    # Critical message severity level.
    
    83
    +    CRITICAL = monitoring_pb2.LogRecord.Level.Value('CRITICAL')
    
    84
    +
    
    85
    +
    
    86
    +class MetricRecordDomain(Enum):
    
    87
    +    # Initially unknown domain.
    
    88
    +    UNKNOWN = monitoring_pb2.MetricRecord.Domain.Value('UNKNOWN')
    
    89
    +    # A server state related metric.
    
    90
    +    STATE = monitoring_pb2.MetricRecord.Domain.Value('STATE')
    
    91
    +    # A build execution related metric.
    
    92
    +    BUILD = monitoring_pb2.MetricRecord.Domain.Value('BUILD')
    
    93
    +
    
    94
    +
    
    95
    +class MetricRecordType(Enum):
    
    96
    +    # Initially unknown type.
    
    97
    +    NONE = monitoring_pb2.MetricRecord.Type.Value('NONE')
    
    98
    +    # A metric for counting.
    
    99
    +    COUNTER = monitoring_pb2.MetricRecord.Type.Value('COUNTER')
    
    100
    +    # A metric for measuring a duration.
    
    101
    +    TIMER = monitoring_pb2.MetricRecord.Type.Value('TIMER')
    
    102
    +    # A metric with an arbitrary value.
    
    103
    +    GAUGE = monitoring_pb2.MetricRecord.Type.Value('GAUGE')

  • buildgrid/_protos/buildgrid/__init__.py

  • buildgrid/_protos/buildgrid/v2/__init__.py

  • buildgrid/_protos/buildgrid/v2/monitoring.proto
    1
    +// Copyright (C) 2018 Bloomberg LP
    
    2
    +//
    
    3
    +// Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +// you may not use this file except in compliance with the License.
    
    5
    +// You may obtain a copy of the License at
    
    6
    +//
    
    7
    +//  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +//
    
    9
    +// Unless required by applicable law or agreed to in writing, software
    
    10
    +// distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +// See the License for the specific language governing permissions and
    
    13
    +// limitations under the License.
    
    14
    +
    
    15
    +syntax = "proto3";
    
    16
    +
    
    17
    +package buildgrid.v2;
    
    18
    +
    
    19
    +import "google/api/annotations.proto";
    
    20
    +import "google/protobuf/duration.proto";
    
    21
    +import "google/protobuf/timestamp.proto";
    
    22
    +
    
    23
    +message BusMessage {
    
    24
    +  // The position of this message in the bus stream.
    
    25
    +  int64 sequence_number = 1;
    
    26
    +
    
    27
    +  // The carried message.
    
    28
    +  oneof record {
    
    29
    +    LogRecord log_record = 2;
    
    30
    +    MetricRecord metric_record = 3;
    
    31
    +  }
    
    32
    +}
    
    33
    +
    
    34
    +message LogRecord {
    
    35
    +  // When the record has been created.
    
    36
    +  google.protobuf.Timestamp creation_timestamp = 1;
    
    37
    +
    
    38
    +  enum Level {
    
    39
    +    NOTSET = 0;
    
    40
    +    // Debug message severity level.
    
    41
    +    DEBUG = 1;
    
    42
    +    // Information message severity level.
    
    43
    +    INFO = 2;
    
    44
    +    // Warning message severity level.
    
    45
    +    WARNING = 3;
    
    46
    +    // Error message severity level.
    
    47
    +    ERROR = 4;
    
    48
    +    // Critical message severity level.
    
    49
    +    CRITICAL = 5;
    
    50
    +  }
    
    51
    +
    
    52
    +  // The domain name for the record.
    
    53
    +  string domain = 2;
    
    54
    +
    
    55
    +  // The severity level of the record.
    
    56
    +  Level level = 3;
    
    57
    +
    
    58
    +  // The human-readable record's message.
    
    59
    +  string message = 4;
    
    60
    +
    
    61
    +  // An optional map of additional metadata.
    
    62
    +  map<string, string> extra = 5;
    
    63
    +}
    
    64
    +
    
    65
    +message MetricRecord {
    
    66
    +  // When the metric has been created.
    
    67
    +  google.protobuf.Timestamp creation_timestamp = 1;
    
    68
    +
    
    69
    +  enum Domain {
    
    70
    +    UNKNOWN = 0;
    
    71
    +    // A server state related metric.
    
    72
    +    STATE = 1;
    
    73
    +    // A build execution related metric.
    
    74
    +    BUILD = 2;
    
    75
    +  }
    
    76
    +
    
    77
    +  // The domain for the record.
    
    78
    +  Domain domain = 2;
    
    79
    +
    
    80
    +  enum Type {
    
    81
    +    NONE = 0;
    
    82
    +    // A metric for counting.
    
    83
    +    COUNTER = 1;
    
    84
    +    // A metric for measuring a duration.
    
    85
    +    TIMER = 2;
    
    86
    +    // A metric with an arbitrary value.
    
    87
    +    GAUGE = 3;
    
    88
    +  }
    
    89
    +
    
    90
    +  // The type of metric, see Type.
    
    91
    +  Type type = 3;
    
    92
    +
    
    93
    +  // The name identifying the metric.
    
    94
    +  string name = 4;
    
    95
    +
    
    96
    +  // The carried value, depending on the metric's type.
    
    97
    +  oneof data {
    
    98
    +    // Set for Type.COUNTER metrics.
    
    99
    +    int32 count = 5;
    
    100
    +    // Set for Type.TIMER metrics.
    
    101
    +    google.protobuf.Duration duration = 6;
    
    102
    +    // Set for Type.GAUGE metrics.
    
    103
    +    int32 value = 7;
    
    104
    +  }
    
    105
    +
    
    106
    +  // An optional map of additional metadata.
    
    107
    +  map<string, string> extra = 8;
    
    108
    +}
    \ No newline at end of file

  • buildgrid/_protos/buildgrid/v2/monitoring_pb2.py
    1
    +# Generated by the protocol buffer compiler.  DO NOT EDIT!
    
    2
    +# source: buildgrid/v2/monitoring.proto
    
    3
    +
    
    4
    +import sys
    
    5
    +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
    
    6
    +from google.protobuf import descriptor as _descriptor
    
    7
    +from google.protobuf import message as _message
    
    8
    +from google.protobuf import reflection as _reflection
    
    9
    +from google.protobuf import symbol_database as _symbol_database
    
    10
    +# @@protoc_insertion_point(imports)
    
    11
    +
    
    12
    +_sym_db = _symbol_database.Default()
    
    13
    +
    
    14
    +
    
    15
    +from buildgrid._protos.google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
    
    16
    +from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
    
    17
    +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
    
    18
    +
    
    19
    +
    
    20
    +DESCRIPTOR = _descriptor.FileDescriptor(
    
    21
    +  name='buildgrid/v2/monitoring.proto',
    
    22
    +  package='buildgrid.v2',
    
    23
    +  syntax='proto3',
    
    24
    +  serialized_options=None,
    
    25
    +  serialized_pb=_b('\n\x1d\x62uildgrid/v2/monitoring.proto\x12\x0c\x62uildgrid.v2\x1a\x1cgoogle/api/annotations.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x93\x01\n\nBusMessage\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x03\x12-\n\nlog_record\x18\x02 \x01(\x0b\x32\x17.buildgrid.v2.LogRecordH\x00\x12\x33\n\rmetric_record\x18\x03 \x01(\x0b\x32\x1a.buildgrid.v2.MetricRecordH\x00\x42\x08\n\x06record\"\xc3\x02\n\tLogRecord\x12\x36\n\x12\x63reation_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x64omain\x18\x02 \x01(\t\x12,\n\x05level\x18\x03 \x01(\x0e\x32\x1d.buildgrid.v2.LogRecord.Level\x12\x0f\n\x07message\x18\x04 \x01(\t\x12\x31\n\x05\x65xtra\x18\x05 \x03(\x0b\x32\".buildgrid.v2.LogRecord.ExtraEntry\x1a,\n\nExtraEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"N\n\x05Level\x12\n\n\x06NOTSET\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x08\n\x04INFO\x10\x02\x12\x0b\n\x07WARNING\x10\x03\x12\t\n\x05\x45RROR\x10\x04\x12\x0c\n\x08\x43RITICAL\x10\x05\"\xd5\x03\n\x0cMetricRecord\x12\x36\n\x12\x63reation_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\x06\x64omain\x18\x02 \x01(\x0e\x32!.buildgrid.v2.MetricRecord.Domain\x12-\n\x04type\x18\x03 \x01(\x0e\x32\x1f.buildgrid.v2.MetricRecord.Type\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0f\n\x05\x63ount\x18\x05 \x01(\x05H\x00\x12-\n\x08\x64uration\x18\x06 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x12\x0f\n\x05value\x18\x07 \x01(\x05H\x00\x12\x34\n\x05\x65xtra\x18\x08 \x03(\x0b\x32%.buildgrid.v2.MetricRecord.ExtraEntry\x1a,\n\nExtraEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"+\n\x06\x44omain\x12\x0b\n\x07UNKNOWN\x10\x00\x12\t\n\x05STATE\x10\x01\x12\t\n\x05\x42UILD\x10\x02\"3\n\x04Type\x12\x08\n\x04NONE\x10\x00\x12\x0b\n\x07\x43OUNTER\x10\x01\x12\t\n\x05TIMER\x10\x02\x12\t\n\x05GAUGE\x10\x03\x42\x06\n\x04\x64\x61tab\x06proto3')
    
    26
    +  ,
    
    27
    +  dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_duration__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,])
    
    28
    +
    
    29
    +
    
    30
    +
    
    31
    +_LOGRECORD_LEVEL = _descriptor.EnumDescriptor(
    
    32
    +  name='Level',
    
    33
    +  full_name='buildgrid.v2.LogRecord.Level',
    
    34
    +  filename=None,
    
    35
    +  file=DESCRIPTOR,
    
    36
    +  values=[
    
    37
    +    _descriptor.EnumValueDescriptor(
    
    38
    +      name='NOTSET', index=0, number=0,
    
    39
    +      serialized_options=None,
    
    40
    +      type=None),
    
    41
    +    _descriptor.EnumValueDescriptor(
    
    42
    +      name='DEBUG', index=1, number=1,
    
    43
    +      serialized_options=None,
    
    44
    +      type=None),
    
    45
    +    _descriptor.EnumValueDescriptor(
    
    46
    +      name='INFO', index=2, number=2,
    
    47
    +      serialized_options=None,
    
    48
    +      type=None),
    
    49
    +    _descriptor.EnumValueDescriptor(
    
    50
    +      name='WARNING', index=3, number=3,
    
    51
    +      serialized_options=None,
    
    52
    +      type=None),
    
    53
    +    _descriptor.EnumValueDescriptor(
    
    54
    +      name='ERROR', index=4, number=4,
    
    55
    +      serialized_options=None,
    
    56
    +      type=None),
    
    57
    +    _descriptor.EnumValueDescriptor(
    
    58
    +      name='CRITICAL', index=5, number=5,
    
    59
    +      serialized_options=None,
    
    60
    +      type=None),
    
    61
    +  ],
    
    62
    +  containing_type=None,
    
    63
    +  serialized_options=None,
    
    64
    +  serialized_start=538,
    
    65
    +  serialized_end=616,
    
    66
    +)
    
    67
    +_sym_db.RegisterEnumDescriptor(_LOGRECORD_LEVEL)
    
    68
    +
    
    69
    +_METRICRECORD_DOMAIN = _descriptor.EnumDescriptor(
    
    70
    +  name='Domain',
    
    71
    +  full_name='buildgrid.v2.MetricRecord.Domain',
    
    72
    +  filename=None,
    
    73
    +  file=DESCRIPTOR,
    
    74
    +  values=[
    
    75
    +    _descriptor.EnumValueDescriptor(
    
    76
    +      name='UNKNOWN', index=0, number=0,
    
    77
    +      serialized_options=None,
    
    78
    +      type=None),
    
    79
    +    _descriptor.EnumValueDescriptor(
    
    80
    +      name='STATE', index=1, number=1,
    
    81
    +      serialized_options=None,
    
    82
    +      type=None),
    
    83
    +    _descriptor.EnumValueDescriptor(
    
    84
    +      name='BUILD', index=2, number=2,
    
    85
    +      serialized_options=None,
    
    86
    +      type=None),
    
    87
    +  ],
    
    88
    +  containing_type=None,
    
    89
    +  serialized_options=None,
    
    90
    +  serialized_start=984,
    
    91
    +  serialized_end=1027,
    
    92
    +)
    
    93
    +_sym_db.RegisterEnumDescriptor(_METRICRECORD_DOMAIN)
    
    94
    +
    
    95
    +_METRICRECORD_TYPE = _descriptor.EnumDescriptor(
    
    96
    +  name='Type',
    
    97
    +  full_name='buildgrid.v2.MetricRecord.Type',
    
    98
    +  filename=None,
    
    99
    +  file=DESCRIPTOR,
    
    100
    +  values=[
    
    101
    +    _descriptor.EnumValueDescriptor(
    
    102
    +      name='NONE', index=0, number=0,
    
    103
    +      serialized_options=None,
    
    104
    +      type=None),
    
    105
    +    _descriptor.EnumValueDescriptor(
    
    106
    +      name='COUNTER', index=1, number=1,
    
    107
    +      serialized_options=None,
    
    108
    +      type=None),
    
    109
    +    _descriptor.EnumValueDescriptor(
    
    110
    +      name='TIMER', index=2, number=2,
    
    111
    +      serialized_options=None,
    
    112
    +      type=None),
    
    113
    +    _descriptor.EnumValueDescriptor(
    
    114
    +      name='GAUGE', index=3, number=3,
    
    115
    +      serialized_options=None,
    
    116
    +      type=None),
    
    117
    +  ],
    
    118
    +  containing_type=None,
    
    119
    +  serialized_options=None,
    
    120
    +  serialized_start=1029,
    
    121
    +  serialized_end=1080,
    
    122
    +)
    
    123
    +_sym_db.RegisterEnumDescriptor(_METRICRECORD_TYPE)
    
    124
    +
    
    125
    +
    
    126
    +_BUSMESSAGE = _descriptor.Descriptor(
    
    127
    +  name='BusMessage',
    
    128
    +  full_name='buildgrid.v2.BusMessage',
    
    129
    +  filename=None,
    
    130
    +  file=DESCRIPTOR,
    
    131
    +  containing_type=None,
    
    132
    +  fields=[
    
    133
    +    _descriptor.FieldDescriptor(
    
    134
    +      name='sequence_number', full_name='buildgrid.v2.BusMessage.sequence_number', index=0,
    
    135
    +      number=1, type=3, cpp_type=2, label=1,
    
    136
    +      has_default_value=False, default_value=0,
    
    137
    +      message_type=None, enum_type=None, containing_type=None,
    
    138
    +      is_extension=False, extension_scope=None,
    
    139
    +      serialized_options=None, file=DESCRIPTOR),
    
    140
    +    _descriptor.FieldDescriptor(
    
    141
    +      name='log_record', full_name='buildgrid.v2.BusMessage.log_record', index=1,
    
    142
    +      number=2, type=11, cpp_type=10, label=1,
    
    143
    +      has_default_value=False, default_value=None,
    
    144
    +      message_type=None, enum_type=None, containing_type=None,
    
    145
    +      is_extension=False, extension_scope=None,
    
    146
    +      serialized_options=None, file=DESCRIPTOR),
    
    147
    +    _descriptor.FieldDescriptor(
    
    148
    +      name='metric_record', full_name='buildgrid.v2.BusMessage.metric_record', index=2,
    
    149
    +      number=3, type=11, cpp_type=10, label=1,
    
    150
    +      has_default_value=False, default_value=None,
    
    151
    +      message_type=None, enum_type=None, containing_type=None,
    
    152
    +      is_extension=False, extension_scope=None,
    
    153
    +      serialized_options=None, file=DESCRIPTOR),
    
    154
    +  ],
    
    155
    +  extensions=[
    
    156
    +  ],
    
    157
    +  nested_types=[],
    
    158
    +  enum_types=[
    
    159
    +  ],
    
    160
    +  serialized_options=None,
    
    161
    +  is_extendable=False,
    
    162
    +  syntax='proto3',
    
    163
    +  extension_ranges=[],
    
    164
    +  oneofs=[
    
    165
    +    _descriptor.OneofDescriptor(
    
    166
    +      name='record', full_name='buildgrid.v2.BusMessage.record',
    
    167
    +      index=0, containing_type=None, fields=[]),
    
    168
    +  ],
    
    169
    +  serialized_start=143,
    
    170
    +  serialized_end=290,
    
    171
    +)
    
    172
    +
    
    173
    +
    
    174
    +_LOGRECORD_EXTRAENTRY = _descriptor.Descriptor(
    
    175
    +  name='ExtraEntry',
    
    176
    +  full_name='buildgrid.v2.LogRecord.ExtraEntry',
    
    177
    +  filename=None,
    
    178
    +  file=DESCRIPTOR,
    
    179
    +  containing_type=None,
    
    180
    +  fields=[
    
    181
    +    _descriptor.FieldDescriptor(
    
    182
    +      name='key', full_name='buildgrid.v2.LogRecord.ExtraEntry.key', index=0,
    
    183
    +      number=1, type=9, cpp_type=9, label=1,
    
    184
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    185
    +      message_type=None, enum_type=None, containing_type=None,
    
    186
    +      is_extension=False, extension_scope=None,
    
    187
    +      serialized_options=None, file=DESCRIPTOR),
    
    188
    +    _descriptor.FieldDescriptor(
    
    189
    +      name='value', full_name='buildgrid.v2.LogRecord.ExtraEntry.value', index=1,
    
    190
    +      number=2, type=9, cpp_type=9, label=1,
    
    191
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    192
    +      message_type=None, enum_type=None, containing_type=None,
    
    193
    +      is_extension=False, extension_scope=None,
    
    194
    +      serialized_options=None, file=DESCRIPTOR),
    
    195
    +  ],
    
    196
    +  extensions=[
    
    197
    +  ],
    
    198
    +  nested_types=[],
    
    199
    +  enum_types=[
    
    200
    +  ],
    
    201
    +  serialized_options=_b('8\001'),
    
    202
    +  is_extendable=False,
    
    203
    +  syntax='proto3',
    
    204
    +  extension_ranges=[],
    
    205
    +  oneofs=[
    
    206
    +  ],
    
    207
    +  serialized_start=492,
    
    208
    +  serialized_end=536,
    
    209
    +)
    
    210
    +
    
    211
    +_LOGRECORD = _descriptor.Descriptor(
    
    212
    +  name='LogRecord',
    
    213
    +  full_name='buildgrid.v2.LogRecord',
    
    214
    +  filename=None,
    
    215
    +  file=DESCRIPTOR,
    
    216
    +  containing_type=None,
    
    217
    +  fields=[
    
    218
    +    _descriptor.FieldDescriptor(
    
    219
    +      name='creation_timestamp', full_name='buildgrid.v2.LogRecord.creation_timestamp', index=0,
    
    220
    +      number=1, type=11, cpp_type=10, label=1,
    
    221
    +      has_default_value=False, default_value=None,
    
    222
    +      message_type=None, enum_type=None, containing_type=None,
    
    223
    +      is_extension=False, extension_scope=None,
    
    224
    +      serialized_options=None, file=DESCRIPTOR),
    
    225
    +    _descriptor.FieldDescriptor(
    
    226
    +      name='domain', full_name='buildgrid.v2.LogRecord.domain', index=1,
    
    227
    +      number=2, type=9, cpp_type=9, label=1,
    
    228
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    229
    +      message_type=None, enum_type=None, containing_type=None,
    
    230
    +      is_extension=False, extension_scope=None,
    
    231
    +      serialized_options=None, file=DESCRIPTOR),
    
    232
    +    _descriptor.FieldDescriptor(
    
    233
    +      name='level', full_name='buildgrid.v2.LogRecord.level', index=2,
    
    234
    +      number=3, type=14, cpp_type=8, label=1,
    
    235
    +      has_default_value=False, default_value=0,
    
    236
    +      message_type=None, enum_type=None, containing_type=None,
    
    237
    +      is_extension=False, extension_scope=None,
    
    238
    +      serialized_options=None, file=DESCRIPTOR),
    
    239
    +    _descriptor.FieldDescriptor(
    
    240
    +      name='message', full_name='buildgrid.v2.LogRecord.message', index=3,
    
    241
    +      number=4, type=9, cpp_type=9, label=1,
    
    242
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    243
    +      message_type=None, enum_type=None, containing_type=None,
    
    244
    +      is_extension=False, extension_scope=None,
    
    245
    +      serialized_options=None, file=DESCRIPTOR),
    
    246
    +    _descriptor.FieldDescriptor(
    
    247
    +      name='extra', full_name='buildgrid.v2.LogRecord.extra', index=4,
    
    248
    +      number=5, type=11, cpp_type=10, label=3,
    
    249
    +      has_default_value=False, default_value=[],
    
    250
    +      message_type=None, enum_type=None, containing_type=None,
    
    251
    +      is_extension=False, extension_scope=None,
    
    252
    +      serialized_options=None, file=DESCRIPTOR),
    
    253
    +  ],
    
    254
    +  extensions=[
    
    255
    +  ],
    
    256
    +  nested_types=[_LOGRECORD_EXTRAENTRY, ],
    
    257
    +  enum_types=[
    
    258
    +    _LOGRECORD_LEVEL,
    
    259
    +  ],
    
    260
    +  serialized_options=None,
    
    261
    +  is_extendable=False,
    
    262
    +  syntax='proto3',
    
    263
    +  extension_ranges=[],
    
    264
    +  oneofs=[
    
    265
    +  ],
    
    266
    +  serialized_start=293,
    
    267
    +  serialized_end=616,
    
    268
    +)
    
    269
    +
    
    270
    +
    
    271
    +_METRICRECORD_EXTRAENTRY = _descriptor.Descriptor(
    
    272
    +  name='ExtraEntry',
    
    273
    +  full_name='buildgrid.v2.MetricRecord.ExtraEntry',
    
    274
    +  filename=None,
    
    275
    +  file=DESCRIPTOR,
    
    276
    +  containing_type=None,
    
    277
    +  fields=[
    
    278
    +    _descriptor.FieldDescriptor(
    
    279
    +      name='key', full_name='buildgrid.v2.MetricRecord.ExtraEntry.key', index=0,
    
    280
    +      number=1, type=9, cpp_type=9, label=1,
    
    281
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    282
    +      message_type=None, enum_type=None, containing_type=None,
    
    283
    +      is_extension=False, extension_scope=None,
    
    284
    +      serialized_options=None, file=DESCRIPTOR),
    
    285
    +    _descriptor.FieldDescriptor(
    
    286
    +      name='value', full_name='buildgrid.v2.MetricRecord.ExtraEntry.value', index=1,
    
    287
    +      number=2, type=9, cpp_type=9, label=1,
    
    288
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    289
    +      message_type=None, enum_type=None, containing_type=None,
    
    290
    +      is_extension=False, extension_scope=None,
    
    291
    +      serialized_options=None, file=DESCRIPTOR),
    
    292
    +  ],
    
    293
    +  extensions=[
    
    294
    +  ],
    
    295
    +  nested_types=[],
    
    296
    +  enum_types=[
    
    297
    +  ],
    
    298
    +  serialized_options=_b('8\001'),
    
    299
    +  is_extendable=False,
    
    300
    +  syntax='proto3',
    
    301
    +  extension_ranges=[],
    
    302
    +  oneofs=[
    
    303
    +  ],
    
    304
    +  serialized_start=938,
    
    305
    +  serialized_end=982,
    
    306
    +)
    
    307
    +
    
    308
    +_METRICRECORD = _descriptor.Descriptor(
    
    309
    +  name='MetricRecord',
    
    310
    +  full_name='buildgrid.v2.MetricRecord',
    
    311
    +  filename=None,
    
    312
    +  file=DESCRIPTOR,
    
    313
    +  containing_type=None,
    
    314
    +  fields=[
    
    315
    +    _descriptor.FieldDescriptor(
    
    316
    +      name='creation_timestamp', full_name='buildgrid.v2.MetricRecord.creation_timestamp', index=0,
    
    317
    +      number=1, type=11, cpp_type=10, label=1,
    
    318
    +      has_default_value=False, default_value=None,
    
    319
    +      message_type=None, enum_type=None, containing_type=None,
    
    320
    +      is_extension=False, extension_scope=None,
    
    321
    +      serialized_options=None, file=DESCRIPTOR),
    
    322
    +    _descriptor.FieldDescriptor(
    
    323
    +      name='domain', full_name='buildgrid.v2.MetricRecord.domain', index=1,
    
    324
    +      number=2, type=14, cpp_type=8, label=1,
    
    325
    +      has_default_value=False, default_value=0,
    
    326
    +      message_type=None, enum_type=None, containing_type=None,
    
    327
    +      is_extension=False, extension_scope=None,
    
    328
    +      serialized_options=None, file=DESCRIPTOR),
    
    329
    +    _descriptor.FieldDescriptor(
    
    330
    +      name='type', full_name='buildgrid.v2.MetricRecord.type', index=2,
    
    331
    +      number=3, type=14, cpp_type=8, label=1,
    
    332
    +      has_default_value=False, default_value=0,
    
    333
    +      message_type=None, enum_type=None, containing_type=None,
    
    334
    +      is_extension=False, extension_scope=None,
    
    335
    +      serialized_options=None, file=DESCRIPTOR),
    
    336
    +    _descriptor.FieldDescriptor(
    
    337
    +      name='name', full_name='buildgrid.v2.MetricRecord.name', index=3,
    
    338
    +      number=4, type=9, cpp_type=9, label=1,
    
    339
    +      has_default_value=False, default_value=_b("").decode('utf-8'),
    
    340
    +      message_type=None, enum_type=None, containing_type=None,
    
    341
    +      is_extension=False, extension_scope=None,
    
    342
    +      serialized_options=None, file=DESCRIPTOR),
    
    343
    +    _descriptor.FieldDescriptor(
    
    344
    +      name='count', full_name='buildgrid.v2.MetricRecord.count', index=4,
    
    345
    +      number=5, type=5, cpp_type=1, label=1,
    
    346
    +      has_default_value=False, default_value=0,
    
    347
    +      message_type=None, enum_type=None, containing_type=None,
    
    348
    +      is_extension=False, extension_scope=None,
    
    349
    +      serialized_options=None, file=DESCRIPTOR),
    
    350
    +    _descriptor.FieldDescriptor(
    
    351
    +      name='duration', full_name='buildgrid.v2.MetricRecord.duration', index=5,
    
    352
    +      number=6, type=11, cpp_type=10, label=1,
    
    353
    +      has_default_value=False, default_value=None,
    
    354
    +      message_type=None, enum_type=None, containing_type=None,
    
    355
    +      is_extension=False, extension_scope=None,
    
    356
    +      serialized_options=None, file=DESCRIPTOR),
    
    357
    +    _descriptor.FieldDescriptor(
    
    358
    +      name='value', full_name='buildgrid.v2.MetricRecord.value', index=6,
    
    359
    +      number=7, type=5, cpp_type=1, label=1,
    
    360
    +      has_default_value=False, default_value=0,
    
    361
    +      message_type=None, enum_type=None, containing_type=None,
    
    362
    +      is_extension=False, extension_scope=None,
    
    363
    +      serialized_options=None, file=DESCRIPTOR),
    
    364
    +    _descriptor.FieldDescriptor(
    
    365
    +      name='extra', full_name='buildgrid.v2.MetricRecord.extra', index=7,
    
    366
    +      number=8, type=11, cpp_type=10, label=3,
    
    367
    +      has_default_value=False, default_value=[],
    
    368
    +      message_type=None, enum_type=None, containing_type=None,
    
    369
    +      is_extension=False, extension_scope=None,
    
    370
    +      serialized_options=None, file=DESCRIPTOR),
    
    371
    +  ],
    
    372
    +  extensions=[
    
    373
    +  ],
    
    374
    +  nested_types=[_METRICRECORD_EXTRAENTRY, ],
    
    375
    +  enum_types=[
    
    376
    +    _METRICRECORD_DOMAIN,
    
    377
    +    _METRICRECORD_TYPE,
    
    378
    +  ],
    
    379
    +  serialized_options=None,
    
    380
    +  is_extendable=False,
    
    381
    +  syntax='proto3',
    
    382
    +  extension_ranges=[],
    
    383
    +  oneofs=[
    
    384
    +    _descriptor.OneofDescriptor(
    
    385
    +      name='data', full_name='buildgrid.v2.MetricRecord.data',
    
    386
    +      index=0, containing_type=None, fields=[]),
    
    387
    +  ],
    
    388
    +  serialized_start=619,
    
    389
    +  serialized_end=1088,
    
    390
    +)
    
    391
    +
    
    392
    +_BUSMESSAGE.fields_by_name['log_record'].message_type = _LOGRECORD
    
    393
    +_BUSMESSAGE.fields_by_name['metric_record'].message_type = _METRICRECORD
    
    394
    +_BUSMESSAGE.oneofs_by_name['record'].fields.append(
    
    395
    +  _BUSMESSAGE.fields_by_name['log_record'])
    
    396
    +_BUSMESSAGE.fields_by_name['log_record'].containing_oneof = _BUSMESSAGE.oneofs_by_name['record']
    
    397
    +_BUSMESSAGE.oneofs_by_name['record'].fields.append(
    
    398
    +  _BUSMESSAGE.fields_by_name['metric_record'])
    
    399
    +_BUSMESSAGE.fields_by_name['metric_record'].containing_oneof = _BUSMESSAGE.oneofs_by_name['record']
    
    400
    +_LOGRECORD_EXTRAENTRY.containing_type = _LOGRECORD
    
    401
    +_LOGRECORD.fields_by_name['creation_timestamp'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP
    
    402
    +_LOGRECORD.fields_by_name['level'].enum_type = _LOGRECORD_LEVEL
    
    403
    +_LOGRECORD.fields_by_name['extra'].message_type = _LOGRECORD_EXTRAENTRY
    
    404
    +_LOGRECORD_LEVEL.containing_type = _LOGRECORD
    
    405
    +_METRICRECORD_EXTRAENTRY.containing_type = _METRICRECORD
    
    406
    +_METRICRECORD.fields_by_name['creation_timestamp'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP
    
    407
    +_METRICRECORD.fields_by_name['domain'].enum_type = _METRICRECORD_DOMAIN
    
    408
    +_METRICRECORD.fields_by_name['type'].enum_type = _METRICRECORD_TYPE
    
    409
    +_METRICRECORD.fields_by_name['duration'].message_type = google_dot_protobuf_dot_duration__pb2._DURATION
    
    410
    +_METRICRECORD.fields_by_name['extra'].message_type = _METRICRECORD_EXTRAENTRY
    
    411
    +_METRICRECORD_DOMAIN.containing_type = _METRICRECORD
    
    412
    +_METRICRECORD_TYPE.containing_type = _METRICRECORD
    
    413
    +_METRICRECORD.oneofs_by_name['data'].fields.append(
    
    414
    +  _METRICRECORD.fields_by_name['count'])
    
    415
    +_METRICRECORD.fields_by_name['count'].containing_oneof = _METRICRECORD.oneofs_by_name['data']
    
    416
    +_METRICRECORD.oneofs_by_name['data'].fields.append(
    
    417
    +  _METRICRECORD.fields_by_name['duration'])
    
    418
    +_METRICRECORD.fields_by_name['duration'].containing_oneof = _METRICRECORD.oneofs_by_name['data']
    
    419
    +_METRICRECORD.oneofs_by_name['data'].fields.append(
    
    420
    +  _METRICRECORD.fields_by_name['value'])
    
    421
    +_METRICRECORD.fields_by_name['value'].containing_oneof = _METRICRECORD.oneofs_by_name['data']
    
    422
    +DESCRIPTOR.message_types_by_name['BusMessage'] = _BUSMESSAGE
    
    423
    +DESCRIPTOR.message_types_by_name['LogRecord'] = _LOGRECORD
    
    424
    +DESCRIPTOR.message_types_by_name['MetricRecord'] = _METRICRECORD
    
    425
    +_sym_db.RegisterFileDescriptor(DESCRIPTOR)
    
    426
    +
    
    427
    +BusMessage = _reflection.GeneratedProtocolMessageType('BusMessage', (_message.Message,), dict(
    
    428
    +  DESCRIPTOR = _BUSMESSAGE,
    
    429
    +  __module__ = 'buildgrid.v2.monitoring_pb2'
    
    430
    +  # @@protoc_insertion_point(class_scope:buildgrid.v2.BusMessage)
    
    431
    +  ))
    
    432
    +_sym_db.RegisterMessage(BusMessage)
    
    433
    +
    
    434
    +LogRecord = _reflection.GeneratedProtocolMessageType('LogRecord', (_message.Message,), dict(
    
    435
    +
    
    436
    +  ExtraEntry = _reflection.GeneratedProtocolMessageType('ExtraEntry', (_message.Message,), dict(
    
    437
    +    DESCRIPTOR = _LOGRECORD_EXTRAENTRY,
    
    438
    +    __module__ = 'buildgrid.v2.monitoring_pb2'
    
    439
    +    # @@protoc_insertion_point(class_scope:buildgrid.v2.LogRecord.ExtraEntry)
    
    440
    +    ))
    
    441
    +  ,
    
    442
    +  DESCRIPTOR = _LOGRECORD,
    
    443
    +  __module__ = 'buildgrid.v2.monitoring_pb2'
    
    444
    +  # @@protoc_insertion_point(class_scope:buildgrid.v2.LogRecord)
    
    445
    +  ))
    
    446
    +_sym_db.RegisterMessage(LogRecord)
    
    447
    +_sym_db.RegisterMessage(LogRecord.ExtraEntry)
    
    448
    +
    
    449
    +MetricRecord = _reflection.GeneratedProtocolMessageType('MetricRecord', (_message.Message,), dict(
    
    450
    +
    
    451
    +  ExtraEntry = _reflection.GeneratedProtocolMessageType('ExtraEntry', (_message.Message,), dict(
    
    452
    +    DESCRIPTOR = _METRICRECORD_EXTRAENTRY,
    
    453
    +    __module__ = 'buildgrid.v2.monitoring_pb2'
    
    454
    +    # @@protoc_insertion_point(class_scope:buildgrid.v2.MetricRecord.ExtraEntry)
    
    455
    +    ))
    
    456
    +  ,
    
    457
    +  DESCRIPTOR = _METRICRECORD,
    
    458
    +  __module__ = 'buildgrid.v2.monitoring_pb2'
    
    459
    +  # @@protoc_insertion_point(class_scope:buildgrid.v2.MetricRecord)
    
    460
    +  ))
    
    461
    +_sym_db.RegisterMessage(MetricRecord)
    
    462
    +_sym_db.RegisterMessage(MetricRecord.ExtraEntry)
    
    463
    +
    
    464
    +
    
    465
    +_LOGRECORD_EXTRAENTRY._options = None
    
    466
    +_METRICRECORD_EXTRAENTRY._options = None
    
    467
    +# @@protoc_insertion_point(module_scope)

  • buildgrid/_protos/buildgrid/v2/monitoring_pb2_grpc.py
    1
    +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
    
    2
    +import grpc
    
    3
    +

  • buildgrid/server/_monitoring.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import asyncio
    
    17
    +from enum import Enum
    
    18
    +import sys
    
    19
    +
    
    20
    +from google.protobuf import json_format
    
    21
    +
    
    22
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    23
    +
    
    24
    +
    
    25
    +class MonitoringOutputType(Enum):
    
    26
    +    # Standard output stream.
    
    27
    +    STDOUT = 'stdout'
    
    28
    +    # On-disk file.
    
    29
    +    FILE = 'file'
    
    30
    +    # UNIX domain socket.
    
    31
    +    SOCKET = 'socket'
    
    32
    +
    
    33
    +
    
    34
    +class MonitoringOutputFormat(Enum):
    
    35
    +    # Protobuf binary format.
    
    36
    +    BINARY = 'binary'
    
    37
    +    # JSON format.
    
    38
    +    JSON = 'json'
    
    39
    +
    
    40
    +
    
    41
    +class MonitoringBus:
    
    42
    +
    
    43
    +    def __init__(self, event_loop,
    
    44
    +                 endpoint_type=MonitoringOutputType.SOCKET, endpoint_location=None,
    
    45
    +                 serialisation_format=MonitoringOutputFormat.BINARY):
    
    46
    +        self.__event_loop = event_loop
    
    47
    +        self.__streaming_task = None
    
    48
    +
    
    49
    +        self.__message_queue = asyncio.Queue(loop=self.__event_loop)
    
    50
    +        self.__sequence_number = 1
    
    51
    +
    
    52
    +        self.__output_location = None
    
    53
    +        self.__async_output = False
    
    54
    +        self.__json_output = False
    
    55
    +
    
    56
    +        if endpoint_type == MonitoringOutputType.FILE:
    
    57
    +            self.__output_location = endpoint_location
    
    58
    +
    
    59
    +        elif endpoint_type == MonitoringOutputType.SOCKET:
    
    60
    +            self.__output_location = endpoint_location
    
    61
    +            self.__async_output = True
    
    62
    +
    
    63
    +        if serialisation_format == MonitoringOutputFormat.JSON:
    
    64
    +            self.__json_output = True
    
    65
    +
    
    66
    +    # --- Public API ---
    
    67
    +
    
    68
    +    def start(self):
    
    69
    +        """Starts the monitoring bus worker task."""
    
    70
    +        if self.__streaming_task is not None:
    
    71
    +            return
    
    72
    +
    
    73
    +        self.__streaming_task = asyncio.ensure_future(
    
    74
    +            self._streaming_worker(), loop=self.__event_loop)
    
    75
    +
    
    76
    +    def stop(self):
    
    77
    +        """Cancels the monitoring bus worker task."""
    
    78
    +        if self.__streaming_task is None:
    
    79
    +            return
    
    80
    +
    
    81
    +        self.__streaming_task.cancel()
    
    82
    +
    
    83
    +    async def send_record(self, record):
    
    84
    +        """Publishes a record onto the bus asynchronously.
    
    85
    +
    
    86
    +        Args:
    
    87
    +            record (Message): The
    
    88
    +        """
    
    89
    +        await self.__message_queue.put(record)
    
    90
    +
    
    91
    +    def send_record_nowait(self, record):
    
    92
    +        """Publishes a record onto the bus.
    
    93
    +
    
    94
    +        Args:
    
    95
    +            record (Message): The
    
    96
    +        """
    
    97
    +        self.__message_queue.put_nowait(record)
    
    98
    +
    
    99
    +    # --- Private API ---
    
    100
    +
    
    101
    +    async def _streaming_worker(self):
    
    102
    +        """Handles bus messages steaming work."""
    
    103
    +        async def __streaming_worker(end_points):
    
    104
    +            record = await self.__message_queue.get()
    
    105
    +
    
    106
    +            message = monitoring_pb2.BusMessage()
    
    107
    +            message.sequence_number = self.__sequence_number
    
    108
    +
    
    109
    +            if record.DESCRIPTOR is monitoring_pb2.LogRecord.DESCRIPTOR:
    
    110
    +                message.log_record.CopyFrom(record)
    
    111
    +
    
    112
    +            elif record.DESCRIPTOR is monitoring_pb2.MetricRecord.DESCRIPTOR:
    
    113
    +                message.metric_record.CopyFrom(record)
    
    114
    +
    
    115
    +            else:
    
    116
    +                return False
    
    117
    +
    
    118
    +            if self.__json_output:
    
    119
    +                binary_message = json_format.MessageToJson(message).encode()
    
    120
    +            else:
    
    121
    +                binary_message = message.SerializeToString()
    
    122
    +
    
    123
    +            for end_point in end_points:
    
    124
    +                end_point.write(binary_message)
    
    125
    +
    
    126
    +            return True
    
    127
    +
    
    128
    +        output_writers, output_file = [], None
    
    129
    +
    
    130
    +        async def __client_connected_callback(reader, writer):
    
    131
    +            output_writers.append(writer)
    
    132
    +
    
    133
    +        try:
    
    134
    +            if self.__async_output and self.__output_location:
    
    135
    +                await asyncio.start_unix_server(
    
    136
    +                    __client_connected_callback, path=self.__output_location,
    
    137
    +                    loop=self.__event_loop)
    
    138
    +
    
    139
    +                while True:
    
    140
    +                    if await __streaming_worker(output_writers):
    
    141
    +                        self.__sequence_number += 1
    
    142
    +
    
    143
    +                        for writer in output_writers:
    
    144
    +                            await writer.drain()
    
    145
    +
    
    146
    +            elif self.__output_location:
    
    147
    +                output_file = open(self.__output_location, mode='wb')
    
    148
    +
    
    149
    +                output_writers.append(output_file)
    
    150
    +
    
    151
    +                while True:
    
    152
    +                    if await __streaming_worker(iter(output_file)):
    
    153
    +                        self.__sequence_number += 1
    
    154
    +
    
    155
    +            else:
    
    156
    +                output_writers.append(sys.stdout.buffer)
    
    157
    +
    
    158
    +                while True:
    
    159
    +                    if await __streaming_worker(output_writers):
    
    160
    +                        self.__sequence_number += 1
    
    161
    +
    
    162
    +        except asyncio.CancelledError:
    
    163
    +            if output_file is not None:
    
    164
    +                output_file.close()
    
    165
    +
    
    166
    +            elif output_writers:
    
    167
    +                for writer in output_writers:
    
    168
    +                    writer.close()
    
    169
    +                    await writer.wait_closed()

  • buildgrid/server/bots/instance.py
    ... ... @@ -37,6 +37,10 @@ class BotsInterface:
    37 37
             self._bot_sessions = {}
    
    38 38
             self._scheduler = scheduler
    
    39 39
     
    
    40
    +    @property
    
    41
    +    def scheduler(self):
    
    42
    +        return self._scheduler
    
    43
    +
    
    40 44
         def register_instance_with_server(self, instance_name, server):
    
    41 45
             server.add_bots_interface(self, instance_name)
    
    42 46
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -23,8 +23,9 @@ import logging
    23 23
     
    
    24 24
     import grpc
    
    25 25
     
    
    26
    -from google.protobuf.empty_pb2 import Empty
    
    26
    +from google.protobuf import empty_pb2, timestamp_pb2
    
    27 27
     
    
    28
    +from buildgrid._enums import BotStatus
    
    28 29
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    29 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    30 31
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    ... ... @@ -32,24 +33,65 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp
    32 33
     
    
    33 34
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    34 35
     
    
    35
    -    def __init__(self, server):
    
    36
    +    def __init__(self, server, monitor=False):
    
    36 37
             self.__logger = logging.getLogger(__name__)
    
    37 38
     
    
    39
    +        self.__bots_by_status = None
    
    40
    +        self.__bots_by_instance = None
    
    41
    +        self.__bots = None
    
    42
    +
    
    38 43
             self._instances = {}
    
    39 44
     
    
    40 45
             bots_pb2_grpc.add_BotsServicer_to_server(self, server)
    
    41 46
     
    
    42
    -    def add_instance(self, name, instance):
    
    43
    -        self._instances[name] = instance
    
    47
    +        self._is_instrumented = monitor
    
    48
    +
    
    49
    +        if self._is_instrumented:
    
    50
    +            self.__bots_by_status = {}
    
    51
    +            self.__bots_by_instance = {}
    
    52
    +            self.__bots = {}
    
    53
    +
    
    54
    +            self.__bots_by_status[BotStatus.OK] = set()
    
    55
    +            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
    
    56
    +            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
    
    57
    +            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
    
    58
    +
    
    59
    +    # --- Public API ---
    
    60
    +
    
    61
    +    def add_instance(self, instance_name, instance):
    
    62
    +        self._instances[instance_name] = instance
    
    63
    +
    
    64
    +        if self._is_instrumented:
    
    65
    +            self.__bots_by_instance[instance_name] = 0
    
    66
    +
    
    67
    +    # --- Public API: Servicer ---
    
    44 68
     
    
    45 69
         def CreateBotSession(self, request, context):
    
    70
    +        """Handles CreateBotSessionRequest messages.
    
    71
    +
    
    72
    +        Args:
    
    73
    +            request (CreateBotSessionRequest): The incoming RPC request.
    
    74
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    75
    +        """
    
    46 76
             self.__logger.debug("CreateBotSession request from [%s]", context.peer())
    
    47 77
     
    
    78
    +        instance_name = request.parent
    
    79
    +        bot_status = BotStatus(request.bot_session.status)
    
    80
    +        bot_id = request.bot_session.bot_id
    
    81
    +
    
    48 82
             try:
    
    49
    -            parent = request.parent
    
    50
    -            instance = self._get_instance(request.parent)
    
    51
    -            return instance.create_bot_session(parent,
    
    52
    -                                               request.bot_session)
    
    83
    +            instance = self._get_instance(instance_name)
    
    84
    +            bot_session = instance.create_bot_session(instance_name,
    
    85
    +                                                      request.bot_session)
    
    86
    +            now = timestamp_pb2.Timestamp()
    
    87
    +            now.GetCurrentTime()
    
    88
    +
    
    89
    +            if self._is_instrumented:
    
    90
    +                self.__bots[bot_id] = now
    
    91
    +                self.__bots_by_instance[instance_name] += 1
    
    92
    +                self.__bots_by_status[bot_status].add(bot_id)
    
    93
    +
    
    94
    +            return bot_session
    
    53 95
     
    
    54 96
             except InvalidArgumentError as e:
    
    55 97
                 self.__logger.error(e)
    
    ... ... @@ -59,17 +101,36 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    59 101
             return bots_pb2.BotSession()
    
    60 102
     
    
    61 103
         def UpdateBotSession(self, request, context):
    
    104
    +        """Handles UpdateBotSessionRequest messages.
    
    105
    +
    
    106
    +        Args:
    
    107
    +            request (UpdateBotSessionRequest): The incoming RPC request.
    
    108
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    109
    +        """
    
    62 110
             self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
    
    63 111
     
    
    112
    +        names = request.name.split("/")
    
    113
    +        bot_status = BotStatus(request.bot_session.status)
    
    114
    +        bot_id = request.bot_session.bot_id
    
    115
    +
    
    64 116
             try:
    
    65
    -            names = request.name.split("/")
    
    66
    -            # Operation name should be in format:
    
    67
    -            # {instance/name}/{uuid}
    
    68
    -            instance_name = ''.join(names[0:-1])
    
    117
    +            instance_name = '/'.join(names[:-1])
    
    69 118
     
    
    70 119
                 instance = self._get_instance(instance_name)
    
    71
    -            return instance.update_bot_session(request.name,
    
    72
    -                                               request.bot_session)
    
    120
    +            bot_session = instance.update_bot_session(request.name,
    
    121
    +                                                      request.bot_session)
    
    122
    +
    
    123
    +            if self._is_instrumented:
    
    124
    +                self.__bots[bot_id].GetCurrentTime()
    
    125
    +                if bot_id not in self.__bots_by_status[bot_status]:
    
    126
    +                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
    
    127
    +                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
    
    128
    +                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
    
    129
    +                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
    
    130
    +
    
    131
    +                    self.__bots_by_status[bot_status].add(bot_id)
    
    132
    +
    
    133
    +            return bot_session
    
    73 134
     
    
    74 135
             except InvalidArgumentError as e:
    
    75 136
                 self.__logger.error(e)
    
    ... ... @@ -89,10 +150,47 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    89 150
             return bots_pb2.BotSession()
    
    90 151
     
    
    91 152
         def PostBotEventTemp(self, request, context):
    
    153
    +        """Handles PostBotEventTempRequest messages.
    
    154
    +
    
    155
    +        Args:
    
    156
    +            request (PostBotEventTempRequest): The incoming RPC request.
    
    157
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    158
    +        """
    
    92 159
             self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
    
    93 160
     
    
    94 161
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    95
    -        return Empty()
    
    162
    +
    
    163
    +        return empty_pb2.Empty()
    
    164
    +
    
    165
    +    # --- Public API: Monitoring ---
    
    166
    +
    
    167
    +    @property
    
    168
    +    def is_instrumented(self):
    
    169
    +        return self._is_instrumented
    
    170
    +
    
    171
    +    def query_n_bots(self):
    
    172
    +        if self.__bots is not None:
    
    173
    +            return len(self.__bots)
    
    174
    +
    
    175
    +        return 0
    
    176
    +
    
    177
    +    def query_n_bots_for_instance(self, instance_name):
    
    178
    +        try:
    
    179
    +            if self.__bots_by_instance is not None:
    
    180
    +                return len(self.__bots_by_instance[instance_name])
    
    181
    +        except KeyError:
    
    182
    +            pass
    
    183
    +        return 0
    
    184
    +
    
    185
    +    def query_n_bots_for_status(self, bot_status):
    
    186
    +        try:
    
    187
    +            if self.__bots_by_status is not None:
    
    188
    +                return len(self.__bots_by_status[bot_status])
    
    189
    +        except KeyError:
    
    190
    +            pass
    
    191
    +        return 0
    
    192
    +
    
    193
    +    # --- Private API ---
    
    96 194
     
    
    97 195
         def _get_instance(self, name):
    
    98 196
             try:
    

  • buildgrid/server/cas/instance.py
    ... ... @@ -24,7 +24,7 @@ import logging
    24 24
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27
    -from buildgrid.settings import HASH
    
    27
    +from buildgrid.settings import HASH, HASH_LENGTH
    
    28 28
     
    
    29 29
     
    
    30 30
     class ContentAddressableStorageInstance:
    
    ... ... @@ -71,15 +71,12 @@ class ByteStreamInstance:
    71 71
         def register_instance_with_server(self, instance_name, server):
    
    72 72
             server.add_bytestream_instance(self, instance_name)
    
    73 73
     
    
    74
    -    def read(self, path, read_offset, read_limit):
    
    75
    -        storage = self._storage
    
    76
    -
    
    77
    -        if path[0] == "blobs":
    
    78
    -            path = [""] + path
    
    74
    +    def read(self, digest_hash, digest_size, read_offset, read_limit):
    
    75
    +        if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
    
    76
    +            raise InvalidArgumentError("Invalid digest [{}/{}]"
    
    77
    +                                       .format(digest_hash, digest_size))
    
    79 78
     
    
    80
    -        # Parse/verify resource name.
    
    81
    -        # Read resource names look like "[instance/]blobs/abc123hash/99".
    
    82
    -        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
    
    79
    +        digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
    
    83 80
     
    
    84 81
             # Check the given read offset and limit.
    
    85 82
             if read_offset < 0 or read_offset > digest.size_bytes:
    
    ... ... @@ -95,7 +92,7 @@ class ByteStreamInstance:
    95 92
                 raise InvalidArgumentError("Negative read_limit is invalid")
    
    96 93
     
    
    97 94
             # Read the blob from storage and send its contents to the client.
    
    98
    -        result = storage.get_blob(digest)
    
    95
    +        result = self._storage.get_blob(digest)
    
    99 96
             if result is None:
    
    100 97
                 raise NotFoundError("Blob not found")
    
    101 98
     
    
    ... ... @@ -110,51 +107,35 @@ class ByteStreamInstance:
    110 107
                     data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
    
    111 108
                 bytes_remaining -= self.BLOCK_SIZE
    
    112 109
     
    
    113
    -    def write(self, requests):
    
    114
    -        storage = self._storage
    
    110
    +    def write(self, digest_hash, digest_size, first_block, other_blocks):
    
    111
    +        if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
    
    112
    +            raise InvalidArgumentError("Invalid digest [{}/{}]"
    
    113
    +                                       .format(digest_hash, digest_size))
    
    115 114
     
    
    116
    -        first_request = next(requests)
    
    117
    -        path = first_request.resource_name.split("/")
    
    115
    +        digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
    
    118 116
     
    
    119
    -        if path[0] == "uploads":
    
    120
    -            path = [""] + path
    
    121
    -
    
    122
    -        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
    
    123
    -        write_session = storage.begin_write(digest)
    
    117
    +        write_session = self._storage.begin_write(digest)
    
    124 118
     
    
    125 119
             # Start the write session and write the first request's data.
    
    126
    -        write_session.write(first_request.data)
    
    127
    -        hash_ = HASH(first_request.data)
    
    128
    -        bytes_written = len(first_request.data)
    
    129
    -        finished = first_request.finish_write
    
    130
    -
    
    131
    -        # Handle subsequent write requests.
    
    132
    -        while not finished:
    
    133
    -
    
    134
    -            for request in requests:
    
    135
    -                if finished:
    
    136
    -                    raise InvalidArgumentError("Write request sent after write finished")
    
    137
    -
    
    138
    -                elif request.write_offset != bytes_written:
    
    139
    -                    raise InvalidArgumentError("Invalid write offset")
    
    120
    +        write_session.write(first_block)
    
    140 121
     
    
    141
    -                elif request.resource_name and request.resource_name != first_request.resource_name:
    
    142
    -                    raise InvalidArgumentError("Resource name changed mid-write")
    
    122
    +        computed_hash = HASH(first_block)
    
    123
    +        bytes_written = len(first_block)
    
    143 124
     
    
    144
    -                finished = request.finish_write
    
    145
    -                bytes_written += len(request.data)
    
    146
    -                if bytes_written > digest.size_bytes:
    
    147
    -                    raise InvalidArgumentError("Wrote too much data to blob")
    
    125
    +        # Handle subsequent write requests.
    
    126
    +        for next_block in other_blocks:
    
    127
    +            write_session.write(next_block)
    
    148 128
     
    
    149
    -                write_session.write(request.data)
    
    150
    -                hash_.update(request.data)
    
    129
    +            computed_hash.update(next_block)
    
    130
    +            bytes_written += len(next_block)
    
    151 131
     
    
    152 132
             # Check that the data matches the provided digest.
    
    153
    -        if bytes_written != digest.size_bytes or not finished:
    
    133
    +        if bytes_written != digest.size_bytes:
    
    154 134
                 raise NotImplementedError("Cannot close stream before finishing write")
    
    155 135
     
    
    156
    -        elif hash_.hexdigest() != digest.hash:
    
    136
    +        elif computed_hash.hexdigest() != digest.hash:
    
    157 137
                 raise InvalidArgumentError("Data does not match hash")
    
    158 138
     
    
    159
    -        storage.commit_write(digest, write_session)
    
    139
    +        self._storage.commit_write(digest, write_session)
    
    140
    +
    
    160 141
             return bytestream_pb2.WriteResponse(committed_size=bytes_written)

  • buildgrid/server/cas/service.py
    ... ... @@ -21,7 +21,6 @@ Implements the Content Addressable Storage API and ByteStream API.
    21 21
     """
    
    22 22
     
    
    23 23
     
    
    24
    -from itertools import tee
    
    25 24
     import logging
    
    26 25
     
    
    27 26
     import grpc
    
    ... ... @@ -115,27 +114,30 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    115 114
         def Read(self, request, context):
    
    116 115
             self.__logger.debug("Read request from [%s]", context.peer())
    
    117 116
     
    
    117
    +        names = request.resource_name.split('/')
    
    118
    +
    
    118 119
             try:
    
    119
    -            path = request.resource_name.split("/")
    
    120
    -            instance_name = path[0]
    
    120
    +            instance_name = ''
    
    121
    +            # Format: "{instance_name}/blobs/{hash}/{size}":
    
    122
    +            if len(names) < 3 or names[-3] != 'blobs':
    
    123
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    124
    +                                           .format(request.resource_name))
    
    121 125
     
    
    122
    -            # TODO: Decide on default instance name
    
    123
    -            if path[0] == "blobs":
    
    124
    -                if len(path) < 3 or not path[2].isdigit():
    
    125
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    
    126
    -                instance_name = ""
    
    126
    +            elif names[0] != 'blobs':
    
    127
    +                index = names.index('blobs')
    
    128
    +                instance_name = '/'.join(names[:index])
    
    129
    +                names = names[index:]
    
    127 130
     
    
    128
    -            elif path[1] == "blobs":
    
    129
    -                if len(path) < 4 or not path[3].isdigit():
    
    130
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    
    131
    +            if len(names) < 3:
    
    132
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    133
    +                                           .format(request.resource_name))
    
    131 134
     
    
    132
    -            else:
    
    133
    -                raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    
    135
    +            hash_, size_bytes = names[1], names[2]
    
    134 136
     
    
    135 137
                 instance = self._get_instance(instance_name)
    
    136
    -            yield from instance.read(path,
    
    137
    -                                     request.read_offset,
    
    138
    -                                     request.read_limit)
    
    138
    +
    
    139
    +            yield from instance.read(hash_, size_bytes,
    
    140
    +                                     request.read_offset, request.read_limit)
    
    139 141
     
    
    140 142
             except InvalidArgumentError as e:
    
    141 143
                 self.__logger.error(e)
    
    ... ... @@ -158,31 +160,31 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    158 160
         def Write(self, requests, context):
    
    159 161
             self.__logger.debug("Write request from [%s]", context.peer())
    
    160 162
     
    
    161
    -        try:
    
    162
    -            requests, request_probe = tee(requests, 2)
    
    163
    -            first_request = next(request_probe)
    
    164
    -
    
    165
    -            path = first_request.resource_name.split("/")
    
    163
    +        request = next(requests)
    
    164
    +        names = request.resource_name.split('/')
    
    166 165
     
    
    167
    -            instance_name = path[0]
    
    166
    +        try:
    
    167
    +            instance_name = ''
    
    168
    +            # Format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}/{anything}":
    
    169
    +            if len(names) < 5 or 'uploads' not in names or 'blobs' not in names:
    
    170
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    171
    +                                           .format(request.resource_name))
    
    168 172
     
    
    169
    -            # TODO: Sort out no instance name
    
    170
    -            if path[0] == "uploads":
    
    171
    -                if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
    
    172
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    173
    -                instance_name = ""
    
    173
    +            elif names[0] != 'uploads':
    
    174
    +                index = names.index('uploads')
    
    175
    +                instance_name = '/'.join(names[:index])
    
    176
    +                names = names[index:]
    
    174 177
     
    
    175
    -            elif path[1] == "uploads":
    
    176
    -                if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
    
    177
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    178
    +            if len(names) < 5:
    
    179
    +                raise InvalidArgumentError("Invalid resource name: [{}]"
    
    180
    +                                           .format(request.resource_name))
    
    178 181
     
    
    179
    -            else:
    
    180
    -                raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    182
    +            _, hash_, size_bytes = names[1], names[3], names[4]
    
    181 183
     
    
    182 184
                 instance = self._get_instance(instance_name)
    
    183
    -            response = instance.write(requests)
    
    184 185
     
    
    185
    -            return response
    
    186
    +            return instance.write(hash_, size_bytes, request.data,
    
    187
    +                                  [request.data for request in requests])
    
    186 188
     
    
    187 189
             except NotImplementedError as e:
    
    188 190
                 self.__logger.error(e)
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -35,6 +35,10 @@ class ExecutionInstance:
    35 35
             self._storage = storage
    
    36 36
             self._scheduler = scheduler
    
    37 37
     
    
    38
    +    @property
    
    39
    +    def scheduler(self):
    
    40
    +        return self._scheduler
    
    41
    +
    
    38 42
         def register_instance_with_server(self, instance_name, server):
    
    39 43
             server.add_execution_instance(self, instance_name)
    
    40 44
     
    

  • buildgrid/server/execution/service.py
    ... ... @@ -33,30 +33,84 @@ from buildgrid._protos.google.longrunning import operations_pb2
    33 33
     
    
    34 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    35 35
     
    
    36
    -    def __init__(self, server):
    
    36
    +    def __init__(self, server, monitor=False):
    
    37 37
             self.__logger = logging.getLogger(__name__)
    
    38 38
     
    
    39
    +        self.__peers_by_instance = None
    
    40
    +        self.__peers = None
    
    41
    +
    
    39 42
             self._instances = {}
    
    43
    +
    
    40 44
             remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
    
    41 45
     
    
    42
    -    def add_instance(self, name, instance):
    
    43
    -        self._instances[name] = instance
    
    46
    +        self._is_instrumented = monitor
    
    47
    +
    
    48
    +        if self._is_instrumented:
    
    49
    +            self.__peers_by_instance = {}
    
    50
    +            self.__peers = {}
    
    51
    +
    
    52
    +    # --- Public API ---
    
    53
    +
    
    54
    +    def add_instance(self, instance_name, instance):
    
    55
    +        """Registers a new servicer instance.
    
    56
    +
    
    57
    +        Args:
    
    58
    +            instance_name (str): The new instance's name.
    
    59
    +            instance (ExecutionInstance): The new instance itself.
    
    60
    +        """
    
    61
    +        self._instances[instance_name] = instance
    
    62
    +
    
    63
    +        if self._is_instrumented:
    
    64
    +            self.__peers_by_instance[instance_name] = set()
    
    65
    +
    
    66
    +    def get_scheduler(self, instance_name):
    
    67
    +        """Retrieves a reference to the scheduler for an instance.
    
    68
    +
    
    69
    +        Args:
    
    70
    +            instance_name (str): The name of the instance to query.
    
    71
    +
    
    72
    +        Returns:
    
    73
    +            Scheduler: A reference to the scheduler for `instance_name`.
    
    74
    +
    
    75
    +        Raises:
    
    76
    +            InvalidArgumentError: If no instance named `instance_name` exists.
    
    77
    +        """
    
    78
    +        instance = self._get_instance(instance_name)
    
    79
    +
    
    80
    +        return instance.scheduler
    
    81
    +
    
    82
    +    # --- Public API: Servicer ---
    
    44 83
     
    
    45 84
         def Execute(self, request, context):
    
    85
    +        """Handles ExecuteRequest messages.
    
    86
    +
    
    87
    +        Args:
    
    88
    +            request (ExecuteRequest): The incoming RPC request.
    
    89
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    90
    +        """
    
    46 91
             self.__logger.debug("Execute request from [%s]", context.peer())
    
    47 92
     
    
    93
    +        instance_name = request.instance_name
    
    94
    +        message_queue = queue.Queue()
    
    95
    +        peer = context.peer()
    
    96
    +
    
    48 97
             try:
    
    49
    -            message_queue = queue.Queue()
    
    50
    -            instance = self._get_instance(request.instance_name)
    
    98
    +            instance = self._get_instance(instance_name)
    
    51 99
                 operation = instance.execute(request.action_digest,
    
    52 100
                                              request.skip_cache_lookup,
    
    53 101
                                              message_queue)
    
    54 102
     
    
    55
    -            context.add_callback(partial(instance.unregister_message_client,
    
    56
    -                                         operation.name, message_queue))
    
    103
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    104
    +                                         peer, instance_name, operation.name, message_queue))
    
    57 105
     
    
    58
    -            instanced_op_name = "{}/{}".format(request.instance_name,
    
    59
    -                                               operation.name)
    
    106
    +            if self._is_instrumented:
    
    107
    +                if peer not in self.__peers:
    
    108
    +                    self.__peers_by_instance[instance_name].add(peer)
    
    109
    +                    self.__peers[peer] = 1
    
    110
    +                else:
    
    111
    +                    self.__peers[peer] += 1
    
    112
    +
    
    113
    +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    60 114
     
    
    61 115
                 self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    62 116
     
    
    ... ... @@ -80,23 +134,33 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    80 134
                 yield operations_pb2.Operation()
    
    81 135
     
    
    82 136
         def WaitExecution(self, request, context):
    
    83
    -        self.__logger.debug("WaitExecution request from [%s]", context.peer())
    
    137
    +        """Handles WaitExecutionRequest messages.
    
    84 138
     
    
    85
    -        try:
    
    86
    -            names = request.name.split("/")
    
    139
    +        Args:
    
    140
    +            request (WaitExecutionRequest): The incoming RPC request.
    
    141
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    142
    +        """
    
    143
    +        self.__logger.debug("WaitExecution request from [%s]", context.peer())
    
    87 144
     
    
    88
    -            # Operation name should be in format:
    
    89
    -            # {instance/name}/{operation_id}
    
    90
    -            instance_name = ''.join(names[0:-1])
    
    145
    +        names = request.name.split('/')
    
    146
    +        instance_name = '/'.join(names[:-1])
    
    147
    +        operation_name = names[-1]
    
    148
    +        message_queue = queue.Queue()
    
    149
    +        peer = context.peer()
    
    91 150
     
    
    92
    -            message_queue = queue.Queue()
    
    93
    -            operation_name = names[-1]
    
    151
    +        try:
    
    94 152
                 instance = self._get_instance(instance_name)
    
    95 153
     
    
    96 154
                 instance.register_message_client(operation_name, message_queue)
    
    155
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    156
    +                                         peer, instance_name, operation_name, message_queue))
    
    97 157
     
    
    98
    -            context.add_callback(partial(instance.unregister_message_client,
    
    99
    -                                         operation_name, message_queue))
    
    158
    +            if self._is_instrumented:
    
    159
    +                if peer not in self.__peers:
    
    160
    +                    self.__peers_by_instance[instance_name].add(peer)
    
    161
    +                    self.__peers[peer] = 1
    
    162
    +                else:
    
    163
    +                    self.__peers[peer] += 1
    
    100 164
     
    
    101 165
                 for operation in instance.stream_operation_updates(message_queue,
    
    102 166
                                                                    operation_name):
    
    ... ... @@ -111,6 +175,39 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    111 175
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    112 176
                 yield operations_pb2.Operation()
    
    113 177
     
    
    178
    +    # --- Public API: Monitoring ---
    
    179
    +
    
    180
    +    @property
    
    181
    +    def is_instrumented(self):
    
    182
    +        return self._is_instrumented
    
    183
    +
    
    184
    +    def query_n_clients(self):
    
    185
    +        if self.__peers is not None:
    
    186
    +            return len(self.__peers)
    
    187
    +        return 0
    
    188
    +
    
    189
    +    def query_n_clients_for_instance(self, instance_name):
    
    190
    +        try:
    
    191
    +            if self.__peers_by_instance is not None:
    
    192
    +                return len(self.__peers_by_instance[instance_name])
    
    193
    +        except KeyError:
    
    194
    +            pass
    
    195
    +        return 0
    
    196
    +
    
    197
    +    # --- Private API ---
    
    198
    +
    
    199
    +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    200
    +        instance = self._get_instance(instance_name)
    
    201
    +
    
    202
    +        instance.unregister_message_client(job_name, message_queue)
    
    203
    +
    
    204
    +        if self._is_instrumented:
    
    205
    +            if self.__peers[peer] > 1:
    
    206
    +                self.__peers[peer] -= 1
    
    207
    +            else:
    
    208
    +                self.__peers_by_instance[instance_name].remove(peer)
    
    209
    +                del self.__peers[peer]
    
    210
    +
    
    114 211
         def _get_instance(self, name):
    
    115 212
             try:
    
    116 213
                 return self._instances[name]
    

  • buildgrid/server/instance.py
    ... ... @@ -13,18 +13,26 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +import asyncio
    
    16 17
     from concurrent import futures
    
    18
    +from datetime import timedelta
    
    17 19
     import logging
    
    18 20
     import os
    
    21
    +import signal
    
    22
    +import time
    
    19 23
     
    
    20 24
     import grpc
    
    21 25
     
    
    22
    -from .cas.service import ByteStreamService, ContentAddressableStorageService
    
    23
    -from .actioncache.service import ActionCacheService
    
    24
    -from .execution.service import ExecutionService
    
    25
    -from .operations.service import OperationsService
    
    26
    -from .bots.service import BotsService
    
    27
    -from .referencestorage.service import ReferenceStorageService
    
    26
    +from buildgrid._enums import MetricRecordDomain, MetricRecordType
    
    27
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    28
    +from buildgrid.server.actioncache.service import ActionCacheService
    
    29
    +from buildgrid.server.bots.service import BotsService
    
    30
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    31
    +from buildgrid.server.execution.service import ExecutionService
    
    32
    +from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    33
    +from buildgrid.server.operations.service import OperationsService
    
    34
    +from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    35
    +from buildgrid.settings import MONITORING_PERIOD
    
    28 36
     
    
    29 37
     
    
    30 38
     class BuildGridServer:
    
    ... ... @@ -34,7 +42,7 @@ class BuildGridServer:
    34 42
         requisite services.
    
    35 43
         """
    
    36 44
     
    
    37
    -    def __init__(self, max_workers=None):
    
    45
    +    def __init__(self, max_workers=None, monitor=False):
    
    38 46
             """Initializes a new :class:`BuildGridServer` instance.
    
    39 47
     
    
    40 48
             Args:
    
    ... ... @@ -46,9 +54,13 @@ class BuildGridServer:
    46 54
                 # Use max_workers default from Python 3.5+
    
    47 55
                 max_workers = (os.cpu_count() or 1) * 5
    
    48 56
     
    
    49
    -        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    57
    +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    58
    +        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    50 59
     
    
    51
    -        self._server = server
    
    60
    +        self.__main_loop = asyncio.get_event_loop()
    
    61
    +        self.__monitoring_bus = None
    
    62
    +
    
    63
    +        self.__state_monitoring_task = None
    
    52 64
     
    
    53 65
             self._execution_service = None
    
    54 66
             self._bots_service = None
    
    ... ... @@ -58,15 +70,44 @@ class BuildGridServer:
    58 70
             self._cas_service = None
    
    59 71
             self._bytestream_service = None
    
    60 72
     
    
    73
    +        self._schedulers = {}
    
    74
    +        self._instances = set()
    
    75
    +
    
    76
    +        self._is_instrumented = monitor
    
    77
    +
    
    78
    +        if self._is_instrumented:
    
    79
    +            self.__monitoring_bus = MonitoringBus(
    
    80
    +                self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    81
    +                serialisation_format=MonitoringOutputFormat.JSON)
    
    82
    +
    
    83
    +    # --- Public API ---
    
    84
    +
    
    61 85
         def start(self):
    
    62
    -        """Starts the server.
    
    63
    -        """
    
    64
    -        self._server.start()
    
    86
    +        """Starts the BuildGrid server."""
    
    87
    +        self.__grpc_server.start()
    
    65 88
     
    
    66
    -    def stop(self, grace=0):
    
    67
    -        """Stops the server.
    
    68
    -        """
    
    69
    -        self._server.stop(grace)
    
    89
    +        if self._is_instrumented:
    
    90
    +            self.__monitoring_bus.start()
    
    91
    +
    
    92
    +            self.__state_monitoring_task = asyncio.ensure_future(
    
    93
    +                self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    94
    +                loop=self.__main_loop)
    
    95
    +
    
    96
    +        self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
    
    97
    +
    
    98
    +        self.__main_loop.run_forever()
    
    99
    +
    
    100
    +    def stop(self):
    
    101
    +        """Stops the BuildGrid server."""
    
    102
    +        if self._is_instrumented:
    
    103
    +            if self.__state_monitoring_task is not None:
    
    104
    +                self.__state_monitoring_task.cancel()
    
    105
    +
    
    106
    +            self.__monitoring_bus.stop()
    
    107
    +
    
    108
    +        self.__main_loop.stop()
    
    109
    +
    
    110
    +        self.__grpc_server.stop(None)
    
    70 111
     
    
    71 112
         def add_port(self, address, credentials):
    
    72 113
             """Adds a port to the server.
    
    ... ... @@ -77,14 +118,19 @@ class BuildGridServer:
    77 118
             Args:
    
    78 119
                 address (str): The address with port number.
    
    79 120
                 credentials (:obj:`grpc.ChannelCredentials`): Credentials object.
    
    121
    +
    
    122
    +        Returns:
    
    123
    +            int: Number of the bound port.
    
    80 124
             """
    
    81 125
             if credentials is not None:
    
    82 126
                 self.__logger.info("Adding secure connection on: [%s]", address)
    
    83
    -            self._server.add_secure_port(address, credentials)
    
    127
    +            port_number = self.__grpc_server.add_secure_port(address, credentials)
    
    84 128
     
    
    85 129
             else:
    
    86 130
                 self.__logger.info("Adding insecure connection on [%s]", address)
    
    87
    -            self._server.add_insecure_port(address)
    
    131
    +            port_number = self.__grpc_server.add_insecure_port(address)
    
    132
    +
    
    133
    +        return port_number
    
    88 134
     
    
    89 135
         def add_execution_instance(self, instance, instance_name):
    
    90 136
             """Adds an :obj:`ExecutionInstance` to the service.
    
    ... ... @@ -96,10 +142,14 @@ class BuildGridServer:
    96 142
                 instance_name (str): Instance name.
    
    97 143
             """
    
    98 144
             if self._execution_service is None:
    
    99
    -            self._execution_service = ExecutionService(self._server)
    
    145
    +            self._execution_service = ExecutionService(
    
    146
    +                self.__grpc_server, monitor=self._is_instrumented)
    
    100 147
     
    
    101 148
             self._execution_service.add_instance(instance_name, instance)
    
    102 149
     
    
    150
    +        self._schedulers[instance_name] = instance.scheduler
    
    151
    +        self._instances.add(instance_name)
    
    152
    +
    
    103 153
         def add_bots_interface(self, instance, instance_name):
    
    104 154
             """Adds a :obj:`BotsInterface` to the service.
    
    105 155
     
    
    ... ... @@ -110,10 +160,13 @@ class BuildGridServer:
    110 160
                 instance_name (str): Instance name.
    
    111 161
             """
    
    112 162
             if self._bots_service is None:
    
    113
    -            self._bots_service = BotsService(self._server)
    
    163
    +            self._bots_service = BotsService(
    
    164
    +                self.__grpc_server, monitor=self._is_instrumented)
    
    114 165
     
    
    115 166
             self._bots_service.add_instance(instance_name, instance)
    
    116 167
     
    
    168
    +        self._instances.add(instance_name)
    
    169
    +
    
    117 170
         def add_operations_instance(self, instance, instance_name):
    
    118 171
             """Adds an :obj:`OperationsInstance` to the service.
    
    119 172
     
    
    ... ... @@ -124,8 +177,7 @@ class BuildGridServer:
    124 177
                 instance_name (str): Instance name.
    
    125 178
             """
    
    126 179
             if self._operations_service is None:
    
    127
    -            self._operations_service = OperationsService(self._server)
    
    128
    -
    
    180
    +            self._operations_service = OperationsService(self.__grpc_server)
    
    129 181
             self._operations_service.add_instance(instance_name, instance)
    
    130 182
     
    
    131 183
         def add_reference_storage_instance(self, instance, instance_name):
    
    ... ... @@ -138,8 +190,7 @@ class BuildGridServer:
    138 190
                 instance_name (str): Instance name.
    
    139 191
             """
    
    140 192
             if self._reference_storage_service is None:
    
    141
    -            self._reference_storage_service = ReferenceStorageService(self._server)
    
    142
    -
    
    193
    +            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
    
    143 194
             self._reference_storage_service.add_instance(instance_name, instance)
    
    144 195
     
    
    145 196
         def add_action_cache_instance(self, instance, instance_name):
    
    ... ... @@ -152,8 +203,7 @@ class BuildGridServer:
    152 203
                 instance_name (str): Instance name.
    
    153 204
             """
    
    154 205
             if self._action_cache_service is None:
    
    155
    -            self._action_cache_service = ActionCacheService(self._server)
    
    156
    -
    
    206
    +            self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    157 207
             self._action_cache_service.add_instance(instance_name, instance)
    
    158 208
     
    
    159 209
         def add_cas_instance(self, instance, instance_name):
    
    ... ... @@ -166,8 +216,7 @@ class BuildGridServer:
    166 216
                 instance_name (str): Instance name.
    
    167 217
             """
    
    168 218
             if self._cas_service is None:
    
    169
    -            self._cas_service = ContentAddressableStorageService(self._server)
    
    170
    -
    
    219
    +            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    171 220
             self._cas_service.add_instance(instance_name, instance)
    
    172 221
     
    
    173 222
         def add_bytestream_instance(self, instance, instance_name):
    
    ... ... @@ -180,6 +229,150 @@ class BuildGridServer:
    180 229
                 instance_name (str): Instance name.
    
    181 230
             """
    
    182 231
             if self._bytestream_service is None:
    
    183
    -            self._bytestream_service = ByteStreamService(self._server)
    
    184
    -
    
    232
    +            self._bytestream_service = ByteStreamService(self.__grpc_server)
    
    185 233
             self._bytestream_service.add_instance(instance_name, instance)
    
    234
    +
    
    235
    +    # --- Public API: Monitoring ---
    
    236
    +
    
    237
    +    @property
    
    238
    +    def is_instrumented(self):
    
    239
    +        return self._is_instrumented
    
    240
    +
    
    241
    +    # --- Private API ---
    
    242
    +
    
    243
    +    async def _state_monitoring_worker(self, period=1.0):
    
    244
    +        """Periodically publishes state metrics to the monitoring bus."""
    
    245
    +        async def __state_monitoring_worker():
    
    246
    +            # Emit total clients count record:
    
    247
    +            _, record = self._query_n_clients()
    
    248
    +            await self.__monitoring_bus.send_record(record)
    
    249
    +
    
    250
    +            # Emit total bots count record:
    
    251
    +            _, record = self._query_n_bots()
    
    252
    +            await self.__monitoring_bus.send_record(record)
    
    253
    +
    
    254
    +            queue_times = []
    
    255
    +            # Emits records by instance:
    
    256
    +            for instance_name in self._instances:
    
    257
    +                # Emit instance clients count record:
    
    258
    +                _, record = self._query_n_clients_for_instance(instance_name)
    
    259
    +                await self.__monitoring_bus.send_record(record)
    
    260
    +
    
    261
    +                # Emit instance bots count record:
    
    262
    +                _, record = self._query_n_bots_for_instance(instance_name)
    
    263
    +                await self.__monitoring_bus.send_record(record)
    
    264
    +
    
    265
    +                # Emit instance average queue time record:
    
    266
    +                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
    
    267
    +                await self.__monitoring_bus.send_record(record)
    
    268
    +                if queue_time:
    
    269
    +                    queue_times.append(queue_time)
    
    270
    +
    
    271
    +            # Emit overall average queue time record:
    
    272
    +            if queue_times:
    
    273
    +                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
    
    274
    +            else:
    
    275
    +                am_queue_time = timedelta()
    
    276
    +            record = self._forge_timer_metric_record(
    
    277
    +                MetricRecordDomain.STATE,
    
    278
    +                'average-queue-time',
    
    279
    +                am_queue_time)
    
    280
    +
    
    281
    +            await self.__monitoring_bus.send_record(record)
    
    282
    +
    
    283
    +        try:
    
    284
    +            while True:
    
    285
    +                start = time.time()
    
    286
    +                await __state_monitoring_worker()
    
    287
    +
    
    288
    +                end = time.time()
    
    289
    +                await asyncio.sleep(period - (end - start))
    
    290
    +
    
    291
    +        except asyncio.CancelledError:
    
    292
    +            pass
    
    293
    +        # except BaseException as e:
    
    294
    +        #     print(f'__state_monitoring_worker: {e}')
    
    295
    +
    
    296
    +    def _forge_counter_metric_record(self, domain, name, count, extra=None):
    
    297
    +        counter_record = monitoring_pb2.MetricRecord()
    
    298
    +
    
    299
    +        counter_record.creation_timestamp.GetCurrentTime()
    
    300
    +        counter_record.domain = domain.value
    
    301
    +        counter_record.type = MetricRecordType.COUNTER.value
    
    302
    +        counter_record.name = name
    
    303
    +        counter_record.count = count
    
    304
    +        if extra is not None:
    
    305
    +            counter_record.extra.update(extra)
    
    306
    +
    
    307
    +        return counter_record
    
    308
    +
    
    309
    +    def _forge_timer_metric_record(self, domain, name, duration, extra=None):
    
    310
    +        timer_record = monitoring_pb2.MetricRecord()
    
    311
    +
    
    312
    +        timer_record.creation_timestamp.GetCurrentTime()
    
    313
    +        timer_record.domain = domain.value
    
    314
    +        timer_record.type = MetricRecordType.TIMER.value
    
    315
    +        timer_record.name = name
    
    316
    +        timer_record.duration.FromTimedelta(duration)
    
    317
    +        if extra is not None:
    
    318
    +            timer_record.extra.update(extra)
    
    319
    +
    
    320
    +        return timer_record
    
    321
    +
    
    322
    +    def _forge_gauge_metric_record(self, domain, name, value, extra=None):
    
    323
    +        gauge_record = monitoring_pb2.MetricRecord()
    
    324
    +
    
    325
    +        gauge_record.creation_timestamp.GetCurrentTime()
    
    326
    +        gauge_record.domain = domain.value
    
    327
    +        gauge_record.type = MetricRecordType.GAUGE.value
    
    328
    +        gauge_record.name = name
    
    329
    +        gauge_record.value = value
    
    330
    +        if extra is not None:
    
    331
    +            gauge_record.extra.update(extra)
    
    332
    +
    
    333
    +        return gauge_record
    
    334
    +
    
    335
    +    # --- Private API: Monitoring ---
    
    336
    +
    
    337
    +    def _query_n_clients(self):
    
    338
    +        """Queries the number of clients connected."""
    
    339
    +        n_clients = self._execution_service.query_n_clients()
    
    340
    +        gauge_record = self._forge_gauge_metric_record(
    
    341
    +            MetricRecordDomain.STATE, 'clients-count', n_clients)
    
    342
    +
    
    343
    +        return n_clients, gauge_record
    
    344
    +
    
    345
    +    def _query_n_clients_for_instance(self, instance_name):
    
    346
    +        """Queries the number of clients connected for a given instance"""
    
    347
    +        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    348
    +        gauge_record = self._forge_gauge_metric_record(
    
    349
    +            MetricRecordDomain.STATE, 'clients-count', n_clients,
    
    350
    +            extra={'instance-name': instance_name or 'void'})
    
    351
    +
    
    352
    +        return n_clients, gauge_record
    
    353
    +
    
    354
    +    def _query_n_bots(self):
    
    355
    +        """Queries the number of bots connected."""
    
    356
    +        n_bots = self._bots_service.query_n_bots()
    
    357
    +        gauge_record = self._forge_gauge_metric_record(
    
    358
    +            MetricRecordDomain.STATE, 'bots-count', n_bots)
    
    359
    +
    
    360
    +        return n_bots, gauge_record
    
    361
    +
    
    362
    +    def _query_n_bots_for_instance(self, instance_name):
    
    363
    +        """Queries the number of bots connected for a given instance."""
    
    364
    +        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    365
    +        gauge_record = self._forge_gauge_metric_record(
    
    366
    +            MetricRecordDomain.STATE, 'bots-count', n_bots,
    
    367
    +            extra={'instance-name': instance_name or 'void'})
    
    368
    +
    
    369
    +        return n_bots, gauge_record
    
    370
    +
    
    371
    +    def _query_am_queue_time_for_instance(self, instance_name):
    
    372
    +        """Queries the average job's queue time for a given instance."""
    
    373
    +        am_queue_time = self._schedulers[instance_name].query_am_queue_time()
    
    374
    +        timer_record = self._forge_timer_metric_record(
    
    375
    +            MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
    
    376
    +            extra={'instance-name': instance_name or 'void'})
    
    377
    +
    
    378
    +        return am_queue_time, timer_record

  • buildgrid/server/job.py
    ... ... @@ -13,10 +13,11 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +from datetime import datetime
    
    16 17
     import logging
    
    17 18
     import uuid
    
    18 19
     
    
    19
    -from google.protobuf import timestamp_pb2
    
    20
    +from google.protobuf import duration_pb2, timestamp_pb2
    
    20 21
     
    
    21 22
     from buildgrid._enums import LeaseState, OperationStage
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    ... ... @@ -37,6 +38,7 @@ class Job:
    37 38
             self.__execute_response = None
    
    38 39
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    39 40
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    41
    +        self.__queued_time_duration = duration_pb2.Duration()
    
    40 42
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    41 43
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    42 44
     
    
    ... ... @@ -50,6 +52,8 @@ class Job:
    50 52
             self._operation.done = False
    
    51 53
             self._n_tries = 0
    
    52 54
     
    
    55
    +    # --- Public API ---
    
    56
    +
    
    53 57
         @property
    
    54 58
         def name(self):
    
    55 59
             return self._name
    
    ... ... @@ -179,7 +183,7 @@ class Job:
    179 183
                     result.Unpack(action_result)
    
    180 184
     
    
    181 185
                 action_metadata = action_result.execution_metadata
    
    182
    -            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    186
    +            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
    
    183 187
                 action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    184 188
                 action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
    
    185 189
     
    
    ... ... @@ -204,6 +208,10 @@ class Job:
    204 208
                     self.__queued_timestamp.GetCurrentTime()
    
    205 209
                 self._n_tries += 1
    
    206 210
     
    
    211
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    212
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    213
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    214
    +
    
    207 215
             elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    208 216
                 if self.__execute_response is not None:
    
    209 217
                     self._operation.response.Pack(self.__execute_response)
    
    ... ... @@ -213,3 +221,11 @@ class Job:
    213 221
     
    
    214 222
             for queue in self._operation_update_queues:
    
    215 223
                 queue.put(self._operation)
    
    224
    +
    
    225
    +    # --- Public API: Monitoring ---
    
    226
    +
    
    227
    +    def query_queue_time(self):
    
    228
    +        return self.__queued_time_duration.ToTimedelta()
    
    229
    +
    
    230
    +    def query_n_retries(self):
    
    231
    +        return self._n_tries - 1 if self._n_tries > 0 else 0

  • buildgrid/server/operations/instance.py
    ... ... @@ -32,6 +32,10 @@ class OperationsInstance:
    32 32
     
    
    33 33
             self._scheduler = scheduler
    
    34 34
     
    
    35
    +    @property
    
    36
    +    def scheduler(self):
    
    37
    +        return self._scheduler
    
    38
    +
    
    35 39
         def register_instance_with_server(self, instance_name, server):
    
    36 40
             server.add_operations_instance(self, instance_name)
    
    37 41
     
    

  • buildgrid/server/operations/service.py
    ... ... @@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    38 38
     
    
    39 39
             operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
    
    40 40
     
    
    41
    -    def add_instance(self, name, instance):
    
    42
    -        self._instances[name] = instance
    
    41
    +    # --- Public API ---
    
    42
    +
    
    43
    +    def add_instance(self, instance_name, instance):
    
    44
    +        """Registers a new servicer instance.
    
    45
    +
    
    46
    +        Args:
    
    47
    +            instance_name (str): The new instance's name.
    
    48
    +            instance (OperationsInstance): The new instance itself.
    
    49
    +        """
    
    50
    +        self._instances[instance_name] = instance
    
    51
    +
    
    52
    +    # --- Public API: Servicer ---
    
    43 53
     
    
    44 54
         def GetOperation(self, request, context):
    
    45 55
             self.__logger.debug("GetOperation request from [%s]", context.peer())
    
    ... ... @@ -132,6 +142,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    132 142
     
    
    133 143
             return Empty()
    
    134 144
     
    
    145
    +    # --- Private API ---
    
    146
    +
    
    135 147
         def _parse_instance_name(self, name):
    
    136 148
             """ If the instance name is not blank, 'name' will have the form
    
    137 149
             {instance_name}/{operation_uuid}. Otherwise, it will just be
    

  • buildgrid/server/scheduler.py
    ... ... @@ -20,24 +20,48 @@ Schedules jobs.
    20 20
     """
    
    21 21
     
    
    22 22
     from collections import deque
    
    23
    +from datetime import timedelta
    
    23 24
     import logging
    
    24 25
     
    
    26
    +from buildgrid._enums import LeaseState, OperationStage
    
    25 27
     from buildgrid._exceptions import NotFoundError
    
    26 28
     
    
    27
    -from .job import OperationStage, LeaseState
    
    28
    -
    
    29 29
     
    
    30 30
     class Scheduler:
    
    31 31
     
    
    32 32
         MAX_N_TRIES = 5
    
    33 33
     
    
    34
    -    def __init__(self, action_cache=None):
    
    34
    +    def __init__(self, action_cache=None, monitor=False):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    +        self.__operations_by_stage = None
    
    38
    +        self.__leases_by_state = None
    
    39
    +        self.__queue_time_average = None
    
    40
    +        self.__retries_count = 0
    
    41
    +
    
    37 42
             self._action_cache = action_cache
    
    38 43
             self.jobs = {}
    
    39 44
             self.queue = deque()
    
    40 45
     
    
    46
    +        self._is_instrumented = monitor
    
    47
    +
    
    48
    +        if self._is_instrumented:
    
    49
    +            self.__operations_by_stage = {}
    
    50
    +            self.__leases_by_state = {}
    
    51
    +            self.__queue_time_average = 0, timedelta()
    
    52
    +
    
    53
    +            self.__operations_by_stage[OperationStage.CACHE_CHECK] = set()
    
    54
    +            self.__operations_by_stage[OperationStage.QUEUED] = set()
    
    55
    +            self.__operations_by_stage[OperationStage.EXECUTING] = set()
    
    56
    +            self.__operations_by_stage[OperationStage.COPLETED] = set()
    
    57
    +
    
    58
    +            self.__leases_by_state[LeaseState.PENDING] = set()
    
    59
    +            self.__leases_by_state[LeaseState.ACTIVE] = set()
    
    60
    +            self.__leases_by_state[LeaseState.COMPLETED] = set()
    
    61
    +            self.__leases_by_state[LeaseState.CANCELLED] = set()
    
    62
    +
    
    63
    +    # --- Public API ---
    
    64
    +
    
    41 65
         def register_client(self, job_name, queue):
    
    42 66
             self.jobs[job_name].register_client(queue)
    
    43 67
     
    
    ... ... @@ -62,22 +86,29 @@ class Scheduler:
    62 86
                     job.set_cached_result(action_result)
    
    63 87
                     operation_stage = OperationStage.COMPLETED
    
    64 88
     
    
    89
    +                if self._is_instrumented:
    
    90
    +                    self.__retries_count += 1
    
    91
    +
    
    65 92
             else:
    
    66 93
                 operation_stage = OperationStage.QUEUED
    
    67 94
                 self.queue.append(job)
    
    68 95
     
    
    69
    -        job.update_operation_stage(operation_stage)
    
    96
    +        self._update_job_operation_stage(job.name, operation_stage)
    
    70 97
     
    
    71 98
         def retry_job(self, job_name):
    
    72
    -        if job_name in self.jobs:
    
    73
    -            job = self.jobs[job_name]
    
    74
    -            if job.n_tries >= self.MAX_N_TRIES:
    
    75
    -                # TODO: Decide what to do with these jobs
    
    76
    -                job.update_operation_stage(OperationStage.COMPLETED)
    
    77
    -                # TODO: Mark these jobs as done
    
    78
    -            else:
    
    79
    -                job.update_operation_stage(OperationStage.QUEUED)
    
    80
    -                self.queue.appendleft(job)
    
    99
    +        job = self.jobs[job_name]
    
    100
    +
    
    101
    +        operation_stage = None
    
    102
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    103
    +            # TODO: Decide what to do with these jobs
    
    104
    +            operation_stage = OperationStage.COMPLETED
    
    105
    +            # TODO: Mark these jobs as done
    
    106
    +
    
    107
    +        else:
    
    108
    +            operation_stage = OperationStage.QUEUED
    
    109
    +            self.queue.appendleft(job)
    
    110
    +
    
    111
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    81 112
     
    
    82 113
         def list_jobs(self):
    
    83 114
             return self.jobs.values()
    
    ... ... @@ -112,13 +143,14 @@ class Scheduler:
    112 143
             """
    
    113 144
             job = self.jobs[job_name]
    
    114 145
     
    
    146
    +        operation_stage = None
    
    115 147
             if lease_state == LeaseState.PENDING:
    
    116 148
                 job.update_lease_state(LeaseState.PENDING)
    
    117
    -            job.update_operation_stage(OperationStage.QUEUED)
    
    149
    +            operation_stage = OperationStage.QUEUED
    
    118 150
     
    
    119 151
             elif lease_state == LeaseState.ACTIVE:
    
    120 152
                 job.update_lease_state(LeaseState.ACTIVE)
    
    121
    -            job.update_operation_stage(OperationStage.EXECUTING)
    
    153
    +            operation_stage = OperationStage.EXECUTING
    
    122 154
     
    
    123 155
             elif lease_state == LeaseState.COMPLETED:
    
    124 156
                 job.update_lease_state(LeaseState.COMPLETED,
    
    ... ... @@ -127,7 +159,9 @@ class Scheduler:
    127 159
                 if self._action_cache is not None and not job.do_not_cache:
    
    128 160
                     self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    129 161
     
    
    130
    -            job.update_operation_stage(OperationStage.COMPLETED)
    
    162
    +            operation_stage = OperationStage.COMPLETED
    
    163
    +
    
    164
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    131 165
     
    
    132 166
         def get_job_lease(self, job_name):
    
    133 167
             """Returns the lease associated to job, if any have been emitted yet."""
    
    ... ... @@ -136,3 +170,78 @@ class Scheduler:
    136 170
         def get_job_operation(self, job_name):
    
    137 171
             """Returns the operation associated to job."""
    
    138 172
             return self.jobs[job_name].operation
    
    173
    +
    
    174
    +    # --- Public API: Monitoring ---
    
    175
    +
    
    176
    +    @property
    
    177
    +    def is_instrumented(self):
    
    178
    +        return self._is_instrumented
    
    179
    +
    
    180
    +    def query_n_jobs(self):
    
    181
    +        return len(self.jobs)
    
    182
    +
    
    183
    +    def query_n_operations(self):
    
    184
    +        # For now n_operations == n_jobs:
    
    185
    +        return len(self.jobs)
    
    186
    +
    
    187
    +    def query_n_operations_by_stage(self, operation_stage):
    
    188
    +        try:
    
    189
    +            if self.__operations_by_stage is not None:
    
    190
    +                return len(self.__operations_by_stage[operation_stage])
    
    191
    +        except KeyError:
    
    192
    +            pass
    
    193
    +        return 0
    
    194
    +
    
    195
    +    def query_n_leases(self):
    
    196
    +        return len(self.jobs)
    
    197
    +
    
    198
    +    def query_n_leases_by_state(self, lease_state):
    
    199
    +        try:
    
    200
    +            if self.__leases_by_state is not None:
    
    201
    +                return len(self.__leases_by_state[lease_state])
    
    202
    +        except KeyError:
    
    203
    +            pass
    
    204
    +        return 0
    
    205
    +
    
    206
    +    def query_n_retries(self):
    
    207
    +        return self.__retries_count
    
    208
    +
    
    209
    +    def query_am_queue_time(self):
    
    210
    +        if self.__queue_time_average is not None:
    
    211
    +            return self.__queue_time_average[1]
    
    212
    +        return 0
    
    213
    +
    
    214
    +    # --- Private API ---
    
    215
    +
    
    216
    +    def _update_job_operation_stage(self, job_name, operation_stage):
    
    217
    +        """Requests a stage transition for the job's :class:Operations.
    
    218
    +
    
    219
    +        Args:
    
    220
    +            job_name (str): name of the job to query.
    
    221
    +            operation_stage (OperationStage): the stage to transition to.
    
    222
    +        """
    
    223
    +        job = self.jobs[job_name]
    
    224
    +
    
    225
    +        if operation_stage == OperationStage.CACHE_CHECK:
    
    226
    +            job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    227
    +
    
    228
    +        elif operation_stage == OperationStage.QUEUED:
    
    229
    +            job.update_operation_stage(OperationStage.QUEUED)
    
    230
    +
    
    231
    +        elif operation_stage == OperationStage.EXECUTING:
    
    232
    +            job.update_operation_stage(OperationStage.EXECUTING)
    
    233
    +
    
    234
    +        elif operation_stage == OperationStage.COMPLETED:
    
    235
    +            job.update_operation_stage(OperationStage.COMPLETED)
    
    236
    +
    
    237
    +            if self._is_instrumented:
    
    238
    +                average_order, average_time = self.__queue_time_average
    
    239
    +
    
    240
    +                average_order += 1
    
    241
    +                if average_order <= 1:
    
    242
    +                    average_time = job.query_queue_time()
    
    243
    +                else:
    
    244
    +                    queue_time = job.query_queue_time()
    
    245
    +                    average_time = average_time + ((queue_time - average_time) / average_order)
    
    246
    +
    
    247
    +                self.__queue_time_average = average_order, average_time

  • buildgrid/settings.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    1 16
     import hashlib
    
    2 17
     
    
    3 18
     
    
    4
    -# The hash function that CAS uses
    
    19
    +# Hash function used for computing digests:
    
    5 20
     HASH = hashlib.sha256
    
    21
    +
    
    22
    +# Length of the hexadecimal digest string produced by HASH:
    
    6 23
     HASH_LENGTH = HASH().digest_size * 2
    
    24
    +
    
    25
    +# Period, in seconds, for the monitoring cycle:
    
    26
    +MONITORING_PERIOD = 5.0

  • setup.py
    ... ... @@ -112,13 +112,15 @@ setup(
    112 112
         license="Apache License, Version 2.0",
    
    113 113
         description="A remote execution service",
    
    114 114
         packages=find_packages(),
    
    115
    +    python_requires='>= 3.5.3',  # janus requirement
    
    115 116
         install_requires=[
    
    116
    -        'protobuf',
    
    117
    -        'grpcio',
    
    118
    -        'Click',
    
    119
    -        'PyYAML',
    
    120 117
             'boto3 < 1.8.0',
    
    121 118
             'botocore < 1.11.0',
    
    119
    +        'click',
    
    120
    +        'grpcio',
    
    121
    +        'janus',
    
    122
    +        'protobuf',
    
    123
    +        'pyyaml',
    
    122 124
         ],
    
    123 125
         entry_points={
    
    124 126
             'console_scripts': [
    

  • tests/cas/test_services.py
    ... ... @@ -137,7 +137,7 @@ def test_bytestream_write(mocked, instance, extra_data):
    137 137
             bytestream_pb2.WriteRequest(data=b'def', write_offset=3, finish_write=True)
    
    138 138
         ]
    
    139 139
     
    
    140
    -    response = servicer.Write(requests, context)
    
    140
    +    response = servicer.Write(iter(requests), context)
    
    141 141
         assert response.committed_size == 6
    
    142 142
         assert len(storage.data) == 1
    
    143 143
         assert (hash_, 6) in storage.data
    
    ... ... @@ -159,7 +159,7 @@ def test_bytestream_write_rejects_wrong_hash(mocked):
    159 159
             bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
    
    160 160
         ]
    
    161 161
     
    
    162
    -    servicer.Write(requests, context)
    
    162
    +    servicer.Write(iter(requests), context)
    
    163 163
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    164 164
     
    
    165 165
         assert len(storage.data) is 0
    

  • tests/server_instance.py
    ... ... @@ -13,19 +13,24 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    -from buildgrid._app.settings import parser
    
    17
    -from buildgrid._app.commands.cmd_server import _create_server_from_config
    
    18
    -from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    19
    -from buildgrid.server.actioncache.service import ActionCacheService
    
    20
    -from buildgrid.server.execution.service import ExecutionService
    
    21
    -from buildgrid.server.operations.service import OperationsService
    
    22
    -from buildgrid.server.bots.service import BotsService
    
    23
    -from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    16
    +import grpc
    
    17
    +
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    20
    +from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    21
    +from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    22
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23
    +from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    24
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    25
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    26
    +from buildgrid._protos.google.longrunning import operations_pb2
    
    27
    +from buildgrid._protos.google.longrunning import operations_pb2_grpc
    
    24 28
     
    
    25 29
     from .utils.cas import run_in_subprocess
    
    30
    +from .utils.server import serve
    
    26 31
     
    
    27 32
     
    
    28
    -config = """
    
    33
    +CONFIGURATION = """
    
    29 34
     server:
    
    30 35
       - !channel
    
    31 36
         port: 50051
    
    ... ... @@ -72,24 +77,102 @@ instances:
    72 77
     
    
    73 78
     def test_create_server():
    
    74 79
         # Actual test function, to be run in a subprocess:
    
    75
    -    def __test_create_server(queue, config_data):
    
    76
    -        settings = parser.get_parser().safe_load(config)
    
    77
    -        server = _create_server_from_config(settings)
    
    80
    +    def __test_create_server(queue, remote):
    
    81
    +        # Open a channel to the remote server:
    
    82
    +        channel = grpc.insecure_channel(remote)
    
    78 83
     
    
    79
    -        server.start()
    
    80
    -        server.stop()
    
    84
    +        try:
    
    85
    +            stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    
    86
    +            request = remote_execution_pb2.ExecuteRequest(instance_name='main')
    
    87
    +            response = next(stub.Execute(request))
    
    88
    +
    
    89
    +            assert response.DESCRIPTOR is operations_pb2.Operation.DESCRIPTOR
    
    90
    +
    
    91
    +        except grpc.RpcError as e:
    
    92
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    93
    +                queue.put(False)
    
    94
    +        except AssertionError:
    
    95
    +            queue.put(False)
    
    96
    +
    
    97
    +        try:
    
    98
    +            stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
    
    99
    +            request = remote_execution_pb2.GetActionResultRequest(instance_name='main')
    
    100
    +            response = stub.GetActionResult(request)
    
    101
    +
    
    102
    +            assert response.DESCRIPTOR is remote_execution_pb2.ActionResult.DESCRIPTOR
    
    103
    +
    
    104
    +        except grpc.RpcError as e:
    
    105
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    106
    +                queue.put(False)
    
    107
    +        except AssertionError:
    
    108
    +            queue.put(False)
    
    109
    +
    
    110
    +        try:
    
    111
    +            stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
    
    112
    +            request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name='main')
    
    113
    +            response = stub.BatchUpdateBlobs(request)
    
    114
    +
    
    115
    +            assert response.DESCRIPTOR is remote_execution_pb2.BatchUpdateBlobsResponse.DESCRIPTOR
    
    116
    +
    
    117
    +        except grpc.RpcError as e:
    
    118
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    119
    +                queue.put(False)
    
    120
    +        except AssertionError:
    
    121
    +            queue.put(False)
    
    122
    +
    
    123
    +        try:
    
    124
    +            stub = buildstream_pb2_grpc.ReferenceStorageStub(channel)
    
    125
    +            request = buildstream_pb2.GetReferenceRequest(instance_name='main')
    
    126
    +            response = stub.GetReference(request)
    
    127
    +
    
    128
    +            assert response.DESCRIPTOR is buildstream_pb2.GetReferenceResponse.DESCRIPTOR
    
    129
    +
    
    130
    +        except grpc.RpcError as e:
    
    131
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    132
    +                queue.put(False)
    
    133
    +        except AssertionError:
    
    134
    +            queue.put(False)
    
    81 135
     
    
    82 136
             try:
    
    83
    -            assert isinstance(server._execution_service, ExecutionService)
    
    84
    -            assert isinstance(server._operations_service, OperationsService)
    
    85
    -            assert isinstance(server._bots_service, BotsService)
    
    86
    -            assert isinstance(server._reference_storage_service, ReferenceStorageService)
    
    87
    -            assert isinstance(server._action_cache_service, ActionCacheService)
    
    88
    -            assert isinstance(server._cas_service, ContentAddressableStorageService)
    
    89
    -            assert isinstance(server._bytestream_service, ByteStreamService)
    
    137
    +            stub = bytestream_pb2_grpc.ByteStreamStub(channel)
    
    138
    +            request = bytestream_pb2.ReadRequest()
    
    139
    +            response = stub.Read(request)
    
    140
    +
    
    141
    +            assert next(response).DESCRIPTOR is bytestream_pb2.ReadResponse.DESCRIPTOR
    
    142
    +
    
    143
    +        except grpc.RpcError as e:
    
    144
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    145
    +                queue.put(False)
    
    90 146
             except AssertionError:
    
    91 147
                 queue.put(False)
    
    92
    -        else:
    
    93
    -            queue.put(True)
    
    94 148
     
    
    95
    -    assert run_in_subprocess(__test_create_server, config)
    149
    +        try:
    
    150
    +            stub = operations_pb2_grpc.OperationsStub(channel)
    
    151
    +            request = operations_pb2.ListOperationsRequest(name='main')
    
    152
    +            response = stub.ListOperations(request)
    
    153
    +
    
    154
    +            assert response.DESCRIPTOR is operations_pb2.ListOperationsResponse.DESCRIPTOR
    
    155
    +
    
    156
    +        except grpc.RpcError as e:
    
    157
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    158
    +                queue.put(False)
    
    159
    +        except AssertionError:
    
    160
    +            queue.put(False)
    
    161
    +
    
    162
    +        try:
    
    163
    +            stub = bots_pb2_grpc.BotsStub(channel)
    
    164
    +            request = bots_pb2.CreateBotSessionRequest()
    
    165
    +            response = stub.CreateBotSession(request)
    
    166
    +
    
    167
    +            assert response.DESCRIPTOR is bots_pb2.BotSession.DESCRIPTOR
    
    168
    +
    
    169
    +        except grpc.RpcError as e:
    
    170
    +            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
    
    171
    +                queue.put(False)
    
    172
    +        except AssertionError:
    
    173
    +            queue.put(False)
    
    174
    +
    
    175
    +        queue.put(True)
    
    176
    +
    
    177
    +    with serve(CONFIGURATION) as server:
    
    178
    +        assert run_in_subprocess(__test_create_server, server.remote)

  • tests/utils/server.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from contextlib import contextmanager
    
    17
    +import multiprocessing
    
    18
    +import signal
    
    19
    +
    
    20
    +import pytest_cov
    
    21
    +
    
    22
    +from buildgrid._app.settings import parser
    
    23
    +from buildgrid.server.instance import BuildGridServer
    
    24
    +
    
    25
    +
    
    26
    +@contextmanager
    
    27
    +def serve(configuration):
    
    28
    +    server = Server(configuration)
    
    29
    +    try:
    
    30
    +        yield server
    
    31
    +    finally:
    
    32
    +        server.quit()
    
    33
    +
    
    34
    +
    
    35
    +class Server:
    
    36
    +
    
    37
    +    def __init__(self, configuration):
    
    38
    +
    
    39
    +        self.configuration = configuration
    
    40
    +
    
    41
    +        self.__queue = multiprocessing.Queue()
    
    42
    +        self.__process = multiprocessing.Process(
    
    43
    +            target=Server.serve,
    
    44
    +            args=(self.__queue, self.configuration))
    
    45
    +        self.__process.start()
    
    46
    +
    
    47
    +        self.port = self.__queue.get()
    
    48
    +        self.remote = 'localhost:{}'.format(self.port)
    
    49
    +
    
    50
    +    @classmethod
    
    51
    +    def serve(cls, queue, configuration):
    
    52
    +        pytest_cov.embed.cleanup_on_sigterm()
    
    53
    +
    
    54
    +        server = BuildGridServer()
    
    55
    +
    
    56
    +        def __signal_handler(signum, frame):
    
    57
    +            server.stop()
    
    58
    +
    
    59
    +        signal.signal(signal.SIGINT, signal.SIG_IGN)
    
    60
    +        signal.signal(signal.SIGTERM, __signal_handler)
    
    61
    +
    
    62
    +        instances = parser.get_parser().safe_load(configuration)['instances']
    
    63
    +        for instance in instances:
    
    64
    +            instance_name = instance['name']
    
    65
    +            services = instance['services']
    
    66
    +            for service in services:
    
    67
    +                service.register_instance_with_server(instance_name, server)
    
    68
    +
    
    69
    +        port = server.add_port('localhost:0', None)
    
    70
    +
    
    71
    +        queue.put(port)
    
    72
    +
    
    73
    +        server.start()
    
    74
    +
    
    75
    +    def quit(self):
    
    76
    +        if self.__process:
    
    77
    +            self.__process.terminate()
    
    78
    +            self.__process.join()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]