Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid
Commits:
- 
839afd1d
by Santiago Gil at 2019-02-27T13:55:25Z
- 
dc77d7e3
by Santiago Gil at 2019-02-27T14:05:09Z
- 
6237ed29
by Santiago Gil at 2019-02-27T15:09:20Z
16 changed files:
- buildgrid/_app/commands/cmd_actioncache.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_capabilities.py
- buildgrid/_app/commands/cmd_cas.py
- buildgrid/_app/commands/cmd_execute.py
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/client/authentication.py
- + buildgrid/client/channel.py
- + buildgrid/client/requestmetadata.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- + buildgrid/server/peer.py
- + buildgrid/server/requestmetadata.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- tests/auth/test_client.py
Changes:
| ... | ... | @@ -21,8 +21,9 @@ import click | 
| 21 | 21 |  from google.protobuf import json_format
 | 
| 22 | 22 |  | 
| 23 | 23 |  from buildgrid.client.actioncache import query
 | 
| 24 | -from buildgrid.client.authentication import setup_channel
 | |
| 24 | +from buildgrid.client.channel import setup_channel
 | |
| 25 | 25 |  from buildgrid.client.cas import download
 | 
| 26 | +from buildgrid._exceptions import InvalidArgumentError
 | |
| 26 | 27 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 27 | 28 |  from buildgrid.utils import create_digest, parse_digest
 | 
| 28 | 29 |  | 
| ... | ... | @@ -42,13 +43,26 @@ from ..cli import pass_context | 
| 42 | 43 |                help="Public server certificate for TLS (PEM-encoded)")
 | 
| 43 | 44 |  @click.option('--instance-name', type=click.STRING, default=None, show_default=True,
 | 
| 44 | 45 |                help="Targeted farm instance name.")
 | 
| 46 | +@click.option('-t', '--tool-name', type=str, help='Tool name.')
 | |
| 47 | +@click.option('-n', '--tool-version', type=str, help='Tool version.')
 | |
| 48 | +@click.option('-a', '--action-id', type=str, help='Action ID.')
 | |
| 49 | +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
 | |
| 50 | +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
 | |
| 45 | 51 |  @pass_context
 | 
| 46 | -def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
 | |
| 52 | +def cli(context, remote, instance_name, auth_token, client_key, client_cert,
 | |
| 53 | +        server_cert, tool_name, tool_version, action_id, invocation_id,
 | |
| 54 | +        correlation_id):
 | |
| 47 | 55 |      """Entry-point for the ``bgd action-cache`` CLI command group."""
 | 
| 48 | 56 |      try:
 | 
| 49 | 57 |          context.channel, _ = setup_channel(remote, auth_token=auth_token,
 | 
| 50 | -                                           client_key=client_key, client_cert=client_cert,
 | |
| 51 | -                                           server_cert=server_cert)
 | |
| 58 | +                                           client_key=client_key,
 | |
| 59 | +                                           client_cert=client_cert,
 | |
| 60 | +                                           server_cert=server_cert,
 | |
| 61 | +                                           tool_name=tool_name,
 | |
| 62 | +                                           tool_version=tool_version,
 | |
| 63 | +                                           action_id=action_id,
 | |
| 64 | +                                           tool_invocation_id=invocation_id,
 | |
| 65 | +                                           correlated_invocations_id=correlation_id)
 | |
| 52 | 66 |  | 
| 53 | 67 |      except InvalidArgumentError as e:
 | 
| 54 | 68 |          click.echo("Error: {}.".format(e), err=True)
 | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid.bot import bot, interface, session | 
| 29 | 29 |  from buildgrid.bot.hardware.interface import HardwareInterface
 | 
| 30 | 30 |  from buildgrid.bot.hardware.device import Device
 | 
| 31 | 31 |  from buildgrid.bot.hardware.worker import Worker
 | 
| 32 | -from buildgrid.client.authentication import setup_channel
 | |
| 32 | +from buildgrid.client.channel import setup_channel
 | |
| 33 | 33 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 34 | 34 |  | 
| 35 | 35 |  from ..bots import buildbox, dummy, host
 | 
| ... | ... | @@ -70,12 +70,16 @@ def cli(context, parent, update_period, remote, auth_token, client_key, | 
| 70 | 70 |      setup_logging(verbosity=verbose)
 | 
| 71 | 71 |      # Setup the remote execution server channel:
 | 
| 72 | 72 |      try:
 | 
| 73 | -        context.channel, details = setup_channel(remote, auth_token=auth_token, server_cert=server_cert,
 | |
| 74 | -                                                 client_key=client_key, client_cert=client_cert)
 | |
| 73 | +        context.channel, details = setup_channel(remote, auth_token=auth_token,
 | |
| 74 | +                                                 server_cert=server_cert,
 | |
| 75 | +                                                 client_key=client_key,
 | |
| 76 | +                                                 client_cert=client_cert)
 | |
| 75 | 77 |  | 
| 76 | 78 |          if remote_cas is not None and remote_cas != remote:
 | 
| 77 | -            context.cas_channel, details = setup_channel(remote_cas, server_cert=cas_server_cert,
 | |
| 78 | -                                                         client_key=cas_client_key, client_cert=cas_client_cert)
 | |
| 79 | +            context.cas_channel, details = setup_channel(remote_cas,
 | |
| 80 | +                                                         server_cert=cas_server_cert,
 | |
| 81 | +                                                         client_key=cas_client_key,
 | |
| 82 | +                                                         client_cert=cas_client_cert)
 | |
| 79 | 83 |              context.remote_cas_url = remote_cas
 | 
| 80 | 84 |  | 
| 81 | 85 |          else:
 | 
| ... | ... | @@ -18,7 +18,7 @@ import sys | 
| 18 | 18 |  import click
 | 
| 19 | 19 |  from google.protobuf import json_format
 | 
| 20 | 20 |  | 
| 21 | -from buildgrid.client.authentication import setup_channel
 | |
| 21 | +from buildgrid.client.channel import setup_channel
 | |
| 22 | 22 |  from buildgrid.client.capabilities import CapabilitiesInterface
 | 
| 23 | 23 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 24 | 24 |  | 
| ... | ... | @@ -38,13 +38,26 @@ from ..cli import pass_context | 
| 38 | 38 |                help="Public server certificate for TLS (PEM-encoded).")
 | 
| 39 | 39 |  @click.option('--instance-name', type=click.STRING, default=None, show_default=True,
 | 
| 40 | 40 |                help="Targeted farm instance name.")
 | 
| 41 | +@click.option('-t', '--tool-name', type=str, help='Tool name.')
 | |
| 42 | +@click.option('-n', '--tool-version', type=str, help='Tool version.')
 | |
| 43 | +@click.option('-a', '--action-id', type=str, help='Action ID.')
 | |
| 44 | +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
 | |
| 45 | +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
 | |
| 41 | 46 |  @pass_context
 | 
| 42 | -def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
 | |
| 47 | +def cli(context, remote, instance_name, auth_token, client_key, client_cert,
 | |
| 48 | +        server_cert, tool_name, tool_version, action_id, invocation_id,
 | |
| 49 | +        correlation_id):
 | |
| 43 | 50 |      """Entry point for the bgd-capabilities CLI command group."""
 | 
| 44 | 51 |      try:
 | 
| 45 | 52 |          context.channel, _ = setup_channel(remote, auth_token=auth_token,
 | 
| 46 | -                                           client_key=client_key, client_cert=client_cert,
 | |
| 47 | -                                           server_cert=server_cert)
 | |
| 53 | +                                           client_key=client_key,
 | |
| 54 | +                                           client_cert=client_cert,
 | |
| 55 | +                                           server_cert=server_cert,
 | |
| 56 | +                                           tool_name=tool_name,
 | |
| 57 | +                                           tool_version=tool_version,
 | |
| 58 | +                                           action_id=action_id,
 | |
| 59 | +                                           tool_invocation_id=invocation_id,
 | |
| 60 | +                                           correlated_invocations_id=correlation_id)
 | |
| 48 | 61 |  | 
| 49 | 62 |      except InvalidArgumentError as e:
 | 
| 50 | 63 |          click.echo("Error: {}.".format(e), err=True)
 | 
| ... | ... | @@ -25,7 +25,7 @@ import sys | 
| 25 | 25 |  | 
| 26 | 26 |  import click
 | 
| 27 | 27 |  | 
| 28 | -from buildgrid.client.authentication import setup_channel
 | |
| 28 | +from buildgrid.client.channel import setup_channel
 | |
| 29 | 29 |  from buildgrid.client.cas import download, upload
 | 
| 30 | 30 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 31 | 31 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| ... | ... | @@ -47,13 +47,25 @@ from ..cli import pass_context | 
| 47 | 47 |                help="Public server certificate for TLS (PEM-encoded)")
 | 
| 48 | 48 |  @click.option('--instance-name', type=click.STRING, default=None, show_default=True,
 | 
| 49 | 49 |                help="Targeted farm instance name.")
 | 
| 50 | +@click.option('-t', '--tool-name', type=str, help='Tool name.')
 | |
| 51 | +@click.option('-n', '--tool-version', type=str, help='Tool version.')
 | |
| 52 | +@click.option('-a', '--action-id', type=str, help='Action ID.')
 | |
| 53 | +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
 | |
| 54 | +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
 | |
| 50 | 55 |  @pass_context
 | 
| 51 | -def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
 | |
| 56 | +def cli(context, remote, instance_name, auth_token, client_key, client_cert,
 | |
| 57 | +        server_cert, tool_name, tool_version, action_id, invocation_id, correlation_id):
 | |
| 52 | 58 |      """Entry point for the bgd-cas CLI command group."""
 | 
| 53 | 59 |      try:
 | 
| 54 | 60 |          context.channel, _ = setup_channel(remote, auth_token=auth_token,
 | 
| 55 | -                                           client_key=client_key, client_cert=client_cert,
 | |
| 56 | -                                           server_cert=server_cert)
 | |
| 61 | +                                           client_key=client_key,
 | |
| 62 | +                                           client_cert=client_cert,
 | |
| 63 | +                                           server_cert=server_cert,
 | |
| 64 | +                                           tool_name=tool_name,
 | |
| 65 | +                                           tool_version=tool_version,
 | |
| 66 | +                                           action_id=action_id,
 | |
| 67 | +                                           tool_invocation_id=invocation_id,
 | |
| 68 | +                                           correlated_invocations_id=correlation_id)
 | |
| 57 | 69 |  | 
| 58 | 70 |      except InvalidArgumentError as e:
 | 
| 59 | 71 |          click.echo("Error: {}.".format(e), err=True)
 | 
| ... | ... | @@ -26,10 +26,11 @@ import sys | 
| 26 | 26 |  | 
| 27 | 27 |  import click
 | 
| 28 | 28 |  | 
| 29 | -from buildgrid.client.authentication import setup_channel
 | |
| 29 | +from buildgrid.client.channel import setup_channel
 | |
| 30 | 30 |  from buildgrid.client.cas import download, upload
 | 
| 31 | 31 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 32 | 32 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 33 | +from buildgrid.settings import REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
 | |
| 33 | 34 |  from buildgrid.utils import create_digest
 | 
| 34 | 35 |  | 
| 35 | 36 |  from ..cli import pass_context
 | 
| ... | ... | @@ -48,13 +49,28 @@ from ..cli import pass_context | 
| 48 | 49 |                help="Public server certificate for TLS (PEM-encoded).")
 | 
| 49 | 50 |  @click.option('--instance-name', type=click.STRING, default=None, show_default=True,
 | 
| 50 | 51 |                help="Targeted farm instance name.")
 | 
| 52 | +@click.option('-t', '--tool-name', type=str, default=REQUEST_METADATA_TOOL_NAME,
 | |
| 53 | +              help='Tool name.')
 | |
| 54 | +@click.option('-n', '--tool-version', type=str, default=REQUEST_METADATA_TOOL_VERSION,
 | |
| 55 | +              help='Tool version.')
 | |
| 56 | +@click.option('-a', '--action-id', type=str, help='Action ID.')
 | |
| 57 | +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
 | |
| 58 | +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
 | |
| 51 | 59 |  @pass_context
 | 
| 52 | -def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
 | |
| 60 | +def cli(context, remote, instance_name, auth_token, client_key, client_cert,
 | |
| 61 | +        server_cert, tool_name, tool_version, action_id, invocation_id,
 | |
| 62 | +        correlation_id):
 | |
| 53 | 63 |      """Entry point for the bgd-execute CLI command group."""
 | 
| 54 | 64 |      try:
 | 
| 55 | 65 |          context.channel, _ = setup_channel(remote, auth_token=auth_token,
 | 
| 56 | -                                           client_key=client_key, client_cert=client_cert,
 | |
| 57 | -                                           server_cert=server_cert)
 | |
| 66 | +                                           client_key=client_key,
 | |
| 67 | +                                           client_cert=client_cert,
 | |
| 68 | +                                           server_cert=server_cert,
 | |
| 69 | +                                           tool_name=tool_name,
 | |
| 70 | +                                           tool_version=tool_version,
 | |
| 71 | +                                           action_id=action_id,
 | |
| 72 | +                                           tool_invocation_id=invocation_id,
 | |
| 73 | +                                           correlated_invocations_id=correlation_id)
 | |
| 58 | 74 |  | 
| 59 | 75 |      except InvalidArgumentError as e:
 | 
| 60 | 76 |          click.echo("Error: {}.".format(e), err=True)
 | 
| ... | ... | @@ -90,7 +106,6 @@ def request_dummy(context, number, wait_for_completion): | 
| 90 | 106 |          responses.append(stub.Execute(request))
 | 
| 91 | 107 |  | 
| 92 | 108 |      for response in responses:
 | 
| 93 | - | |
| 94 | 109 |          if wait_for_completion:
 | 
| 95 | 110 |              result = None
 | 
| 96 | 111 |              for stream in response:
 | 
| ... | ... | @@ -121,6 +136,7 @@ def run_command(context, input_root, commands, output_file, output_directory, | 
| 121 | 136 |      stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
 | 
| 122 | 137 |  | 
| 123 | 138 |      output_executables = []
 | 
| 139 | + | |
| 124 | 140 |      with upload(context.channel, instance=context.instance_name) as uploader:
 | 
| 125 | 141 |          command = remote_execution_pb2.Command()
 | 
| 126 | 142 |  | 
| ... | ... | @@ -157,6 +173,7 @@ def run_command(context, input_root, commands, output_file, output_directory, | 
| 157 | 173 |      request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
 | 
| 158 | 174 |                                                    action_digest=action_digest,
 | 
| 159 | 175 |                                                    skip_cache_lookup=True)
 | 
| 176 | + | |
| 160 | 177 |      response = stub.Execute(request)
 | 
| 161 | 178 |  | 
| 162 | 179 |      stream = None
 | 
| ... | ... | @@ -29,7 +29,7 @@ import click | 
| 29 | 29 |  from google.protobuf import json_format
 | 
| 30 | 30 |  import grpc
 | 
| 31 | 31 |  | 
| 32 | -from buildgrid.client.authentication import setup_channel
 | |
| 32 | +from buildgrid.client.channel import setup_channel
 | |
| 33 | 33 |  from buildgrid._enums import OperationStage
 | 
| 34 | 34 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 35 | 35 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| ... | ... | @@ -52,13 +52,26 @@ from ..cli import pass_context | 
| 52 | 52 |                help="Public server certificate for TLS (PEM-encoded).")
 | 
| 53 | 53 |  @click.option('--instance-name', type=click.STRING, default=None, show_default=True,
 | 
| 54 | 54 |                help="Targeted farm instance name.")
 | 
| 55 | +@click.option('-t', '--tool-name', type=str, help='Tool name.')
 | |
| 56 | +@click.option('-n', '--tool-version', type=str, help='Tool version.')
 | |
| 57 | +@click.option('-a', '--action-id', type=str, help='Action ID.')
 | |
| 58 | +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
 | |
| 59 | +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
 | |
| 55 | 60 |  @pass_context
 | 
| 56 | -def cli(context, remote, instance_name, auth_token, client_key, client_cert, server_cert):
 | |
| 61 | +def cli(context, remote, instance_name, auth_token, client_key, client_cert,
 | |
| 62 | +        server_cert, tool_name, tool_version, action_id, invocation_id,
 | |
| 63 | +        correlation_id):
 | |
| 57 | 64 |      """Entry point for the bgd-operation CLI command group."""
 | 
| 58 | 65 |      try:
 | 
| 59 | 66 |          context.channel, _ = setup_channel(remote, auth_token=auth_token,
 | 
| 60 | -                                           client_key=client_key, client_cert=client_cert,
 | |
| 61 | -                                           server_cert=server_cert)
 | |
| 67 | +                                           client_key=client_key,
 | |
| 68 | +                                           client_cert=client_cert,
 | |
| 69 | +                                           server_cert=server_cert,
 | |
| 70 | +                                           tool_name=tool_name,
 | |
| 71 | +                                           tool_version=tool_version,
 | |
| 72 | +                                           action_id=action_id,
 | |
| 73 | +                                           tool_invocation_id=invocation_id,
 | |
| 74 | +                                           correlated_invocations_id=correlation_id)
 | |
| 62 | 75 |  | 
| 63 | 76 |      except InvalidArgumentError as e:
 | 
| 64 | 77 |          click.echo("Error: {}.".format(e), err=True)
 | 
| ... | ... | @@ -15,12 +15,12 @@ | 
| 15 | 15 |  | 
| 16 | 16 |  import base64
 | 
| 17 | 17 |  from collections import namedtuple
 | 
| 18 | -from urllib.parse import urlparse
 | |
| 19 | 18 |  import os
 | 
| 20 | 19 |  | 
| 21 | 20 |  import grpc
 | 
| 22 | 21 |  | 
| 23 | 22 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 23 | +from buildgrid.client.requestmetadata import RequestMetadataInterceptor
 | |
| 24 | 24 |  from buildgrid.utils import read_file
 | 
| 25 | 25 |  | 
| 26 | 26 |  | 
| ... | ... | @@ -78,54 +78,6 @@ def load_channel_authorization_token(auth_token=None): | 
| 78 | 78 |      return None
 | 
| 79 | 79 |  | 
| 80 | 80 |  | 
| 81 | -def setup_channel(remote_url, auth_token=None,
 | |
| 82 | -                  client_key=None, client_cert=None, server_cert=None):
 | |
| 83 | -    """Creates a new gRPC client communication chanel.
 | |
| 84 | - | |
| 85 | -    If `remote_url` does not specifies a port number, defaults 50051.
 | |
| 86 | - | |
| 87 | -    Args:
 | |
| 88 | -        remote_url (str): URL for the remote, including port and protocol.
 | |
| 89 | -        auth_token (str): Authorization token file path.
 | |
| 90 | -        server_cert(str): TLS certificate chain file path.
 | |
| 91 | -        client_key(str): TLS root certificate file path.
 | |
| 92 | -        client_cert(str): TLS private key file path.
 | |
| 93 | - | |
| 94 | -    Returns:
 | |
| 95 | -        Channel: Client Channel to be used in order to access the server
 | |
| 96 | -            at `remote_url`.
 | |
| 97 | - | |
| 98 | -    Raises:
 | |
| 99 | -        InvalidArgumentError: On any input parsing error.
 | |
| 100 | -    """
 | |
| 101 | -    url = urlparse(remote_url)
 | |
| 102 | -    remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | |
| 103 | -    details = None, None, None
 | |
| 104 | - | |
| 105 | -    if url.scheme == 'http':
 | |
| 106 | -        channel = grpc.insecure_channel(remote)
 | |
| 107 | - | |
| 108 | -    elif url.scheme == 'https':
 | |
| 109 | -        credentials, details = load_tls_channel_credentials(client_key, client_cert, server_cert)
 | |
| 110 | -        if not credentials:
 | |
| 111 | -            raise InvalidArgumentError("Given TLS details (or defaults) could be loaded")
 | |
| 112 | - | |
| 113 | -        channel = grpc.secure_channel(remote, credentials)
 | |
| 114 | - | |
| 115 | -    else:
 | |
| 116 | -        raise InvalidArgumentError("Given remote does not specify a protocol")
 | |
| 117 | - | |
| 118 | -    if auth_token is not None:
 | |
| 119 | -        token = load_channel_authorization_token(auth_token)
 | |
| 120 | -        if not token:
 | |
| 121 | -            raise InvalidArgumentError("Given authorization token could be loaded")
 | |
| 122 | - | |
| 123 | -        interpector = AuthMetadataClientInterceptor(auth_token=token)
 | |
| 124 | -        channel = grpc.intercept_channel(channel, interpector)
 | |
| 125 | - | |
| 126 | -    return channel, details
 | |
| 127 | - | |
| 128 | - | |
| 129 | 81 |  class AuthMetadataClientInterceptor(
 | 
| 130 | 82 |          grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
 | 
| 131 | 83 |          grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
 | 
| 1 | +# Copyright (C) 2019 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +import grpc
 | |
| 16 | +from urllib.parse import urlparse
 | |
| 17 | + | |
| 18 | +from buildgrid.client.authentication import AuthMetadataClientInterceptor, \
 | |
| 19 | +    load_channel_authorization_token, load_tls_channel_credentials
 | |
| 20 | +from buildgrid.client.requestmetadata import RequestMetadataInterceptor
 | |
| 21 | +from buildgrid._exceptions import InvalidArgumentError
 | |
| 22 | + | |
| 23 | + | |
| 24 | +def setup_channel(remote_url, auth_token=None,
 | |
| 25 | +                  client_key=None, client_cert=None, server_cert=None,
 | |
| 26 | +                  tool_name=None, tool_version=None, action_id=None,
 | |
| 27 | +                  tool_invocation_id=None, correlated_invocations_id=None):
 | |
| 28 | +    """Creates a new gRPC client communication channel.
 | |
| 29 | + | |
| 30 | +    If `remote_url` does not specify a port number, defaults to 50051.
 | |
| 31 | + | |
| 32 | +    Args:
 | |
| 33 | +        remote_url (str): URL for the remote, including port and protocol.
 | |
| 34 | +        auth_token (str): Authorization token file path.
 | |
| 35 | +        server_cert (str): TLS root certificate file path.
 | |
| 36 | +        client_key (str): TLS private key file path.
 | |
| 37 | +        client_cert (str): TLS certificate chain file path.
 | |
| 38 | +        tool_name (str): Name of the tool generating the request.
 | |
| 39 | +        tool_version (str): Version of the tool generating the request.
 | |
| 40 | +        action_id (str): Action identifier to which the request belongs.
 | |
| 41 | +        tool_invocation_id (str): Identifier for a related group of Actions.
 | |
| 42 | +        correlated_invocations_id (str): Identifier that ties invocations together.
 | |
| 43 | + | |
| 44 | +    Returns:
 | |
| 45 | +        Channel: Client Channel to be used in order to access the server
 | |
| 46 | +            at `remote_url`.
 | |
| 47 | + | |
| 48 | +    Raises:
 | |
| 49 | +        InvalidArgumentError: On any input parsing error.
 | |
| 50 | +    """
 | |
| 51 | +    url = urlparse(remote_url)
 | |
| 52 | +    remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | |
| 53 | +    details = None, None, None
 | |
| 54 | + | |
| 55 | +    if url.scheme == 'http':
 | |
| 56 | +        channel = grpc.insecure_channel(remote)
 | |
| 57 | + | |
| 58 | +    elif url.scheme == 'https':
 | |
| 59 | +        credentials, details = load_tls_channel_credentials(client_key, client_cert, server_cert)
 | |
| 60 | +        if not credentials:
 | |
| 61 | +            raise InvalidArgumentError("Given TLS details (or defaults) could be loaded")
 | |
| 62 | + | |
| 63 | +        channel = grpc.secure_channel(remote, credentials)
 | |
| 64 | + | |
| 65 | +    else:
 | |
| 66 | +        raise InvalidArgumentError("Given remote does not specify a protocol")
 | |
| 67 | + | |
| 68 | +    request_metadata_interceptor \
 | |
| 69 | +        = RequestMetadataInterceptor(tool_name=tool_name,
 | |
| 70 | +                                     tool_version=tool_version,
 | |
| 71 | +                                     action_id=action_id,
 | |
| 72 | +                                     tool_invocation_id=tool_invocation_id,
 | |
| 73 | +                                     correlated_invocations_id=correlated_invocations_id)
 | |
| 74 | + | |
| 75 | +    channel = grpc.intercept_channel(channel, request_metadata_interceptor)
 | |
| 76 | + | |
| 77 | +    if auth_token is not None:
 | |
| 78 | +        token = load_channel_authorization_token(auth_token)
 | |
| 79 | +        if not token:
 | |
| 80 | +            raise InvalidArgumentError("Given authorization token could be loaded")
 | |
| 81 | + | |
| 82 | +        auth_interceptor = AuthMetadataClientInterceptor(auth_token=token)
 | |
| 83 | + | |
| 84 | +        channel = grpc.intercept_channel(channel, auth_interceptor)
 | |
| 85 | + | |
| 86 | +    return channel, details | 
| 1 | +# Copyright (C) 2019 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +from collections import namedtuple
 | |
| 16 | +import grpc
 | |
| 17 | + | |
| 18 | +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME, \
 | |
| 19 | +    REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
 | |
| 20 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 21 | + | |
| 22 | + | |
| 23 | +class RequestMetadataInterceptor(grpc.UnaryUnaryClientInterceptor,
 | |
| 24 | +                                 grpc.UnaryStreamClientInterceptor,
 | |
| 25 | +                                 grpc.StreamUnaryClientInterceptor,
 | |
| 26 | +                                 grpc.StreamStreamClientInterceptor):
 | |
| 27 | + | |
| 28 | +    def __init__(self, tool_name=None, tool_version=None, action_id=None,
 | |
| 29 | +                 tool_invocation_id=None, correlated_invocations_id=None):
 | |
| 30 | +        """Appends optional `RequestMetadata` header values to each call.
 | |
| 31 | + | |
| 32 | +        Args:
 | |
| 33 | +            tool_name (str): If not provided, uses the value set in the config.
 | |
| 34 | +            tool_version (str): If not provided, uses the value set in the config.
 | |
| 35 | +            action_id (str)
 | |
| 36 | +            tool_invocation_id (str)
 | |
| 37 | +            correlated_invocations_id (str)
 | |
| 38 | +        """
 | |
| 39 | +        self._tool_name = tool_name if tool_name else REQUEST_METADATA_TOOL_NAME
 | |
| 40 | +        self._tool_version = tool_version if tool_version else REQUEST_METADATA_TOOL_VERSION
 | |
| 41 | + | |
| 42 | +        self._action_id = action_id
 | |
| 43 | +        self._tool_invocation_id = tool_invocation_id
 | |
| 44 | +        self._correlated_invocations_id = correlated_invocations_id
 | |
| 45 | + | |
| 46 | +        self.__header_field_name = REQUEST_METADATA_HEADER_NAME
 | |
| 47 | +        self.__header_field_value = self._request_metadata()
 | |
| 48 | + | |
| 49 | +    # --- Public API ---
 | |
| 50 | +    def intercept_unary_unary(self, continuation, client_call_details, request):
 | |
| 51 | +        new_details = self._amend_call_details(client_call_details)
 | |
| 52 | + | |
| 53 | +        return continuation(new_details, request)
 | |
| 54 | + | |
| 55 | +    def intercept_unary_stream(self, continuation, client_call_details, request):
 | |
| 56 | +        new_details = self._amend_call_details(client_call_details)
 | |
| 57 | + | |
| 58 | +        return continuation(new_details, request)
 | |
| 59 | + | |
| 60 | +    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
 | |
| 61 | +        new_details = self._amend_call_details(client_call_details)
 | |
| 62 | + | |
| 63 | +        return continuation(new_details, request_iterator)
 | |
| 64 | + | |
| 65 | +    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
 | |
| 66 | +        new_details = self._amend_call_details(client_call_details)
 | |
| 67 | + | |
| 68 | +        return continuation(new_details, request_iterator)
 | |
| 69 | + | |
| 70 | +    # --- Private API ---
 | |
| 71 | +    def _request_metadata(self):
 | |
| 72 | +        """Creates a serialized RequestMetadata entry to attach to a gRPC
 | |
| 73 | +        call header. Arguments should be of type str or None.
 | |
| 74 | +        """
 | |
| 75 | +        request_metadata = remote_execution_pb2.RequestMetadata()
 | |
| 76 | +        if self._action_id:
 | |
| 77 | +            request_metadata.action_id = self._action_id
 | |
| 78 | +        if self._tool_invocation_id:
 | |
| 79 | +            request_metadata.tool_invocation_id = self._tool_invocation_id
 | |
| 80 | +        if self._correlated_invocations_id:
 | |
| 81 | +            request_metadata.correlated_invocations_id = self._correlated_invocations_id
 | |
| 82 | +        if self._tool_name:
 | |
| 83 | +            request_metadata.tool_details.tool_name = self._tool_name
 | |
| 84 | +        if self._tool_version:
 | |
| 85 | +            request_metadata.tool_details.tool_version = self._tool_version
 | |
| 86 | + | |
| 87 | +        return request_metadata.SerializeToString()
 | |
| 88 | + | |
| 89 | +    def _amend_call_details(self, client_call_details):
 | |
| 90 | +        if client_call_details.metadata is not None:
 | |
| 91 | +            new_metadata = list(client_call_details.metadata)
 | |
| 92 | +        else:
 | |
| 93 | +            new_metadata = []
 | |
| 94 | + | |
| 95 | +        new_metadata.append((self.__header_field_name,
 | |
| 96 | +                             self.__header_field_value))
 | |
| 97 | + | |
| 98 | +        class _ClientCallDetails(
 | |
| 99 | +                namedtuple('_ClientCallDetails',
 | |
| 100 | +                           ('method', 'timeout', 'credentials', 'metadata',)),
 | |
| 101 | +                grpc.ClientCallDetails):
 | |
| 102 | +            pass
 | |
| 103 | + | |
| 104 | +        return _ClientCallDetails(client_call_details.method,
 | |
| 105 | +                                  client_call_details.timeout,
 | |
| 106 | +                                  client_call_details.credentials,
 | |
| 107 | +                                  new_metadata) | 
| ... | ... | @@ -27,9 +27,11 @@ from functools import partial | 
| 27 | 27 |  import grpc
 | 
| 28 | 28 |  | 
| 29 | 29 |  from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
 | 
| 30 | -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
 | |
| 30 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 31 | 31 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 32 | 32 |  from buildgrid.server._authentication import AuthContext, authorize
 | 
| 33 | +from buildgrid.server.peer import Peer
 | |
| 34 | +from buildgrid.server.requestmetadata import context_extract_request_metadata
 | |
| 33 | 35 |  | 
| 34 | 36 |  | 
| 35 | 37 |  class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 | 
| ... | ... | @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 94 | 96 |  | 
| 95 | 97 |          instance_name = request.instance_name
 | 
| 96 | 98 |          message_queue = queue.Queue()
 | 
| 97 | -        peer = context.peer()
 | |
| 99 | +        peer_id = context.peer()
 | |
| 100 | + | |
| 101 | +        request_metadata = context_extract_request_metadata(context)
 | |
| 102 | +        peer = Peer(uid=peer_id, request_metadata=request_metadata)
 | |
| 98 | 103 |  | 
| 99 | 104 |          try:
 | 
| 100 | 105 |              instance = self._get_instance(instance_name)
 | 
| ... | ... | @@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 102 | 107 |              job_name = instance.execute(request.action_digest,
 | 
| 103 | 108 |                                          request.skip_cache_lookup)
 | 
| 104 | 109 |  | 
| 105 | -            operation_name = instance.register_job_peer(job_name,
 | |
| 106 | -                                                        peer, message_queue)
 | |
| 110 | +            operation_name = instance.register_job_peer(job_name, peer,
 | |
| 111 | +                                                        message_queue)
 | |
| 107 | 112 |  | 
| 108 | 113 |              context.add_callback(partial(self._rpc_termination_callback,
 | 
| 109 | 114 |                                           peer, instance_name, operation_name))
 | 
| ... | ... | @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 161 | 166 |          try:
 | 
| 162 | 167 |              instance = self._get_instance(instance_name)
 | 
| 163 | 168 |  | 
| 164 | -            instance.register_operation_peer(operation_name,
 | |
| 165 | -                                             peer, message_queue)
 | |
| 169 | +            instance.register_operation_peer(operation_name, peer, message_queue)
 | |
| 166 | 170 |  | 
| 167 | 171 |              context.add_callback(partial(self._rpc_termination_callback,
 | 
| 168 | 172 |                                           peer, instance_name, operation_name))
 | 
| ... | ... | @@ -12,9 +12,9 @@ | 
| 12 | 12 |  # See the License for the specific language governing permissions and
 | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | 14 |  | 
| 15 | - | |
| 16 | 15 |  from datetime import datetime
 | 
| 17 | 16 |  import logging
 | 
| 17 | +from threading import Lock
 | |
| 18 | 18 |  import uuid
 | 
| 19 | 19 |  | 
| 20 | 20 |  from google.protobuf import duration_pb2, timestamp_pb2
 | 
| ... | ... | @@ -175,7 +175,7 @@ class Job: | 
| 175 | 175 |          """Subscribes to a new job's :class:`Operation` stage changes.
 | 
| 176 | 176 |  | 
| 177 | 177 |          Args:
 | 
| 178 | -            peer (str): a unique string identifying the client.
 | |
| 178 | +            peer (Peer): an object that represents the client.
 | |
| 179 | 179 |              message_queue (queue.Queue): the event queue to register.
 | 
| 180 | 180 |  | 
| 181 | 181 |          Returns:
 | 
| ... | ... | @@ -194,10 +194,10 @@ class Job: | 
| 194 | 194 |                              self._name, new_operation.name)
 | 
| 195 | 195 |  | 
| 196 | 196 |          self.__operations_by_name[new_operation.name] = new_operation
 | 
| 197 | -        self.__operations_by_peer[peer] = new_operation
 | |
| 198 | -        self.__operations_message_queues[peer] = message_queue
 | |
| 197 | +        self.__operations_by_peer[peer.uid] = new_operation
 | |
| 198 | +        self.__operations_message_queues[peer.uid] = message_queue
 | |
| 199 | 199 |  | 
| 200 | -        self._send_operations_updates(peers=[peer])
 | |
| 200 | +        self._send_operations_updates(peers=[peer.uid])
 | |
| 201 | 201 |  | 
| 202 | 202 |          return new_operation.name
 | 
| 203 | 203 |  | 
| ... | ... | @@ -206,7 +206,7 @@ class Job: | 
| 206 | 206 |  | 
| 207 | 207 |          Args:
 | 
| 208 | 208 |              operation_name (str): an existing operation's name to subscribe to.
 | 
| 209 | -            peer (str): a unique string identifying the client.
 | |
| 209 | +            peer (Peer): an object that represents the client.
 | |
| 210 | 210 |              message_queue (queue.Queue): the event queue to register.
 | 
| 211 | 211 |  | 
| 212 | 212 |          Returns:
 | 
| ... | ... | @@ -222,18 +222,17 @@ class Job: | 
| 222 | 222 |              raise NotFoundError("Operation name does not exist: [{}]"
 | 
| 223 | 223 |                                  .format(operation_name))
 | 
| 224 | 224 |  | 
| 225 | -        self.__operations_by_peer[peer] = operation
 | |
| 226 | -        self.__operations_message_queues[peer] = message_queue
 | |
| 225 | +        self.__operations_by_peer[peer.uid] = operation
 | |
| 226 | +        self.__operations_message_queues[peer.uid] = message_queue
 | |
| 227 | 227 |  | 
| 228 | -        self._send_operations_updates(peers=[peer])
 | |
| 228 | +        self._send_operations_updates(peers=[peer.uid])
 | |
| 229 | 229 |  | 
| 230 | 230 |      def unregister_operation_peer(self, operation_name, peer):
 | 
| 231 | 231 |          """Unsubscribes to the job's :class:`Operation` stage change.
 | 
| 232 | 232 |  | 
| 233 | 233 |          Args:
 | 
| 234 | 234 |              operation_name (str): an existing operation's name to unsubscribe from.
 | 
| 235 | -            peer (str): a unique string identifying the client.
 | |
| 236 | - | |
| 235 | +            peer (Peer): an object that represents the client.
 | |
| 237 | 236 |          Raises:
 | 
| 238 | 237 |              NotFoundError: If no operation with `operation_name` exists.
 | 
| 239 | 238 |          """
 | 
| ... | ... | @@ -244,10 +243,10 @@ class Job: | 
| 244 | 243 |              raise NotFoundError("Operation name does not exist: [{}]"
 | 
| 245 | 244 |                                  .format(operation_name))
 | 
| 246 | 245 |  | 
| 247 | -        if peer in self.__operations_message_queues:
 | |
| 248 | -            del self.__operations_message_queues[peer]
 | |
| 246 | +        if peer.uid in self.__operations_message_queues:
 | |
| 247 | +            del self.__operations_message_queues[peer.uid]
 | |
| 249 | 248 |  | 
| 250 | -        del self.__operations_by_peer[peer]
 | |
| 249 | +        del self.__operations_by_peer[peer.uid]
 | |
| 251 | 250 |  | 
| 252 | 251 |          # Drop the operation if nobody is watching it anymore:
 | 
| 253 | 252 |          if operation not in self.__operations_by_peer.values():
 | 
| 1 | +# Copyright (C) 2019 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +from collections import defaultdict
 | |
| 16 | +import logging
 | |
| 17 | +from threading import Lock
 | |
| 18 | + | |
| 19 | + | |
| 20 | +class Peer:
 | |
| 21 | +    """Represents a client during a session."""
 | |
| 22 | + | |
| 23 | +    # We will keep a global list of Peers indexed by their `Peer.uid`
 | |
| 24 | +    # and a reference counter so that we can clean `Peer`s that do not
 | |
| 25 | +    # exist anymore. Both are protected by the same lock:
 | |
| 26 | +    __peers_by_uid = {}
 | |
| 27 | +    __peers_ref_count = defaultdict(int)
 | |
| 28 | +    __peers_lock = Lock()
 | |
| 29 | + | |
| 30 | +    def __init__(self, uid, token=None, request_metadata=None):
 | |
| 31 | +        self._uid = uid  # This uniquely identifies a client
 | |
| 32 | +        self._token = token
 | |
| 33 | + | |
| 34 | +        self.__request_metadata = request_metadata
 | |
| 35 | + | |
| 36 | +        self.__logger = logging.getLogger(__name__)
 | |
| 37 | + | |
| 38 | +        with self.__peers_lock:
 | |
| 39 | +            if self._uid in self.__peers_by_uid and \
 | |
| 40 | +                    self.__peers_by_uid[self._uid] != self:
 | |
| 41 | +                self.__logger.debug('Creating another '
 | |
| 42 | +                                    'instance of Peer with uid {} '
 | |
| 43 | +                                    'with different attributes', self._uid)
 | |
| 44 | + | |
| 45 | +            self.__peers_by_uid[self._uid] = self
 | |
| 46 | +            self.__peers_ref_count[self._uid] += 1
 | |
| 47 | + | |
| 48 | +    def __del__(self):
 | |
| 49 | +        with self.__peers_lock:
 | |
| 50 | +            assert self.uid in self.__peers_by_uid
 | |
| 51 | +            assert self.__peers_by_uid[self.uid] > 0
 | |
| 52 | + | |
| 53 | +            self.__peers_ref_count[self.uid] -= 1
 | |
| 54 | + | |
| 55 | +            if self.__peers_ref_count[self.uid] < 1:
 | |
| 56 | +                del self.__peers_by_uid
 | |
| 57 | +                del self.__peers_ref_count[self.uid]
 | |
| 58 | + | |
| 59 | +    def __eq__(self, other):
 | |
| 60 | +        if isinstance(other, Peer):
 | |
| 61 | +            return self.uid == other.uid
 | |
| 62 | +        return False
 | |
| 63 | + | |
| 64 | +    def __hash__(self):
 | |
| 65 | +        return hash(self.uid)  # This string is unique for each peer
 | |
| 66 | + | |
| 67 | +    @property
 | |
| 68 | +    def uid(self):
 | |
| 69 | +        return self._uid
 | |
| 70 | + | |
| 71 | +    @property
 | |
| 72 | +    def token(self):
 | |
| 73 | +        return self._token
 | |
| 74 | + | |
| 75 | +    # -- `RequestMetadata` optional values (attached to the Execute() call) --
 | |
| 76 | +    @property
 | |
| 77 | +    def request_metadata(self):
 | |
| 78 | +        return self.__request_metadata
 | |
| 79 | + | |
| 80 | +    @property
 | |
| 81 | +    def tool_name(self):
 | |
| 82 | +        if self.__request_metadata and self.__request_metadata.tool_details:
 | |
| 83 | +            return self.__request_metadata.tool_details.tool_name
 | |
| 84 | +        return None
 | |
| 85 | + | |
| 86 | +    def tool_version(self):
 | |
| 87 | +        if self.__request_metadata and self.__request_metadata.tool_details:
 | |
| 88 | +            return self.__request_metadata.tool_details.tool_version
 | |
| 89 | +        return None
 | |
| 90 | + | |
| 91 | +    @property
 | |
| 92 | +    def action_id(self):
 | |
| 93 | +        if self.__request_metadata:
 | |
| 94 | +            return self.__request_metadata.action_id
 | |
| 95 | +        return None
 | |
| 96 | + | |
| 97 | +    @property
 | |
| 98 | +    def tool_invocation_id(self):
 | |
| 99 | +        if self.__request_metadata:
 | |
| 100 | +            return self.__request_metadata.tool_invocation_id
 | |
| 101 | +        return None
 | |
| 102 | + | |
| 103 | +    @property
 | |
| 104 | +    def correlated_invocations_id(self):
 | |
| 105 | +        if self.__request_metadata:
 | |
| 106 | +            return self.__request_metadata.correlated_invocations_id
 | |
| 107 | +        return None | 
| 1 | +# Copyright (C) 2019 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 16 | +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
 | |
| 17 | + | |
| 18 | + | |
| 19 | +def context_extract_request_metadata(context):
 | |
| 20 | +    """Given a `grpc.ServicerContext` object, extract the RequestMetadata
 | |
| 21 | +    header values if they are present. If they were not provided,
 | |
| 22 | +    returns None.
 | |
| 23 | + | |
| 24 | +    Args:
 | |
| 25 | +        context (grpc.ServicerContext): Context for a RPC call.
 | |
| 26 | + | |
| 27 | +    Returns:
 | |
| 28 | +        A `RequestMetadata` proto if RequestMetadata values are present,
 | |
| 29 | +        otherwise None.
 | |
| 30 | +    """
 | |
| 31 | +    invocation_metadata = context.invocation_metadata()
 | |
| 32 | +    request_metadata_entry = next((entry for entry in invocation_metadata
 | |
| 33 | +                                   if entry.key == REQUEST_METADATA_HEADER_NAME),
 | |
| 34 | +                                  None)
 | |
| 35 | +    if not request_metadata_entry:
 | |
| 36 | +        return None
 | |
| 37 | + | |
| 38 | +    request_metadata = remote_execution_pb2.RequestMetadata()
 | |
| 39 | +    request_metadata.ParseFromString(request_metadata_entry.value)
 | |
| 40 | + | |
| 41 | +    return request_metadata | 
| ... | ... | @@ -54,9 +54,8 @@ class Scheduler: | 
| 54 | 54 |  | 
| 55 | 55 |          self.__queue = []
 | 
| 56 | 56 |  | 
| 57 | -        self._is_instrumented = monitor
 | |
| 58 | - | |
| 59 | -        if self._is_instrumented:
 | |
| 57 | +        self._is_instrumented = False
 | |
| 58 | +        if monitor:
 | |
| 60 | 59 |              self.activate_monitoring()
 | 
| 61 | 60 |  | 
| 62 | 61 |      # --- Public API ---
 | 
| ... | ... | @@ -87,7 +86,7 @@ class Scheduler: | 
| 87 | 86 |  | 
| 88 | 87 |          Args:
 | 
| 89 | 88 |              job_name (str): name of the job to subscribe to.
 | 
| 90 | -            peer (str): a unique string identifying the client.
 | |
| 89 | +            peer (Peer): an object that represents the client.
 | |
| 91 | 90 |              message_queue (queue.Queue): the event queue to register.
 | 
| 92 | 91 |  | 
| 93 | 92 |          Returns:
 | 
| ... | ... | @@ -114,7 +113,7 @@ class Scheduler: | 
| 114 | 113 |  | 
| 115 | 114 |          Args:
 | 
| 116 | 115 |              operation_name (str): name of the operation to subscribe to.
 | 
| 117 | -            peer (str): a unique string identifying the client.
 | |
| 116 | +            peer (Peer): an object that represents the client.
 | |
| 118 | 117 |              message_queue (queue.Queue): the event queue to register.
 | 
| 119 | 118 |  | 
| 120 | 119 |          Returns:
 | 
| ... | ... | @@ -137,7 +136,7 @@ class Scheduler: | 
| 137 | 136 |  | 
| 138 | 137 |          Args:
 | 
| 139 | 138 |              operation_name (str): name of the operation to unsubscribe from.
 | 
| 140 | -            peer (str): a unique string identifying the client.
 | |
| 139 | +            peer (Peer): an object that represents the client.
 | |
| 141 | 140 |  | 
| 142 | 141 |          Raises:
 | 
| 143 | 142 |              NotFoundError: If no operation with `operation_name` exists.
 | 
| ... | ... | @@ -14,6 +14,7 @@ | 
| 14 | 14 |  | 
| 15 | 15 |  | 
| 16 | 16 |  import hashlib
 | 
| 17 | +from _version import __version__
 | |
| 17 | 18 |  | 
| 18 | 19 |  | 
| 19 | 20 |  # Hash function used for computing digests:
 | 
| ... | ... | @@ -43,3 +44,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/' | 
| 43 | 44 |  #  type       - Type of CAS object, eg. 'action_result', 'command'...
 | 
| 44 | 45 |  #  hash       - Object's digest hash.
 | 
| 45 | 46 |  #  sizebytes  - Object's digest size in bytes.
 | 
| 47 | + | |
| 48 | + | |
| 49 | +# Name of the header key to attach optional `RequestMetadata` values.
 | |
| 50 | +# (This is defined in the REAPI specification.)
 | |
| 51 | +REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
 | |
| 52 | + | |
| 53 | +# 'RequestMetadata' header values. These values will be used when
 | |
| 54 | +# attaching optional metadata to a gRPC request's header:
 | |
| 55 | +REQUEST_METADATA_TOOL_NAME = 'buildgrid'
 | |
| 56 | +REQUEST_METADATA_TOOL_VERSION = __version__ | 
| ... | ... | @@ -20,7 +20,7 @@ import os | 
| 20 | 20 |  import grpc
 | 
| 21 | 21 |  import pytest
 | 
| 22 | 22 |  | 
| 23 | -from buildgrid.client.authentication import setup_channel
 | |
| 23 | +from buildgrid.client.channel import setup_channel
 | |
| 24 | 24 |  from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 | 
| 25 | 25 |  from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
 | 
| 26 | 26 |  | 
