[Notes] [Git][BuildGrid/buildgrid][santigl/106-request-metadata] Server: add support for RequestMetadata



Title: GitLab

Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/server/execution/service.py
    ... ... @@ -30,6 +30,8 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError,
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     from buildgrid.server._authentication import AuthContext, authorize
    
    33
    +from buildgrid.server.peer import Peer
    
    34
    +from buildgrid.server.requestmetadata import context_extract_request_metadata
    
    33 35
     
    
    34 36
     
    
    35 37
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    ... ... @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    94 96
     
    
    95 97
             instance_name = request.instance_name
    
    96 98
             message_queue = queue.Queue()
    
    97
    -        peer = context.peer()
    
    99
    +        peer_id = context.peer()
    
    100
    +
    
    101
    +        request_metadata = context_extract_request_metadata(context)
    
    102
    +        peer = Peer(uid=peer_id, request_metadata=request_metadata)
    
    98 103
     
    
    99 104
             try:
    
    100 105
                 instance = self._get_instance(instance_name)
    
    ... ... @@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    102 107
                 job_name = instance.execute(request.action_digest,
    
    103 108
                                             request.skip_cache_lookup)
    
    104 109
     
    
    105
    -            operation_name = instance.register_job_peer(job_name,
    
    106
    -                                                        peer, message_queue)
    
    110
    +            operation_name = instance.register_job_peer(job_name, peer,
    
    111
    +                                                        message_queue)
    
    107 112
     
    
    108 113
                 context.add_callback(partial(self._rpc_termination_callback,
    
    109 114
                                              peer, instance_name, operation_name))
    
    ... ... @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    161 166
             try:
    
    162 167
                 instance = self._get_instance(instance_name)
    
    163 168
     
    
    164
    -            instance.register_operation_peer(operation_name,
    
    165
    -                                             peer, message_queue)
    
    169
    +            instance.register_operation_peer(operation_name, peer, message_queue)
    
    166 170
     
    
    167 171
                 context.add_callback(partial(self._rpc_termination_callback,
    
    168 172
                                              peer, instance_name, operation_name))
    

  • buildgrid/server/job.py
    ... ... @@ -12,9 +12,9 @@
    12 12
     # See the License for the specific language governing permissions and
    
    13 13
     # limitations under the License.
    
    14 14
     
    
    15
    -
    
    16 15
     from datetime import datetime
    
    17 16
     import logging
    
    17
    +from threading import Lock
    
    18 18
     import uuid
    
    19 19
     
    
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    ... ... @@ -175,7 +175,7 @@ class Job:
    175 175
             """Subscribes to a new job's :class:`Operation` stage changes.
    
    176 176
     
    
    177 177
             Args:
    
    178
    -            peer (str): a unique string identifying the client.
    
    178
    +            peer (Peer): an object that represents the client.
    
    179 179
                 message_queue (queue.Queue): the event queue to register.
    
    180 180
     
    
    181 181
             Returns:
    
    ... ... @@ -194,10 +194,10 @@ class Job:
    194 194
                                 self._name, new_operation.name)
    
    195 195
     
    
    196 196
             self.__operations_by_name[new_operation.name] = new_operation
    
    197
    -        self.__operations_by_peer[peer] = new_operation
    
    198
    -        self.__operations_message_queues[peer] = message_queue
    
    197
    +        self.__operations_by_peer[peer.uid] = new_operation
    
    198
    +        self.__operations_message_queues[peer.uid] = message_queue
    
    199 199
     
    
    200
    -        self._send_operations_updates(peers=[peer])
    
    200
    +        self._send_operations_updates(peers=[peer.uid])
    
    201 201
     
    
    202 202
             return new_operation.name
    
    203 203
     
    
    ... ... @@ -206,7 +206,7 @@ class Job:
    206 206
     
    
    207 207
             Args:
    
    208 208
                 operation_name (str): an existing operation's name to subscribe to.
    
    209
    -            peer (str): a unique string identifying the client.
    
    209
    +            peer (Peer): an object that represents the client.
    
    210 210
                 message_queue (queue.Queue): the event queue to register.
    
    211 211
     
    
    212 212
             Returns:
    
    ... ... @@ -222,18 +222,17 @@ class Job:
    222 222
                 raise NotFoundError("Operation name does not exist: [{}]"
    
    223 223
                                     .format(operation_name))
    
    224 224
     
    
    225
    -        self.__operations_by_peer[peer] = operation
    
    226
    -        self.__operations_message_queues[peer] = message_queue
    
    225
    +        self.__operations_by_peer[peer.uid] = operation
    
    226
    +        self.__operations_message_queues[peer.uid] = message_queue
    
    227 227
     
    
    228
    -        self._send_operations_updates(peers=[peer])
    
    228
    +        self._send_operations_updates(peers=[peer.uid])
    
    229 229
     
    
    230 230
         def unregister_operation_peer(self, operation_name, peer):
    
    231 231
             """Unsubscribes to the job's :class:`Operation` stage change.
    
    232 232
     
    
    233 233
             Args:
    
    234 234
                 operation_name (str): an existing operation's name to unsubscribe from.
    
    235
    -            peer (str): a unique string identifying the client.
    
    236
    -
    
    235
    +            peer (Peer): an object that represents the client.
    
    237 236
             Raises:
    
    238 237
                 NotFoundError: If no operation with `operation_name` exists.
    
    239 238
             """
    
    ... ... @@ -244,10 +243,10 @@ class Job:
    244 243
                 raise NotFoundError("Operation name does not exist: [{}]"
    
    245 244
                                     .format(operation_name))
    
    246 245
     
    
    247
    -        if peer in self.__operations_message_queues:
    
    248
    -            del self.__operations_message_queues[peer]
    
    246
    +        if peer.uid in self.__operations_message_queues:
    
    247
    +            del self.__operations_message_queues[peer.uid]
    
    249 248
     
    
    250
    -        del self.__operations_by_peer[peer]
    
    249
    +        del self.__operations_by_peer[peer.uid]
    
    251 250
     
    
    252 251
             # Drop the operation if nobody is watching it anymore:
    
    253 252
             if operation not in self.__operations_by_peer.values():
    

  • buildgrid/server/peer.py
    1
    +# Copyright (C) 2019 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +from collections import defaultdict
    
    16
    +import logging
    
    17
    +from threading import Lock
    
    18
    +
    
    19
    +
    
    20
    +class Peer:
    
    21
    +    """Represents a client during a session."""
    
    22
    +
    
    23
    +    # We will keep a global list of Peers indexed by their `Peer.uid`
    
    24
    +    # and a reference counter so that we can clean `Peer`s that do not
    
    25
    +    # exist anymore. Both are protected by the same lock:
    
    26
    +    __peers_by_uid = {}
    
    27
    +    __peers_ref_count = defaultdict(int)
    
    28
    +    __peers_lock = Lock()
    
    29
    +
    
    30
    +    def __init__(self, uid, token=None, request_metadata=None):
    
    31
    +        self._uid = uid  # This uniquely identifies a client
    
    32
    +        self._token = token
    
    33
    +
    
    34
    +        self.__request_metadata = request_metadata
    
    35
    +
    
    36
    +        self.__logger = logging.getLogger(__name__)
    
    37
    +
    
    38
    +        with self.__peers_lock:
    
    39
    +            if self._uid in self.__peers_by_uid and \
    
    40
    +                    self.__peers_by_uid[self._uid] != self:
    
    41
    +                self.__logger.debug('Creating another '
    
    42
    +                                    'instance of Peer with uid {} '
    
    43
    +                                    'with different attributes', self._uid)
    
    44
    +
    
    45
    +            self.__peers_by_uid[self._uid] = self
    
    46
    +            self.__peers_ref_count[self._uid] += 1
    
    47
    +
    
    48
    +    def __del__(self):
    
    49
    +        with self.__peers_lock:
    
    50
    +            assert self.uid in self.__peers_by_uid
    
    51
    +            assert self.__peers_by_uid[self.uid] > 0
    
    52
    +
    
    53
    +            self.__peers_ref_count[self.uid] -= 1
    
    54
    +
    
    55
    +            if self.__peers_ref_count[self.uid] < 1:
    
    56
    +                del self.__peers_by_uid
    
    57
    +                del self.__peers_ref_count[self.uid]
    
    58
    +
    
    59
    +    def __eq__(self, other):
    
    60
    +        if not isinstance(other, Peer):
    
    61
    +            return False
    
    62
    +
    
    63
    +        return self.uid == other.uid and self.token == other.token and \
    
    64
    +            self.request_metadata == other.request_metadata
    
    65
    +
    
    66
    +    def __hash__(self):
    
    67
    +        return hash(self.uid)  # This string is unique for each peer
    
    68
    +
    
    69
    +    @property
    
    70
    +    def uid(self):
    
    71
    +        return self._uid
    
    72
    +
    
    73
    +    @property
    
    74
    +    def token(self):
    
    75
    +        return self._token
    
    76
    +
    
    77
    +    # -- `RequestMetadata` optional values (attached to the Execute() call) --
    
    78
    +    @property
    
    79
    +    def request_metadata(self):
    
    80
    +        return self.__request_metadata
    
    81
    +
    
    82
    +    @property
    
    83
    +    def tool_name(self):
    
    84
    +        if self.__request_metadata and self.__request_metadata.tool_details:
    
    85
    +            return self.__request_metadata.tool_details.tool_name
    
    86
    +        return None
    
    87
    +
    
    88
    +    def tool_version(self):
    
    89
    +        if self.__request_metadata and self.__request_metadata.tool_details:
    
    90
    +            return self.__request_metadata.tool_details.tool_version
    
    91
    +        return None
    
    92
    +
    
    93
    +    @property
    
    94
    +    def action_id(self):
    
    95
    +        if self.__request_metadata:
    
    96
    +            return self.__request_metadata.action_id
    
    97
    +        return None
    
    98
    +
    
    99
    +    @property
    
    100
    +    def tool_invocation_id(self):
    
    101
    +        if self.__request_metadata:
    
    102
    +            return self.__request_metadata.tool_invocation_id
    
    103
    +        return None
    
    104
    +
    
    105
    +    @property
    
    106
    +    def correlated_invocations_id(self):
    
    107
    +        if self.__request_metadata:
    
    108
    +            return self.__request_metadata.correlated_invocations_id
    
    109
    +        return None

  • buildgrid/server/requestmetadata.py
    1
    +# Copyright (C) 2019 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    16
    +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
    
    17
    +
    
    18
    +
    
    19
    +def context_extract_request_metadata(context):
    
    20
    +    """Given a `grpc.ServicerContext` object, extract the RequestMetadata
    
    21
    +    header values if they are present. If they were not provided,
    
    22
    +    returns None.
    
    23
    +
    
    24
    +    Args:
    
    25
    +        context (grpc.ServicerContext): Context for a RPC call.
    
    26
    +
    
    27
    +    Returns:
    
    28
    +        A `RequestMetadata` proto if RequestMetadata values are present,
    
    29
    +        otherwise None.
    
    30
    +    """
    
    31
    +    invocation_metadata = context.invocation_metadata()
    
    32
    +    request_metadata_entry = next((entry for entry in invocation_metadata
    
    33
    +                                   if entry.key == REQUEST_METADATA_HEADER_NAME),
    
    34
    +                                  None)
    
    35
    +    if not request_metadata_entry:
    
    36
    +        return None
    
    37
    +
    
    38
    +    request_metadata = remote_execution_pb2.RequestMetadata()
    
    39
    +    request_metadata.ParseFromString(request_metadata_entry.value)
    
    40
    +
    
    41
    +    return request_metadata

  • buildgrid/server/scheduler.py
    ... ... @@ -54,9 +54,8 @@ class Scheduler:
    54 54
     
    
    55 55
             self.__queue = []
    
    56 56
     
    
    57
    -        self._is_instrumented = monitor
    
    58
    -
    
    59
    -        if self._is_instrumented:
    
    57
    +        self._is_instrumented = False
    
    58
    +        if monitor:
    
    60 59
                 self.activate_monitoring()
    
    61 60
     
    
    62 61
         # --- Public API ---
    
    ... ... @@ -87,7 +86,7 @@ class Scheduler:
    87 86
     
    
    88 87
             Args:
    
    89 88
                 job_name (str): name of the job to subscribe to.
    
    90
    -            peer (str): a unique string identifying the client.
    
    89
    +            peer (Peer): object that represents the client
    
    91 90
                 message_queue (queue.Queue): the event queue to register.
    
    92 91
     
    
    93 92
             Returns:
    
    ... ... @@ -114,7 +113,7 @@ class Scheduler:
    114 113
     
    
    115 114
             Args:
    
    116 115
                 operation_name (str): name of the operation to subscribe to.
    
    117
    -            peer (str): a unique string identifying the client.
    
    116
    +            peer (Peer): an object that represents the client.
    
    118 117
                 message_queue (queue.Queue): the event queue to register.
    
    119 118
     
    
    120 119
             Returns:
    
    ... ... @@ -137,7 +136,7 @@ class Scheduler:
    137 136
     
    
    138 137
             Args:
    
    139 138
                 operation_name (str): name of the operation to unsubscribe from.
    
    140
    -            peer (str): a unique string identifying the client.
    
    139
    +            peer (Peer): object that represents the client
    
    141 140
     
    
    142 141
             Raises:
    
    143 142
                 NotFoundError: If no operation with `operation_name` exists.
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]