Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid
Commits:
-
01971cd0
by Santiago Gil at 2019-02-22T09:33:31Z
7 changed files:
- buildgrid/_app/commands/cmd_execute.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- + buildgrid/server/peer.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
Changes:
... | ... | @@ -30,6 +30,8 @@ from buildgrid.client.authentication import setup_channel |
30 | 30 |
from buildgrid.client.cas import download, upload
|
31 | 31 |
from buildgrid._exceptions import InvalidArgumentError
|
32 | 32 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
33 |
+from buildgrid.settings import REQUEST_METADATA_HEADER_NAME, \
|
|
34 |
+ REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
|
|
33 | 35 |
from buildgrid.utils import create_digest
|
34 | 36 |
|
35 | 37 |
from ..cli import pass_context
|
... | ... | @@ -66,10 +68,12 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser |
66 | 68 |
@cli.command('request-dummy', short_help="Send a dummy action.")
|
67 | 69 |
@click.option('--number', type=click.INT, default=1, show_default=True,
|
68 | 70 |
help="Number of request to send.")
|
71 |
+@click.option('--request-metadata', is_flag=True,
|
|
72 |
+ help="Attach RequestMetadata to the request header.")
|
|
69 | 73 |
@click.option('--wait-for-completion', is_flag=True,
|
70 | 74 |
help="Stream updates until jobs are completed.")
|
71 | 75 |
@pass_context
|
72 |
-def request_dummy(context, number, wait_for_completion):
|
|
76 |
+def request_dummy(context, number, request_metadata, wait_for_completion):
|
|
73 | 77 |
|
74 | 78 |
click.echo("Sending execution request...")
|
75 | 79 |
command = remote_execution_pb2.Command()
|
... | ... | @@ -85,12 +89,21 @@ def request_dummy(context, number, wait_for_completion): |
85 | 89 |
action_digest=action_digest,
|
86 | 90 |
skip_cache_lookup=True)
|
87 | 91 |
|
92 |
+ # If enabled, we attach some `RequestMetadata` information to the request:
|
|
93 |
+ execute_arguments = {}
|
|
94 |
+ if request_metadata:
|
|
95 |
+ metadata = request_metadata_header_entry(tool_name=REQUEST_METADATA_TOOL_NAME,
|
|
96 |
+ tool_version=REQUEST_METADATA_TOOL_VERSION,
|
|
97 |
+ action_id='2',
|
|
98 |
+ tool_invocation_id='3',
|
|
99 |
+ correlated_invocations_id='4')
|
|
100 |
+ execute_arguments['metadata'] = metadata
|
|
101 |
+ |
|
88 | 102 |
responses = []
|
89 | 103 |
for _ in range(0, number):
|
90 |
- responses.append(stub.Execute(request))
|
|
104 |
+ responses.append(stub.Execute(request, **execute_arguments))
|
|
91 | 105 |
|
92 | 106 |
for response in responses:
|
93 |
- |
|
94 | 107 |
if wait_for_completion:
|
95 | 108 |
result = None
|
96 | 109 |
for stream in response:
|
... | ... | @@ -113,14 +126,22 @@ def request_dummy(context, number, wait_for_completion): |
113 | 126 |
help="Output directory for the output files.")
|
114 | 127 |
@click.option('-p', '--platform-property', nargs=2, type=(click.STRING, click.STRING), multiple=True,
|
115 | 128 |
help="List of key-value pairs of required platform properties.")
|
129 |
+@click.option('-t', '--tool-details', nargs=2, type=str,
|
|
130 |
+ default=(REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION),
|
|
131 |
+ help="Tool name and version.")
|
|
132 |
+@click.option('-a', '--action-id', type=str, help='Action ID.')
|
|
133 |
+@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
|
|
134 |
+@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
|
|
116 | 135 |
@click.argument('input-root', nargs=1, type=click.Path(), required=True)
|
117 | 136 |
@click.argument('commands', nargs=-1, type=click.STRING, required=True)
|
118 | 137 |
@pass_context
|
119 | 138 |
def run_command(context, input_root, commands, output_file, output_directory,
|
120 |
- platform_property):
|
|
139 |
+ platform_property, tool_details, action_id, invocation_id,
|
|
140 |
+ correlation_id):
|
|
121 | 141 |
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
|
122 | 142 |
|
123 | 143 |
output_executables = []
|
144 |
+ |
|
124 | 145 |
with upload(context.channel, instance=context.instance_name) as uploader:
|
125 | 146 |
command = remote_execution_pb2.Command()
|
126 | 147 |
|
... | ... | @@ -157,7 +178,14 @@ def run_command(context, input_root, commands, output_file, output_directory, |
157 | 178 |
request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
|
158 | 179 |
action_digest=action_digest,
|
159 | 180 |
skip_cache_lookup=True)
|
160 |
- response = stub.Execute(request)
|
|
181 |
+ |
|
182 |
+ metadata = request_metadata_header_entry(tool_name=tool_details[0],
|
|
183 |
+ tool_version=tool_details[1],
|
|
184 |
+ action_id=action_id,
|
|
185 |
+ tool_invocation_id=invocation_id,
|
|
186 |
+ correlated_invocations_id=correlation_id)
|
|
187 |
+ |
|
188 |
+ response = stub.Execute(request, metadata=metadata)
|
|
161 | 189 |
|
162 | 190 |
stream = None
|
163 | 191 |
for stream in response:
|
... | ... | @@ -180,3 +208,25 @@ def run_command(context, input_root, commands, output_file, output_directory, |
180 | 208 |
if output_file_response.path in output_executables:
|
181 | 209 |
st = os.stat(path)
|
182 | 210 |
os.chmod(path, st.st_mode | stat.S_IXUSR)
|
211 |
+ |
|
212 |
+ |
|
213 |
+def request_metadata_header_entry(tool_name=None, tool_version=None,
|
|
214 |
+ action_id=None, tool_invocation_id=None,
|
|
215 |
+ correlated_invocations_id=None):
|
|
216 |
+ """Creates a serialized RequestMetadata entry to attach to a gRPC
|
|
217 |
+ call header. Arguments should be of type str or None.
|
|
218 |
+ """
|
|
219 |
+ request_metadata = remote_execution_pb2.RequestMetadata()
|
|
220 |
+ if action_id:
|
|
221 |
+ request_metadata.action_id = action_id
|
|
222 |
+ if tool_invocation_id:
|
|
223 |
+ request_metadata.tool_invocation_id = tool_invocation_id
|
|
224 |
+ if correlated_invocations_id:
|
|
225 |
+ request_metadata.correlated_invocations_id = correlated_invocations_id
|
|
226 |
+ if tool_name:
|
|
227 |
+ request_metadata.tool_details.tool_name = tool_name
|
|
228 |
+ if tool_version:
|
|
229 |
+ request_metadata.tool_details.tool_version = tool_version
|
|
230 |
+ |
|
231 |
+ return ((REQUEST_METADATA_HEADER_NAME,
|
|
232 |
+ request_metadata.SerializeToString()),)
|
... | ... | @@ -62,7 +62,7 @@ class ExecutionInstance: |
62 | 62 |
return get_hash_type()
|
63 | 63 |
|
64 | 64 |
def execute(self, action_digest, skip_cache_lookup):
|
65 |
- """ Sends a job for execution.
|
|
65 |
+ """Sends a job for execution.
|
|
66 | 66 |
Queues an action and creates an Operation instance to be associated with
|
67 | 67 |
this action.
|
68 | 68 |
"""
|
... | ... | @@ -27,9 +27,11 @@ from functools import partial |
27 | 27 |
import grpc
|
28 | 28 |
|
29 | 29 |
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
|
30 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
|
30 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
|
31 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2
|
32 |
+from buildgrid.server.peer import Peer
|
|
32 | 33 |
from buildgrid.server._authentication import AuthContext, authorize
|
34 |
+from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
|
|
33 | 35 |
|
34 | 36 |
|
35 | 37 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
... | ... | @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
94 | 96 |
|
95 | 97 |
instance_name = request.instance_name
|
96 | 98 |
message_queue = queue.Queue()
|
97 |
- peer = context.peer()
|
|
99 |
+ peer_id = context.peer()
|
|
100 |
+ |
|
101 |
+ request_metadata = self._context_extract_request_metadata(context)
|
|
102 |
+ peer = Peer(peer_id=peer_id, request_metadata=request_metadata)
|
|
98 | 103 |
|
99 | 104 |
try:
|
100 | 105 |
instance = self._get_instance(instance_name)
|
... | ... | @@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
102 | 107 |
job_name = instance.execute(request.action_digest,
|
103 | 108 |
request.skip_cache_lookup)
|
104 | 109 |
|
105 |
- operation_name = instance.register_job_peer(job_name,
|
|
106 |
- peer, message_queue)
|
|
110 |
+ operation_name = instance.register_job_peer(job_name, peer,
|
|
111 |
+ message_queue)
|
|
107 | 112 |
|
108 | 113 |
context.add_callback(partial(self._rpc_termination_callback,
|
109 | 114 |
peer, instance_name, operation_name))
|
... | ... | @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
161 | 166 |
try:
|
162 | 167 |
instance = self._get_instance(instance_name)
|
163 | 168 |
|
164 |
- instance.register_operation_peer(operation_name,
|
|
165 |
- peer, message_queue)
|
|
169 |
+ instance.register_operation_peer(operation_name, peer, message_queue)
|
|
166 | 170 |
|
167 | 171 |
context.add_callback(partial(self._rpc_termination_callback,
|
168 | 172 |
peer, instance_name, operation_name))
|
... | ... | @@ -231,3 +235,21 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
231 | 235 |
|
232 | 236 |
except KeyError:
|
233 | 237 |
raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
|
238 |
+ |
|
239 |
+ @classmethod
|
|
240 |
+ def _context_extract_request_metadata(cls, context):
|
|
241 |
+ """Given a context object, extract the RequestMetadata header
|
|
242 |
+ values if they are present. If they were not provided,
|
|
243 |
+ returns None.
|
|
244 |
+ """
|
|
245 |
+ invocation_metadata = context.invocation_metadata()
|
|
246 |
+ request_metadata_entry = next((entry for entry in invocation_metadata
|
|
247 |
+ if entry.key == REQUEST_METADATA_HEADER_NAME),
|
|
248 |
+ None)
|
|
249 |
+ if not request_metadata_entry:
|
|
250 |
+ return None
|
|
251 |
+ |
|
252 |
+ request_metadata = remote_execution_pb2.RequestMetadata()
|
|
253 |
+ request_metadata.ParseFromString(request_metadata_entry.value)
|
|
254 |
+ |
|
255 |
+ return request_metadata
|
... | ... | @@ -12,9 +12,9 @@ |
12 | 12 |
# See the License for the specific language governing permissions and
|
13 | 13 |
# limitations under the License.
|
14 | 14 |
|
15 |
- |
|
16 | 15 |
from datetime import datetime
|
17 | 16 |
import logging
|
17 |
+from threading import Lock
|
|
18 | 18 |
import uuid
|
19 | 19 |
|
20 | 20 |
from google.protobuf import duration_pb2, timestamp_pb2
|
... | ... | @@ -62,6 +62,9 @@ class Job: |
62 | 62 |
self._platform_requirements = platform_requirements \
|
63 | 63 |
if platform_requirements else dict()
|
64 | 64 |
|
65 |
+ self.__peers_lock = Lock()
|
|
66 |
+ self.__peers = set()
|
|
67 |
+ |
|
65 | 68 |
self._done = False
|
66 | 69 |
|
67 | 70 |
def __lt__(self, other):
|
... | ... | @@ -175,7 +178,7 @@ class Job: |
175 | 178 |
"""Subscribes to a new job's :class:`Operation` stage changes.
|
176 | 179 |
|
177 | 180 |
Args:
|
178 |
- peer (str): a unique string identifying the client.
|
|
181 |
+ peer (Peer): an object that represents the client.
|
|
179 | 182 |
message_queue (queue.Queue): the event queue to register.
|
180 | 183 |
|
181 | 184 |
Returns:
|
... | ... | @@ -194,10 +197,13 @@ class Job: |
194 | 197 |
self._name, new_operation.name)
|
195 | 198 |
|
196 | 199 |
self.__operations_by_name[new_operation.name] = new_operation
|
197 |
- self.__operations_by_peer[peer] = new_operation
|
|
198 |
- self.__operations_message_queues[peer] = message_queue
|
|
200 |
+ self.__operations_by_peer[peer.peer_id] = new_operation
|
|
201 |
+ self.__operations_message_queues[peer.peer_id] = message_queue
|
|
202 |
+ |
|
203 |
+ self._send_operations_updates(peers=[peer.peer_id])
|
|
199 | 204 |
|
200 |
- self._send_operations_updates(peers=[peer])
|
|
205 |
+ with self.__peers_lock:
|
|
206 |
+ self.__peers.add(peer)
|
|
201 | 207 |
|
202 | 208 |
return new_operation.name
|
203 | 209 |
|
... | ... | @@ -206,7 +212,7 @@ class Job: |
206 | 212 |
|
207 | 213 |
Args:
|
208 | 214 |
operation_name (str): an existing operation's name to subscribe to.
|
209 |
- peer (str): a unique string identifying the client.
|
|
215 |
+ peer (Peer): an object that represents the client.
|
|
210 | 216 |
message_queue (queue.Queue): the event queue to register.
|
211 | 217 |
|
212 | 218 |
Returns:
|
... | ... | @@ -222,18 +228,20 @@ class Job: |
222 | 228 |
raise NotFoundError("Operation name does not exist: [{}]"
|
223 | 229 |
.format(operation_name))
|
224 | 230 |
|
225 |
- self.__operations_by_peer[peer] = operation
|
|
226 |
- self.__operations_message_queues[peer] = message_queue
|
|
231 |
+ self.__operations_by_peer[peer.peer_id] = operation
|
|
232 |
+ self.__operations_message_queues[peer.peer_id] = message_queue
|
|
227 | 233 |
|
228 |
- self._send_operations_updates(peers=[peer])
|
|
234 |
+ self._send_operations_updates(peers=[peer.peer_id])
|
|
235 |
+ |
|
236 |
+ with self.__peers_lock:
|
|
237 |
+ self.__peers.add(peer)
|
|
229 | 238 |
|
230 | 239 |
def unregister_operation_peer(self, operation_name, peer):
|
231 | 240 |
"""Unsubscribes to the job's :class:`Operation` stage change.
|
232 | 241 |
|
233 | 242 |
Args:
|
234 | 243 |
operation_name (str): an existing operation's name to unsubscribe from.
|
235 |
- peer (str): a unique string identifying the client.
|
|
236 |
- |
|
244 |
+ peer (Peer): an object that represents the client.
|
|
237 | 245 |
Raises:
|
238 | 246 |
NotFoundError: If no operation with `operation_name` exists.
|
239 | 247 |
"""
|
... | ... | @@ -244,10 +252,10 @@ class Job: |
244 | 252 |
raise NotFoundError("Operation name does not exist: [{}]"
|
245 | 253 |
.format(operation_name))
|
246 | 254 |
|
247 |
- if peer in self.__operations_message_queues:
|
|
248 |
- del self.__operations_message_queues[peer]
|
|
255 |
+ if peer.peer_id in self.__operations_message_queues:
|
|
256 |
+ del self.__operations_message_queues[peer.peer_id]
|
|
249 | 257 |
|
250 |
- del self.__operations_by_peer[peer]
|
|
258 |
+ del self.__operations_by_peer[peer.peer_id]
|
|
251 | 259 |
|
252 | 260 |
# Drop the operation if nobody is watching it anymore:
|
253 | 261 |
if operation not in self.__operations_by_peer.values():
|
... | ... | @@ -258,6 +266,9 @@ class Job: |
258 | 266 |
self.__logger.debug("Operation deleted for job [%s]: [%s]",
|
259 | 267 |
self._name, operation.name)
|
260 | 268 |
|
269 |
+ with self.__peers_lock:
|
|
270 |
+ self.__peers.remove(peer)
|
|
271 |
+ |
|
261 | 272 |
def list_operations(self):
|
262 | 273 |
"""Lists the :class:`Operation` related to a job.
|
263 | 274 |
|
1 |
+# Copyright (C) 2019 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+ |
|
16 |
+class Peer:
|
|
17 |
+ """Represents a client during a session."""
|
|
18 |
+ def __init__(self, peer_id, token=None, request_metadata=None):
|
|
19 |
+ self._id = peer_id # This uniquely identifies a client
|
|
20 |
+ self._token = token
|
|
21 |
+ |
|
22 |
+ self.__request_metadata = request_metadata
|
|
23 |
+ |
|
24 |
+ def __eq__(self, other):
|
|
25 |
+ if isinstance(other, Peer):
|
|
26 |
+ return self.peer_id == other.peer_id
|
|
27 |
+ return False
|
|
28 |
+ |
|
29 |
+ def __hash__(self):
|
|
30 |
+ return hash(self.peer_id) # This string is unique for each peer
|
|
31 |
+ |
|
32 |
+ @property
|
|
33 |
+ def peer_id(self):
|
|
34 |
+ return self._id
|
|
35 |
+ |
|
36 |
+ @property
|
|
37 |
+ def token(self):
|
|
38 |
+ return self._token
|
|
39 |
+ |
|
40 |
+ # -- `RequestMetadata` optional values (attached to the Execute() call) --
|
|
41 |
+ @property
|
|
42 |
+ def request_metadata(self):
|
|
43 |
+ return self.__request_metadata
|
|
44 |
+ |
|
45 |
+ @property
|
|
46 |
+ def tool_name(self):
|
|
47 |
+ if self.__request_metadata and self.__request_metadata.tool_details:
|
|
48 |
+ return self.__request_metadata.tool_details.tool_name
|
|
49 |
+ return None
|
|
50 |
+ |
|
51 |
+ def tool_version(self):
|
|
52 |
+ if self.__request_metadata and self.__request_metadata.tool_details:
|
|
53 |
+ return self.__request_metadata.tool_details.tool_version
|
|
54 |
+ return None
|
|
55 |
+ |
|
56 |
+ @property
|
|
57 |
+ def action_id(self):
|
|
58 |
+ if self.__request_metadata:
|
|
59 |
+ return self.__request_metadata.action_id
|
|
60 |
+ return None
|
|
61 |
+ |
|
62 |
+ @property
|
|
63 |
+ def tool_invocation_id(self):
|
|
64 |
+ if self.__request_metadata:
|
|
65 |
+ return self.__request_metadata.tool_invocation_id
|
|
66 |
+ return None
|
|
67 |
+ |
|
68 |
+ @property
|
|
69 |
+ def correlated_invocations_id(self):
|
|
70 |
+ if self.__request_metadata:
|
|
71 |
+ return self.__request_metadata.correlated_invocations_id
|
|
72 |
+ return None
|
... | ... | @@ -54,9 +54,8 @@ class Scheduler: |
54 | 54 |
|
55 | 55 |
self.__queue = []
|
56 | 56 |
|
57 |
- self._is_instrumented = monitor
|
|
58 |
- |
|
59 |
- if self._is_instrumented:
|
|
57 |
+ self._is_instrumented = False
|
|
58 |
+ if monitor:
|
|
60 | 59 |
self.activate_monitoring()
|
61 | 60 |
|
62 | 61 |
# --- Public API ---
|
... | ... | @@ -87,7 +86,7 @@ class Scheduler: |
87 | 86 |
|
88 | 87 |
Args:
|
89 | 88 |
job_name (str): name of the job to subscribe to.
|
90 |
- peer (str): a unique string identifying the client.
|
|
89 |
+ peer (Peer): object that represents the client
|
|
91 | 90 |
message_queue (queue.Queue): the event queue to register.
|
92 | 91 |
|
93 | 92 |
Returns:
|
... | ... | @@ -114,7 +113,7 @@ class Scheduler: |
114 | 113 |
|
115 | 114 |
Args:
|
116 | 115 |
operation_name (str): name of the operation to subscribe to.
|
117 |
- peer (str): a unique string identifying the client.
|
|
116 |
+ peer (Peer): an object that represents the client.
|
|
118 | 117 |
message_queue (queue.Queue): the event queue to register.
|
119 | 118 |
|
120 | 119 |
Returns:
|
... | ... | @@ -137,7 +136,7 @@ class Scheduler: |
137 | 136 |
|
138 | 137 |
Args:
|
139 | 138 |
operation_name (str): name of the operation to unsubscribe from.
|
140 |
- peer (str): a unique string identifying the client.
|
|
139 |
+ peer (Peer): object that represents the client
|
|
141 | 140 |
|
142 | 141 |
Raises:
|
143 | 142 |
NotFoundError: If no operation with `operation_name` exists.
|
... | ... | @@ -43,3 +43,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/' |
43 | 43 |
# type - Type of CAS object, eg. 'action_result', 'command'...
|
44 | 44 |
# hash - Object's digest hash.
|
45 | 45 |
# sizebytes - Object's digest size in bytes.
|
46 |
+ |
|
47 |
+ |
|
48 |
+# Name of the header key to attach optional `RequestMetadata`values.
|
|
49 |
+# (This is defined in the REAPI specification.)
|
|
50 |
+REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
|
|
51 |
+ |
|
52 |
+# 'RequestMetadata' header values. These values will be used when
|
|
53 |
+# attaching optional metadata to a gRPC request's header:
|
|
54 |
+REQUEST_METADATA_TOOL_NAME = 'BuildGrid'
|
|
55 |
+REQUEST_METADATA_TOOL_VERSION = '0.1'
|