[Notes] [Git][BuildGrid/buildgrid][santigl/106-request-metadata] 3 commits: Server: add support for RequestMetadata



Title: GitLab

Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid

Commits:

7 changed files:

Changes:

  • buildgrid/_app/commands/cmd_execute.py
    ... ... @@ -30,7 +30,8 @@ from buildgrid.client.authentication import setup_channel
    30 30
     from buildgrid.client.cas import download, upload
    
    31 31
     from buildgrid._exceptions import InvalidArgumentError
    
    32 32
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    33
    -from buildgrid.utils import create_digest
    
    33
    +from buildgrid.settings import REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
    
    34
    +from buildgrid.utils import create_digest, request_metadata_header_entry
    
    34 35
     
    
    35 36
     from ..cli import pass_context
    
    36 37
     
    
    ... ... @@ -48,8 +49,17 @@ from ..cli import pass_context
    48 49
                   help="Public server certificate for TLS (PEM-encoded).")
    
    49 50
     @click.option('--instance-name', type=click.STRING, default=None, show_default=True,
    
    50 51
                   help="Targeted farm instance name.")
    
    52
    +@click.option('-t', '--tool-name', type=str, default=REQUEST_METADATA_TOOL_NAME,
    
    53
    +              help='Tool name.')
    
    54
    +@click.option('-n', '--tool-version', type=str, default=REQUEST_METADATA_TOOL_VERSION,
    
    55
    +              help='Tool version.')
    
    56
    +@click.option('-a', '--action-id', type=str, help='Action ID.')
    
    57
    +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
    
    58
    +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
    
    51 59
     @pass_context
    
    52
    -def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
    
    60
    +def cli(context, remote, instance_name, auth_token, client_key, client_cert,
    
    61
    +        server_cert, tool_name, tool_version, action_id, invocation_id,
    
    62
    +        correlation_id):
    
    53 63
         """Entry point for the bgd-execute CLI command group."""
    
    54 64
         try:
    
    55 65
             context.channel, _ = setup_channel(remote, auth_token=auth_token,
    
    ... ... @@ -62,6 +72,14 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
    62 72
     
    
    63 73
         context.instance_name = instance_name
    
    64 74
     
    
    75
    +    request_metadata = request_metadata_header_entry(tool_name=tool_name,
    
    76
    +                                                     tool_version=tool_version,
    
    77
    +                                                     action_id=action_id,
    
    78
    +                                                     tool_invocation_id=invocation_id,
    
    79
    +                                                     correlated_invocations_id=correlation_id)
    
    80
    +
    
    81
    +    context.request_metadata = request_metadata
    
    82
    +
    
    65 83
     
    
    66 84
     @cli.command('request-dummy', short_help="Send a dummy action.")
    
    67 85
     @click.option('--number', type=click.INT, default=1, show_default=True,
    
    ... ... @@ -85,12 +103,12 @@ def request_dummy(context, number, wait_for_completion):
    85 103
                                                       action_digest=action_digest,
    
    86 104
                                                       skip_cache_lookup=True)
    
    87 105
     
    
    106
    +    print(context.request_metadata)
    
    88 107
         responses = []
    
    89 108
         for _ in range(0, number):
    
    90
    -        responses.append(stub.Execute(request))
    
    109
    +        responses.append(stub.Execute(request, metadata=context.request_metadata))
    
    91 110
     
    
    92 111
         for response in responses:
    
    93
    -
    
    94 112
             if wait_for_completion:
    
    95 113
                 result = None
    
    96 114
                 for stream in response:
    
    ... ... @@ -121,6 +139,7 @@ def run_command(context, input_root, commands, output_file, output_directory,
    121 139
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    122 140
     
    
    123 141
         output_executables = []
    
    142
    +
    
    124 143
         with upload(context.channel, instance=context.instance_name) as uploader:
    
    125 144
             command = remote_execution_pb2.Command()
    
    126 145
     
    
    ... ... @@ -157,7 +176,8 @@ def run_command(context, input_root, commands, output_file, output_directory,
    157 176
         request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
    
    158 177
                                                       action_digest=action_digest,
    
    159 178
                                                       skip_cache_lookup=True)
    
    160
    -    response = stub.Execute(request)
    
    179
    +
    
    180
    +    response = stub.Execute(request, metadata=context.request_metadata)
    
    161 181
     
    
    162 182
         stream = None
    
    163 183
         for stream in response:
    

  • buildgrid/server/execution/service.py
    ... ... @@ -27,9 +27,11 @@ from functools import partial
    27 27
     import grpc
    
    28 28
     
    
    29 29
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    30
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32
    +from buildgrid.server.peer import Peer
    
    32 33
     from buildgrid.server._authentication import AuthContext, authorize
    
    34
    +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
    
    33 35
     
    
    34 36
     
    
    35 37
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    ... ... @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    94 96
     
    
    95 97
             instance_name = request.instance_name
    
    96 98
             message_queue = queue.Queue()
    
    97
    -        peer = context.peer()
    
    99
    +        peer_id = context.peer()
    
    100
    +
    
    101
    +        request_metadata = self._context_extract_request_metadata(context)
    
    102
    +        peer = Peer(peer_id=peer_id, request_metadata=request_metadata)
    
    98 103
     
    
    99 104
             try:
    
    100 105
                 instance = self._get_instance(instance_name)
    
    ... ... @@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    102 107
                 job_name = instance.execute(request.action_digest,
    
    103 108
                                             request.skip_cache_lookup)
    
    104 109
     
    
    105
    -            operation_name = instance.register_job_peer(job_name,
    
    106
    -                                                        peer, message_queue)
    
    110
    +            operation_name = instance.register_job_peer(job_name, peer,
    
    111
    +                                                        message_queue)
    
    107 112
     
    
    108 113
                 context.add_callback(partial(self._rpc_termination_callback,
    
    109 114
                                              peer, instance_name, operation_name))
    
    ... ... @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    161 166
             try:
    
    162 167
                 instance = self._get_instance(instance_name)
    
    163 168
     
    
    164
    -            instance.register_operation_peer(operation_name,
    
    165
    -                                             peer, message_queue)
    
    169
    +            instance.register_operation_peer(operation_name, peer, message_queue)
    
    166 170
     
    
    167 171
                 context.add_callback(partial(self._rpc_termination_callback,
    
    168 172
                                              peer, instance_name, operation_name))
    
    ... ... @@ -231,3 +235,21 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    231 235
     
    
    232 236
             except KeyError:
    
    233 237
                 raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
    
    238
    +
    
    239
    +    @classmethod
    
    240
    +    def _context_extract_request_metadata(cls, context):
    
    241
    +        """Given a context object, extract the RequestMetadata header
    
    242
    +        values if they are present. If they were not provided,
    
    243
    +        returns None.
    
    244
    +        """
    
    245
    +        invocation_metadata = context.invocation_metadata()
    
    246
    +        request_metadata_entry = next((entry for entry in invocation_metadata
    
    247
    +                                       if entry.key == REQUEST_METADATA_HEADER_NAME),
    
    248
    +                                      None)
    
    249
    +        if not request_metadata_entry:
    
    250
    +            return None
    
    251
    +
    
    252
    +        request_metadata = remote_execution_pb2.RequestMetadata()
    
    253
    +        request_metadata.ParseFromString(request_metadata_entry.value)
    
    254
    +
    
    255
    +        return request_metadata

  • buildgrid/server/job.py
    ... ... @@ -12,9 +12,9 @@
    12 12
     # See the License for the specific language governing permissions and
    
    13 13
     # limitations under the License.
    
    14 14
     
    
    15
    -
    
    16 15
     from datetime import datetime
    
    17 16
     import logging
    
    17
    +from threading import Lock
    
    18 18
     import uuid
    
    19 19
     
    
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    ... ... @@ -62,6 +62,9 @@ class Job:
    62 62
             self._platform_requirements = platform_requirements \
    
    63 63
                 if platform_requirements else dict()
    
    64 64
     
    
    65
    +        self.__peers_lock = Lock()
    
    66
    +        self.__peers = set()
    
    67
    +
    
    65 68
             self._done = False
    
    66 69
     
    
    67 70
         def __lt__(self, other):
    
    ... ... @@ -175,7 +178,7 @@ class Job:
    175 178
             """Subscribes to a new job's :class:`Operation` stage changes.
    
    176 179
     
    
    177 180
             Args:
    
    178
    -            peer (str): a unique string identifying the client.
    
    181
    +            peer (Peer): an object that represents the client.
    
    179 182
                 message_queue (queue.Queue): the event queue to register.
    
    180 183
     
    
    181 184
             Returns:
    
    ... ... @@ -194,10 +197,13 @@ class Job:
    194 197
                                 self._name, new_operation.name)
    
    195 198
     
    
    196 199
             self.__operations_by_name[new_operation.name] = new_operation
    
    197
    -        self.__operations_by_peer[peer] = new_operation
    
    198
    -        self.__operations_message_queues[peer] = message_queue
    
    200
    +        self.__operations_by_peer[peer.uid] = new_operation
    
    201
    +        self.__operations_message_queues[peer.uid] = message_queue
    
    202
    +
    
    203
    +        self._send_operations_updates(peers=[peer.uid])
    
    199 204
     
    
    200
    -        self._send_operations_updates(peers=[peer])
    
    205
    +        with self.__peers_lock:
    
    206
    +            self.__peers.add(peer)
    
    201 207
     
    
    202 208
             return new_operation.name
    
    203 209
     
    
    ... ... @@ -206,7 +212,7 @@ class Job:
    206 212
     
    
    207 213
             Args:
    
    208 214
                 operation_name (str): an existing operation's name to subscribe to.
    
    209
    -            peer (str): a unique string identifying the client.
    
    215
    +            peer (Peer): an object that represents the client.
    
    210 216
                 message_queue (queue.Queue): the event queue to register.
    
    211 217
     
    
    212 218
             Returns:
    
    ... ... @@ -222,18 +228,20 @@ class Job:
    222 228
                 raise NotFoundError("Operation name does not exist: [{}]"
    
    223 229
                                     .format(operation_name))
    
    224 230
     
    
    225
    -        self.__operations_by_peer[peer] = operation
    
    226
    -        self.__operations_message_queues[peer] = message_queue
    
    231
    +        self.__operations_by_peer[peer.uid] = operation
    
    232
    +        self.__operations_message_queues[peer.uid] = message_queue
    
    227 233
     
    
    228
    -        self._send_operations_updates(peers=[peer])
    
    234
    +        self._send_operations_updates(peers=[peer.uid])
    
    235
    +
    
    236
    +        with self.__peers_lock:
    
    237
    +            self.__peers.add(peer)
    
    229 238
     
    
    230 239
         def unregister_operation_peer(self, operation_name, peer):
    
    231 240
             """Unsubscribes to the job's :class:`Operation` stage change.
    
    232 241
     
    
    233 242
             Args:
    
    234 243
                 operation_name (str): an existing operation's name to unsubscribe from.
    
    235
    -            peer (str): a unique string identifying the client.
    
    236
    -
    
    244
    +            peer (Peer): an object that represents the client.
    
    237 245
             Raises:
    
    238 246
                 NotFoundError: If no operation with `operation_name` exists.
    
    239 247
             """
    
    ... ... @@ -244,10 +252,10 @@ class Job:
    244 252
                 raise NotFoundError("Operation name does not exist: [{}]"
    
    245 253
                                     .format(operation_name))
    
    246 254
     
    
    247
    -        if peer in self.__operations_message_queues:
    
    248
    -            del self.__operations_message_queues[peer]
    
    255
    +        if peer.uid in self.__operations_message_queues:
    
    256
    +            del self.__operations_message_queues[peer.uid]
    
    249 257
     
    
    250
    -        del self.__operations_by_peer[peer]
    
    258
    +        del self.__operations_by_peer[peer.uid]
    
    251 259
     
    
    252 260
             # Drop the operation if nobody is watching it anymore:
    
    253 261
             if operation not in self.__operations_by_peer.values():
    
    ... ... @@ -258,6 +266,9 @@ class Job:
    258 266
                 self.__logger.debug("Operation deleted for job [%s]: [%s]",
    
    259 267
                                     self._name, operation.name)
    
    260 268
     
    
    269
    +        with self.__peers_lock:
    
    270
    +            self.__peers.remove(peer)
    
    271
    +
    
    261 272
         def list_operations(self):
    
    262 273
             """Lists the :class:`Operation` related to a job.
    
    263 274
     
    

  • buildgrid/server/peer.py
    1
    +# Copyright (C) 2019 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +class Peer:
    
    17
    +    """Represents a client during a session."""
    
    18
    +    def __init__(self, peer_id, token=None, request_metadata=None):
    
    19
    +        self._id = peer_id  # This uniquely identifies a client
    
    20
    +        self._token = token
    
    21
    +
    
    22
    +        self.__request_metadata = request_metadata
    
    23
    +
    
    24
    +    def __eq__(self, other):
    
    25
    +        if isinstance(other, Peer):
    
    26
    +            return self.uid == other.uid
    
    27
    +        return False
    
    28
    +
    
    29
    +    def __hash__(self):
    
    30
    +        return hash(self.uid)  # This string is unique for each peer
    
    31
    +
    
    32
    +    @property
    
    33
    +    def uid(self):
    
    34
    +        return self._id
    
    35
    +
    
    36
    +    @property
    
    37
    +    def token(self):
    
    38
    +        return self._token
    
    39
    +
    
    40
    +    # -- `RequestMetadata` optional values (attached to the Execute() call) --
    
    41
    +    @property
    
    42
    +    def request_metadata(self):
    
    43
    +        return self.__request_metadata
    
    44
    +
    
    45
    +    @property
    
    46
    +    def tool_name(self):
    
    47
    +        if self.__request_metadata and self.__request_metadata.tool_details:
    
    48
    +            return self.__request_metadata.tool_details.tool_name
    
    49
    +        return None
    
    50
    +
    
    51
    +    def tool_version(self):
    
    52
    +        if self.__request_metadata and self.__request_metadata.tool_details:
    
    53
    +            return self.__request_metadata.tool_details.tool_version
    
    54
    +        return None
    
    55
    +
    
    56
    +    @property
    
    57
    +    def action_id(self):
    
    58
    +        if self.__request_metadata:
    
    59
    +            return self.__request_metadata.action_id
    
    60
    +        return None
    
    61
    +
    
    62
    +    @property
    
    63
    +    def tool_invocation_id(self):
    
    64
    +        if self.__request_metadata:
    
    65
    +            return self.__request_metadata.tool_invocation_id
    
    66
    +        return None
    
    67
    +
    
    68
    +    @property
    
    69
    +    def correlated_invocations_id(self):
    
    70
    +        if self.__request_metadata:
    
    71
    +            return self.__request_metadata.correlated_invocations_id
    
    72
    +        return None

  • buildgrid/server/scheduler.py
    ... ... @@ -54,9 +54,8 @@ class Scheduler:
    54 54
     
    
    55 55
             self.__queue = []
    
    56 56
     
    
    57
    -        self._is_instrumented = monitor
    
    58
    -
    
    59
    -        if self._is_instrumented:
    
    57
    +        self._is_instrumented = False
    
    58
    +        if monitor:
    
    60 59
                 self.activate_monitoring()
    
    61 60
     
    
    62 61
         # --- Public API ---
    
    ... ... @@ -87,7 +86,7 @@ class Scheduler:
    87 86
     
    
    88 87
             Args:
    
    89 88
                 job_name (str): name of the job to subscribe to.
    
    90
    -            peer (str): a unique string identifying the client.
    
    89
    +            peer (Peer): object that represents the client
    
    91 90
                 message_queue (queue.Queue): the event queue to register.
    
    92 91
     
    
    93 92
             Returns:
    
    ... ... @@ -114,7 +113,7 @@ class Scheduler:
    114 113
     
    
    115 114
             Args:
    
    116 115
                 operation_name (str): name of the operation to subscribe to.
    
    117
    -            peer (str): a unique string identifying the client.
    
    116
    +            peer (Peer): an object that represents the client.
    
    118 117
                 message_queue (queue.Queue): the event queue to register.
    
    119 118
     
    
    120 119
             Returns:
    
    ... ... @@ -137,7 +136,7 @@ class Scheduler:
    137 136
     
    
    138 137
             Args:
    
    139 138
                 operation_name (str): name of the operation to unsubscribe from.
    
    140
    -            peer (str): a unique string identifying the client.
    
    139
    +            peer (Peer): object that represents the client
    
    141 140
     
    
    142 141
             Raises:
    
    143 142
                 NotFoundError: If no operation with `operation_name` exists.
    

  • buildgrid/settings.py
    ... ... @@ -14,6 +14,7 @@
    14 14
     
    
    15 15
     
    
    16 16
     import hashlib
    
    17
    +from _version import __version__
    
    17 18
     
    
    18 19
     
    
    19 20
     # Hash function used for computing digests:
    
    ... ... @@ -43,3 +44,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/'
    43 44
     #  type       - Type of CAS object, eg. 'action_result', 'command'...
    
    44 45
     #  hash       - Object's digest hash.
    
    45 46
     #  sizebytes  - Object's digest size in bytes.
    
    47
    +
    
    48
    +
    
    49
    +# Name of the header key to attach optional `RequestMetadata`values.
    
    50
    +# (This is defined in the REAPI specification.)
    
    51
    +REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
    
    52
    +
    
    53
    +# 'RequestMetadata' header values. These values will be used when
    
    54
    +# attaching optional metadata to a gRPC request's header:
    
    55
    +REQUEST_METADATA_TOOL_NAME = 'buildgrid'
    
    56
    +REQUEST_METADATA_TOOL_VERSION = __version__

  • buildgrid/utils.py
    ... ... @@ -18,7 +18,7 @@ from operator import attrgetter
    18 18
     import os
    
    19 19
     import socket
    
    20 20
     
    
    21
    -from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT
    
    21
    +from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT, REQUEST_METADATA_HEADER_NAME
    
    22 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    23 23
     
    
    24 24
     
    
    ... ... @@ -284,3 +284,25 @@ def output_directory_maker(directory_path, working_path, tree_digest):
    284 284
         output_directory.path = os.path.relpath(directory_path, start=working_path)
    
    285 285
     
    
    286 286
         return output_directory
    
    287
    +
    
    288
    +
    
    289
    +def request_metadata_header_entry(tool_name=None, tool_version=None,
    
    290
    +                                  action_id=None, tool_invocation_id=None,
    
    291
    +                                  correlated_invocations_id=None):
    
    292
    +    """Creates a serialized RequestMetadata entry to attach to a gRPC
    
    293
    +    call header. Arguments should be of type str or None.
    
    294
    +    """
    
    295
    +    request_metadata = remote_execution_pb2.RequestMetadata()
    
    296
    +    if action_id:
    
    297
    +        request_metadata.action_id = action_id
    
    298
    +    if tool_invocation_id:
    
    299
    +        request_metadata.tool_invocation_id = tool_invocation_id
    
    300
    +    if correlated_invocations_id:
    
    301
    +        request_metadata.correlated_invocations_id = correlated_invocations_id
    
    302
    +    if tool_name:
    
    303
    +        request_metadata.tool_details.tool_name = tool_name
    
    304
    +    if tool_version:
    
    305
    +        request_metadata.tool_details.tool_version = tool_version
    
    306
    +
    
    307
    +    return ((REQUEST_METADATA_HEADER_NAME,
    
    308
    +             request_metadata.SerializeToString()),)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]