[Notes] [Git][BuildGrid/buildgrid][santigl/106-request-metadata] Add support for RequestMetadata



Title: GitLab

Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid

Commits:

7 changed files:

Changes:

  • buildgrid/_app/commands/cmd_execute.py
    ... ... @@ -30,6 +30,8 @@ from buildgrid.client.authentication import setup_channel
    30 30
     from buildgrid.client.cas import download, upload
    
    31 31
     from buildgrid._exceptions import InvalidArgumentError
    
    32 32
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    33
    +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME, \
    
    34
    +    REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
    
    33 35
     from buildgrid.utils import create_digest
    
    34 36
     
    
    35 37
     from ..cli import pass_context
    
    ... ... @@ -66,10 +68,12 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser
    66 68
     @cli.command('request-dummy', short_help="Send a dummy action.")
    
    67 69
     @click.option('--number', type=click.INT, default=1, show_default=True,
    
    68 70
                   help="Number of request to send.")
    
    71
    +@click.option('--request-metadata', is_flag=True,
    
    72
    +              help="Attach RequestMetadata to the request header.")
    
    69 73
     @click.option('--wait-for-completion', is_flag=True,
    
    70 74
                   help="Stream updates until jobs are completed.")
    
    71 75
     @pass_context
    
    72
    -def request_dummy(context, number, wait_for_completion):
    
    76
    +def request_dummy(context, number, request_metadata, wait_for_completion):
    
    73 77
     
    
    74 78
         click.echo("Sending execution request...")
    
    75 79
         command = remote_execution_pb2.Command()
    
    ... ... @@ -85,12 +89,21 @@ def request_dummy(context, number, wait_for_completion):
    85 89
                                                       action_digest=action_digest,
    
    86 90
                                                       skip_cache_lookup=True)
    
    87 91
     
    
    92
    +    # If enabled, we attach some `RequestMetadata` information to the request:
    
    93
    +    execute_arguments = {}
    
    94
    +    if request_metadata:
    
    95
    +        metadata = request_metadata_header_entry(tool_name=REQUEST_METADATA_TOOL_NAME,
    
    96
    +                                                 tool_version=REQUEST_METADATA_TOOL_VERSION,
    
    97
    +                                                 action_id='2',
    
    98
    +                                                 tool_invocation_id='3',
    
    99
    +                                                 correlated_invocations_id='4')
    
    100
    +        execute_arguments['metadata'] = metadata
    
    101
    +
    
    88 102
         responses = []
    
    89 103
         for _ in range(0, number):
    
    90
    -        responses.append(stub.Execute(request))
    
    104
    +        responses.append(stub.Execute(request, **execute_arguments))
    
    91 105
     
    
    92 106
         for response in responses:
    
    93
    -
    
    94 107
             if wait_for_completion:
    
    95 108
                 result = None
    
    96 109
                 for stream in response:
    
    ... ... @@ -113,14 +126,22 @@ def request_dummy(context, number, wait_for_completion):
    113 126
                   help="Output directory for the output files.")
    
    114 127
     @click.option('-p', '--platform-property', nargs=2, type=(click.STRING, click.STRING), multiple=True,
    
    115 128
                   help="List of key-value pairs of required platform properties.")
    
    129
    +@click.option('-t', '--tool-details', nargs=2, type=str,
    
    130
    +              default=(REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION),
    
    131
    +              help="Tool name and version.")
    
    132
    +@click.option('-a', '--action-id', type=str, help='Action ID.')
    
    133
    +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
    
    134
    +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
    
    116 135
     @click.argument('input-root', nargs=1, type=click.Path(), required=True)
    
    117 136
     @click.argument('commands', nargs=-1, type=click.STRING, required=True)
    
    118 137
     @pass_context
    
    119 138
     def run_command(context, input_root, commands, output_file, output_directory,
    
    120
    -                platform_property):
    
    139
    +                platform_property, tool_details, action_id, invocation_id,
    
    140
    +                correlation_id):
    
    121 141
         stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
    
    122 142
     
    
    123 143
         output_executables = []
    
    144
    +
    
    124 145
         with upload(context.channel, instance=context.instance_name) as uploader:
    
    125 146
             command = remote_execution_pb2.Command()
    
    126 147
     
    
    ... ... @@ -157,7 +178,14 @@ def run_command(context, input_root, commands, output_file, output_directory,
    157 178
         request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
    
    158 179
                                                       action_digest=action_digest,
    
    159 180
                                                       skip_cache_lookup=True)
    
    160
    -    response = stub.Execute(request)
    
    181
    +
    
    182
    +    metadata = request_metadata_header_entry(tool_name=tool_details[0],
    
    183
    +                                             tool_version=tool_details[1],
    
    184
    +                                             action_id=action_id,
    
    185
    +                                             tool_invocation_id=invocation_id,
    
    186
    +                                             correlated_invocations_id=correlation_id)
    
    187
    +
    
    188
    +    response = stub.Execute(request, metadata=metadata)
    
    161 189
     
    
    162 190
         stream = None
    
    163 191
         for stream in response:
    
    ... ... @@ -180,3 +208,25 @@ def run_command(context, input_root, commands, output_file, output_directory,
    180 208
             if output_file_response.path in output_executables:
    
    181 209
                 st = os.stat(path)
    
    182 210
                 os.chmod(path, st.st_mode | stat.S_IXUSR)
    
    211
    +
    
    212
    +
    
    213
    +def request_metadata_header_entry(tool_name=None, tool_version=None,
    
    214
    +                                  action_id=None, tool_invocation_id=None,
    
    215
    +                                  correlated_invocations_id=None):
    
    216
    +    """Creates a serialized RequestMetadata entry to attach to a gRPC
    
    217
    +    call header. Arguments should be of type str or None.
    
    218
    +    """
    
    219
    +    request_metadata = remote_execution_pb2.RequestMetadata()
    
    220
    +    if action_id:
    
    221
    +        request_metadata.action_id = action_id
    
    222
    +    if tool_invocation_id:
    
    223
    +        request_metadata.tool_invocation_id = tool_invocation_id
    
    224
    +    if correlated_invocations_id:
    
    225
    +        request_metadata.correlated_invocations_id = correlated_invocations_id
    
    226
    +    if tool_name:
    
    227
    +        request_metadata.tool_details.tool_name = tool_name
    
    228
    +    if tool_version:
    
    229
    +        request_metadata.tool_details.tool_version = tool_version
    
    230
    +
    
    231
    +    return ((REQUEST_METADATA_HEADER_NAME,
    
    232
    +             request_metadata.SerializeToString()),)

  • buildgrid/server/execution/instance.py
    ... ... @@ -62,7 +62,7 @@ class ExecutionInstance:
    62 62
             return get_hash_type()
    
    63 63
     
    
    64 64
         def execute(self, action_digest, skip_cache_lookup):
    
    65
    -        """ Sends a job for execution.
    
    65
    +        """Sends a job for execution.
    
    66 66
             Queues an action and creates an Operation instance to be associated with
    
    67 67
             this action.
    
    68 68
             """
    

  • buildgrid/server/execution/service.py
    ... ... @@ -27,9 +27,11 @@ from functools import partial
    27 27
     import grpc
    
    28 28
     
    
    29 29
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    30
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32
    +from buildgrid.server.peer import Peer
    
    32 33
     from buildgrid.server._authentication import AuthContext, authorize
    
    34
    +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
    
    33 35
     
    
    34 36
     
    
    35 37
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    ... ... @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    94 96
     
    
    95 97
             instance_name = request.instance_name
    
    96 98
             message_queue = queue.Queue()
    
    97
    -        peer = context.peer()
    
    99
    +        peer_id = context.peer()
    
    100
    +
    
    101
    +        request_metadata = self._context_extract_request_metadata(context)
    
    102
    +        peer = Peer(peer_id=peer_id, request_metadata=request_metadata)
    
    98 103
     
    
    99 104
             try:
    
    100 105
                 instance = self._get_instance(instance_name)
    
    ... ... @@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    102 107
                 job_name = instance.execute(request.action_digest,
    
    103 108
                                             request.skip_cache_lookup)
    
    104 109
     
    
    105
    -            operation_name = instance.register_job_peer(job_name,
    
    106
    -                                                        peer, message_queue)
    
    110
    +            operation_name = instance.register_job_peer(job_name, peer,
    
    111
    +                                                        message_queue)
    
    107 112
     
    
    108 113
                 context.add_callback(partial(self._rpc_termination_callback,
    
    109 114
                                              peer, instance_name, operation_name))
    
    ... ... @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    161 166
             try:
    
    162 167
                 instance = self._get_instance(instance_name)
    
    163 168
     
    
    164
    -            instance.register_operation_peer(operation_name,
    
    165
    -                                             peer, message_queue)
    
    169
    +            instance.register_operation_peer(operation_name, peer, message_queue)
    
    166 170
     
    
    167 171
                 context.add_callback(partial(self._rpc_termination_callback,
    
    168 172
                                              peer, instance_name, operation_name))
    
    ... ... @@ -231,3 +235,21 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    231 235
     
    
    232 236
             except KeyError:
    
    233 237
                 raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
    
    238
    +
    
    239
    +    @classmethod
    
    240
    +    def _context_extract_request_metadata(cls, context):
    
    241
    +        """Given a context object, extract the RequestMetadata header
    
    242
    +        values if they are present. If they were not provided,
    
    243
    +        returns None.
    
    244
    +        """
    
    245
    +        invocation_metadata = context.invocation_metadata()
    
    246
    +        request_metadata_entry = next((entry for entry in invocation_metadata
    
    247
    +                                       if entry.key == REQUEST_METADATA_HEADER_NAME),
    
    248
    +                                      None)
    
    249
    +        if not request_metadata_entry:
    
    250
    +            return None
    
    251
    +
    
    252
    +        request_metadata = remote_execution_pb2.RequestMetadata()
    
    253
    +        request_metadata.ParseFromString(request_metadata_entry.value)
    
    254
    +
    
    255
    +        return request_metadata

  • buildgrid/server/job.py
    ... ... @@ -12,9 +12,9 @@
    12 12
     # See the License for the specific language governing permissions and
    
    13 13
     # limitations under the License.
    
    14 14
     
    
    15
    -
    
    16 15
     from datetime import datetime
    
    17 16
     import logging
    
    17
    +from threading import Lock
    
    18 18
     import uuid
    
    19 19
     
    
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    ... ... @@ -62,6 +62,9 @@ class Job:
    62 62
             self._platform_requirements = platform_requirements \
    
    63 63
                 if platform_requirements else dict()
    
    64 64
     
    
    65
    +        self.__peers_lock = Lock()
    
    66
    +        self.__peers = set()
    
    67
    +
    
    65 68
             self._done = False
    
    66 69
     
    
    67 70
         def __lt__(self, other):
    
    ... ... @@ -175,7 +178,7 @@ class Job:
    175 178
             """Subscribes to a new job's :class:`Operation` stage changes.
    
    176 179
     
    
    177 180
             Args:
    
    178
    -            peer (str): a unique string identifying the client.
    
    181
    +            peer (Peer): an object that represents the client.
    
    179 182
                 message_queue (queue.Queue): the event queue to register.
    
    180 183
     
    
    181 184
             Returns:
    
    ... ... @@ -194,10 +197,13 @@ class Job:
    194 197
                                 self._name, new_operation.name)
    
    195 198
     
    
    196 199
             self.__operations_by_name[new_operation.name] = new_operation
    
    197
    -        self.__operations_by_peer[peer] = new_operation
    
    198
    -        self.__operations_message_queues[peer] = message_queue
    
    200
    +        self.__operations_by_peer[peer.peer_id] = new_operation
    
    201
    +        self.__operations_message_queues[peer.peer_id] = message_queue
    
    202
    +
    
    203
    +        self._send_operations_updates(peers=[peer.peer_id])
    
    199 204
     
    
    200
    -        self._send_operations_updates(peers=[peer])
    
    205
    +        with self.__peers_lock:
    
    206
    +            self.__peers.add(peer)
    
    201 207
     
    
    202 208
             return new_operation.name
    
    203 209
     
    
    ... ... @@ -206,7 +212,7 @@ class Job:
    206 212
     
    
    207 213
             Args:
    
    208 214
                 operation_name (str): an existing operation's name to subscribe to.
    
    209
    -            peer (str): a unique string identifying the client.
    
    215
    +            peer (Peer): an object that represents the client.
    
    210 216
                 message_queue (queue.Queue): the event queue to register.
    
    211 217
     
    
    212 218
             Returns:
    
    ... ... @@ -222,18 +228,20 @@ class Job:
    222 228
                 raise NotFoundError("Operation name does not exist: [{}]"
    
    223 229
                                     .format(operation_name))
    
    224 230
     
    
    225
    -        self.__operations_by_peer[peer] = operation
    
    226
    -        self.__operations_message_queues[peer] = message_queue
    
    231
    +        self.__operations_by_peer[peer.peer_id] = operation
    
    232
    +        self.__operations_message_queues[peer.peer_id] = message_queue
    
    227 233
     
    
    228
    -        self._send_operations_updates(peers=[peer])
    
    234
    +        self._send_operations_updates(peers=[peer.peer_id])
    
    235
    +
    
    236
    +        with self.__peers_lock:
    
    237
    +            self.__peers.add(peer)
    
    229 238
     
    
    230 239
         def unregister_operation_peer(self, operation_name, peer):
    
    231 240
             """Unsubscribes to the job's :class:`Operation` stage change.
    
    232 241
     
    
    233 242
             Args:
    
    234 243
                 operation_name (str): an existing operation's name to unsubscribe from.
    
    235
    -            peer (str): a unique string identifying the client.
    
    236
    -
    
    244
    +            peer (Peer): an object that represents the client.
    
    237 245
             Raises:
    
    238 246
                 NotFoundError: If no operation with `operation_name` exists.
    
    239 247
             """
    
    ... ... @@ -244,10 +252,10 @@ class Job:
    244 252
                 raise NotFoundError("Operation name does not exist: [{}]"
    
    245 253
                                     .format(operation_name))
    
    246 254
     
    
    247
    -        if peer in self.__operations_message_queues:
    
    248
    -            del self.__operations_message_queues[peer]
    
    255
    +        if peer.peer_id in self.__operations_message_queues:
    
    256
    +            del self.__operations_message_queues[peer.peer_id]
    
    249 257
     
    
    250
    -        del self.__operations_by_peer[peer]
    
    258
    +        del self.__operations_by_peer[peer.peer_id]
    
    251 259
     
    
    252 260
             # Drop the operation if nobody is watching it anymore:
    
    253 261
             if operation not in self.__operations_by_peer.values():
    
    ... ... @@ -258,6 +266,9 @@ class Job:
    258 266
                 self.__logger.debug("Operation deleted for job [%s]: [%s]",
    
    259 267
                                     self._name, operation.name)
    
    260 268
     
    
    269
    +        with self.__peers_lock:
    
    270
    +            self.__peers.remove(peer)
    
    271
    +
    
    261 272
         def list_operations(self):
    
    262 273
             """Lists the :class:`Operation` related to a job.
    
    263 274
     
    

  • buildgrid/server/peer.py
    1
    +# Copyright (C) 2019 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +class Peer:
    
    17
    +    """Represents a client during a session."""
    
    18
    +    def __init__(self, peer_id, token=None, request_metadata=None):
    
    19
    +        self._id = peer_id  # This uniquely identifies a client
    
    20
    +        self._token = token
    
    21
    +
    
    22
    +        self.__request_metadata = request_metadata
    
    23
    +
    
    24
    +    def __eq__(self, other):
    
    25
    +        if isinstance(other, Peer):
    
    26
    +            return self.peer_id == other.peer_id
    
    27
    +        return False
    
    28
    +
    
    29
    +    def __hash__(self):
    
    30
    +        return hash(self.peer_id)  # This string is unique for each peer
    
    31
    +
    
    32
    +    @property
    
    33
    +    def peer_id(self):
    
    34
    +        return self._id
    
    35
    +
    
    36
    +    @property
    
    37
    +    def token(self):
    
    38
    +        return self._token
    
    39
    +
    
    40
    +    # -- `RequestMetadata` optional values (attached to the Execute() call) --
    
    41
    +    @property
    
    42
    +    def request_metadata(self):
    
    43
    +        return self.__request_metadata
    
    44
    +
    
    45
    +    @property
    
    46
    +    def tool_name(self):
    
    47
    +        if self.__request_metadata and self.__request_metadata.tool_details:
    
    48
    +            return self.__request_metadata.tool_details.tool_name
    
    49
    +        return None
    
    50
    +
    
    51
    +    def tool_version(self):
    
    52
    +        if self.__request_metadata and self.__request_metadata.tool_details:
    
    53
    +            return self.__request_metadata.tool_details.tool_version
    
    54
    +        return None
    
    55
    +
    
    56
    +    @property
    
    57
    +    def action_id(self):
    
    58
    +        if self.__request_metadata:
    
    59
    +            return self.__request_metadata.action_id
    
    60
    +        return None
    
    61
    +
    
    62
    +    @property
    
    63
    +    def tool_invocation_id(self):
    
    64
    +        if self.__request_metadata:
    
    65
    +            return self.__request_metadata.tool_invocation_id
    
    66
    +        return None
    
    67
    +
    
    68
    +    @property
    
    69
    +    def correlated_invocations_id(self):
    
    70
    +        if self.__request_metadata:
    
    71
    +            return self.__request_metadata.correlated_invocations_id
    
    72
    +        return None

  • buildgrid/server/scheduler.py
    ... ... @@ -54,9 +54,8 @@ class Scheduler:
    54 54
     
    
    55 55
             self.__queue = []
    
    56 56
     
    
    57
    -        self._is_instrumented = monitor
    
    58
    -
    
    59
    -        if self._is_instrumented:
    
    57
    +        self._is_instrumented = False
    
    58
    +        if monitor:
    
    60 59
                 self.activate_monitoring()
    
    61 60
     
    
    62 61
         # --- Public API ---
    
    ... ... @@ -87,7 +86,7 @@ class Scheduler:
    87 86
     
    
    88 87
             Args:
    
    89 88
                 job_name (str): name of the job to subscribe to.
    
    90
    -            peer (str): a unique string identifying the client.
    
    89
    +            peer (Peer): object that represents the client
    
    91 90
                 message_queue (queue.Queue): the event queue to register.
    
    92 91
     
    
    93 92
             Returns:
    
    ... ... @@ -114,7 +113,7 @@ class Scheduler:
    114 113
     
    
    115 114
             Args:
    
    116 115
                 operation_name (str): name of the operation to subscribe to.
    
    117
    -            peer (str): a unique string identifying the client.
    
    116
    +            peer (Peer): an object that represents the client.
    
    118 117
                 message_queue (queue.Queue): the event queue to register.
    
    119 118
     
    
    120 119
             Returns:
    
    ... ... @@ -137,7 +136,7 @@ class Scheduler:
    137 136
     
    
    138 137
             Args:
    
    139 138
                 operation_name (str): name of the operation to unsubscribe from.
    
    140
    -            peer (str): a unique string identifying the client.
    
    139
    +            peer (Peer): object that represents the client
    
    141 140
     
    
    142 141
             Raises:
    
    143 142
                 NotFoundError: If no operation with `operation_name` exists.
    

  • buildgrid/settings.py
    ... ... @@ -43,3 +43,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/'
    43 43
     #  type       - Type of CAS object, eg. 'action_result', 'command'...
    
    44 44
     #  hash       - Object's digest hash.
    
    45 45
     #  sizebytes  - Object's digest size in bytes.
    
    46
    +
    
    47
    +
    
    48
    +# Name of the header key to attach optional `RequestMetadata` values.
    
    49
    +# (This is defined in the REAPI specification.)
    
    50
    +REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
    
    51
    +
    
    52
    +# 'RequestMetadata' header values. These values will be used when
    
    53
    +# attaching optional metadata to a gRPC request's header:
    
    54
    +REQUEST_METADATA_TOOL_NAME = 'BuildGrid'
    
    55
    +REQUEST_METADATA_TOOL_VERSION = '0.1'



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]