Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid
Commits:
-
e5d0a4d6
by Santiago Gil at 2019-02-27T15:18:35Z
5 changed files:
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- + buildgrid/server/peer.py
- + buildgrid/server/requestmetadata.py
- buildgrid/server/scheduler.py
Changes:
... | ... | @@ -30,6 +30,8 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, |
30 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
31 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2
|
32 | 32 |
from buildgrid.server._authentication import AuthContext, authorize
|
33 |
+from buildgrid.server.peer import Peer
|
|
34 |
+from buildgrid.server.requestmetadata import context_extract_request_metadata
|
|
33 | 35 |
|
34 | 36 |
|
35 | 37 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
... | ... | @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
94 | 96 |
|
95 | 97 |
instance_name = request.instance_name
|
96 | 98 |
message_queue = queue.Queue()
|
97 |
- peer = context.peer()
|
|
99 |
+ peer_id = context.peer()
|
|
100 |
+ |
|
101 |
+ request_metadata = context_extract_request_metadata(context)
|
|
102 |
+ peer = Peer(uid=peer_id, request_metadata=request_metadata)
|
|
98 | 103 |
|
99 | 104 |
try:
|
100 | 105 |
instance = self._get_instance(instance_name)
|
... | ... | @@ -102,8 +107,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
102 | 107 |
job_name = instance.execute(request.action_digest,
|
103 | 108 |
request.skip_cache_lookup)
|
104 | 109 |
|
105 |
- operation_name = instance.register_job_peer(job_name,
|
|
106 |
- peer, message_queue)
|
|
110 |
+ operation_name = instance.register_job_peer(job_name, peer,
|
|
111 |
+ message_queue)
|
|
107 | 112 |
|
108 | 113 |
context.add_callback(partial(self._rpc_termination_callback,
|
109 | 114 |
peer, instance_name, operation_name))
|
... | ... | @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
161 | 166 |
try:
|
162 | 167 |
instance = self._get_instance(instance_name)
|
163 | 168 |
|
164 |
- instance.register_operation_peer(operation_name,
|
|
165 |
- peer, message_queue)
|
|
169 |
+ instance.register_operation_peer(operation_name, peer, message_queue)
|
|
166 | 170 |
|
167 | 171 |
context.add_callback(partial(self._rpc_termination_callback,
|
168 | 172 |
peer, instance_name, operation_name))
|
... | ... | @@ -12,9 +12,9 @@ |
12 | 12 |
# See the License for the specific language governing permissions and
|
13 | 13 |
# limitations under the License.
|
14 | 14 |
|
15 |
- |
|
16 | 15 |
from datetime import datetime
|
17 | 16 |
import logging
|
17 |
+from threading import Lock
|
|
18 | 18 |
import uuid
|
19 | 19 |
|
20 | 20 |
from google.protobuf import duration_pb2, timestamp_pb2
|
... | ... | @@ -175,7 +175,7 @@ class Job: |
175 | 175 |
"""Subscribes to a new job's :class:`Operation` stage changes.
|
176 | 176 |
|
177 | 177 |
Args:
|
178 |
- peer (str): a unique string identifying the client.
|
|
178 |
+ peer (Peer): an object that represents the client.
|
|
179 | 179 |
message_queue (queue.Queue): the event queue to register.
|
180 | 180 |
|
181 | 181 |
Returns:
|
... | ... | @@ -194,10 +194,10 @@ class Job: |
194 | 194 |
self._name, new_operation.name)
|
195 | 195 |
|
196 | 196 |
self.__operations_by_name[new_operation.name] = new_operation
|
197 |
- self.__operations_by_peer[peer] = new_operation
|
|
198 |
- self.__operations_message_queues[peer] = message_queue
|
|
197 |
+ self.__operations_by_peer[peer.uid] = new_operation
|
|
198 |
+ self.__operations_message_queues[peer.uid] = message_queue
|
|
199 | 199 |
|
200 |
- self._send_operations_updates(peers=[peer])
|
|
200 |
+ self._send_operations_updates(peers=[peer.uid])
|
|
201 | 201 |
|
202 | 202 |
return new_operation.name
|
203 | 203 |
|
... | ... | @@ -206,7 +206,7 @@ class Job: |
206 | 206 |
|
207 | 207 |
Args:
|
208 | 208 |
operation_name (str): an existing operation's name to subscribe to.
|
209 |
- peer (str): a unique string identifying the client.
|
|
209 |
+ peer (Peer): an object that represents the client.
|
|
210 | 210 |
message_queue (queue.Queue): the event queue to register.
|
211 | 211 |
|
212 | 212 |
Returns:
|
... | ... | @@ -222,18 +222,17 @@ class Job: |
222 | 222 |
raise NotFoundError("Operation name does not exist: [{}]"
|
223 | 223 |
.format(operation_name))
|
224 | 224 |
|
225 |
- self.__operations_by_peer[peer] = operation
|
|
226 |
- self.__operations_message_queues[peer] = message_queue
|
|
225 |
+ self.__operations_by_peer[peer.uid] = operation
|
|
226 |
+ self.__operations_message_queues[peer.uid] = message_queue
|
|
227 | 227 |
|
228 |
- self._send_operations_updates(peers=[peer])
|
|
228 |
+ self._send_operations_updates(peers=[peer.uid])
|
|
229 | 229 |
|
230 | 230 |
def unregister_operation_peer(self, operation_name, peer):
|
231 | 231 |
"""Unsubscribes to the job's :class:`Operation` stage change.
|
232 | 232 |
|
233 | 233 |
Args:
|
234 | 234 |
operation_name (str): an existing operation's name to unsubscribe from.
|
235 |
- peer (str): a unique string identifying the client.
|
|
236 |
- |
|
235 |
+ peer (Peer): an object that represents the client.
|
|
237 | 236 |
Raises:
|
238 | 237 |
NotFoundError: If no operation with `operation_name` exists.
|
239 | 238 |
"""
|
... | ... | @@ -244,10 +243,10 @@ class Job: |
244 | 243 |
raise NotFoundError("Operation name does not exist: [{}]"
|
245 | 244 |
.format(operation_name))
|
246 | 245 |
|
247 |
- if peer in self.__operations_message_queues:
|
|
248 |
- del self.__operations_message_queues[peer]
|
|
246 |
+ if peer.uid in self.__operations_message_queues:
|
|
247 |
+ del self.__operations_message_queues[peer.uid]
|
|
249 | 248 |
|
250 |
- del self.__operations_by_peer[peer]
|
|
249 |
+ del self.__operations_by_peer[peer.uid]
|
|
251 | 250 |
|
252 | 251 |
# Drop the operation if nobody is watching it anymore:
|
253 | 252 |
if operation not in self.__operations_by_peer.values():
|
1 |
+# Copyright (C) 2019 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+from collections import defaultdict
|
|
16 |
+import logging
|
|
17 |
+from threading import Lock
|
|
18 |
+ |
|
19 |
+ |
|
20 |
class Peer:
    """Represents a client during a session.

    Every instance registers itself in a class-wide table keyed by
    `Peer.uid` and increments a per-uid instance counter; `__del__`
    reverses that bookkeeping.
    """

    # We keep a global table of Peers indexed by their `Peer.uid` and a
    # counter of the instances created for each uid, so that entries for
    # `Peer`s that do not exist anymore can be dropped. Both are protected
    # by the same lock.
    # NOTE(review): the table stores a strong reference to the most
    # recently created Peer of each uid, so that instance is not finalised
    # while it stays registered. Consider weak references if entries are
    # meant to disappear together with the last outside reference.
    __peers_by_uid = {}
    __peers_ref_count = defaultdict(int)
    __peers_lock = Lock()

    def __init__(self, uid, token=None, request_metadata=None):
        """Creates a peer and records it in the class-wide table.

        Args:
            uid (str): a unique string identifying the client.
            token: optional value exposed through `Peer.token`.
            request_metadata: optional `RequestMetadata` values attached
                to the call, or None if they were not provided.
        """
        self._uid = uid  # This uniquely identifies a client
        self._token = token

        self.__request_metadata = request_metadata

        self.__logger = logging.getLogger(__name__)

        with self.__peers_lock:
            registered = self.__peers_by_uid.get(self._uid)
            if registered is not None and registered != self:
                # Lazy %-style argument: the logging module formats the
                # message itself, so a `{}` placeholder would not work.
                self.__logger.debug('Creating another instance of Peer '
                                    'with uid %s with different attributes',
                                    self._uid)

            self.__peers_by_uid[self._uid] = self
            self.__peers_ref_count[self._uid] += 1

    def __del__(self):
        """Decrements the uid's instance counter, dropping exhausted entries.

        A finaliser must not raise, so every lookup here tolerates missing
        state instead of asserting on it.
        """
        try:
            uid = self._uid
        except AttributeError:
            # `__init__` never got as far as assigning the uid, so nothing
            # was registered for this object.
            return

        with self.__peers_lock:
            remaining = self.__peers_ref_count.get(uid, 0) - 1

            if remaining < 1:
                # No instance left for this uid: forget it in both tables.
                self.__peers_ref_count.pop(uid, None)
                self.__peers_by_uid.pop(uid, None)
            else:
                self.__peers_ref_count[uid] = remaining

    def __eq__(self, other):
        """Peers are equal when uid, token and request metadata all match."""
        if not isinstance(other, Peer):
            return False

        return (self.uid == other.uid and
                self.token == other.token and
                self.request_metadata == other.request_metadata)

    def __hash__(self):
        return hash(self.uid)  # This string is unique for each peer

    @property
    def uid(self):
        """The unique identifier given at construction."""
        return self._uid

    @property
    def token(self):
        """The token given at construction, or None."""
        return self._token

    # -- `RequestMetadata` optional values (attached to the Execute() call) --
    @property
    def request_metadata(self):
        """The request metadata given at construction, or None."""
        return self.__request_metadata

    @property
    def tool_name(self):
        """`tool_details.tool_name` of the metadata, or None if unavailable."""
        if self.__request_metadata and self.__request_metadata.tool_details:
            return self.__request_metadata.tool_details.tool_name
        return None

    @property
    def tool_version(self):
        """`tool_details.tool_version` of the metadata, or None if unavailable."""
        if self.__request_metadata and self.__request_metadata.tool_details:
            return self.__request_metadata.tool_details.tool_version
        return None

    @property
    def action_id(self):
        """`action_id` of the metadata, or None without metadata."""
        if self.__request_metadata:
            return self.__request_metadata.action_id
        return None

    @property
    def tool_invocation_id(self):
        """`tool_invocation_id` of the metadata, or None without metadata."""
        if self.__request_metadata:
            return self.__request_metadata.tool_invocation_id
        return None

    @property
    def correlated_invocations_id(self):
        """`correlated_invocations_id` of the metadata, or None without metadata."""
        if self.__request_metadata:
            return self.__request_metadata.correlated_invocations_id
        return None
|
1 |
+# Copyright (C) 2019 Bloomberg LP
|
|
2 |
+#
|
|
3 |
+# Licensed under the Apache License, Version 2.0 (the "License");
|
|
4 |
+# you may not use this file except in compliance with the License.
|
|
5 |
+# You may obtain a copy of the License at
|
|
6 |
+#
|
|
7 |
+# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
8 |
+#
|
|
9 |
+# Unless required by applicable law or agreed to in writing, software
|
|
10 |
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
11 |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
12 |
+# See the License for the specific language governing permissions and
|
|
13 |
+# limitations under the License.
|
|
14 |
+ |
|
15 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
16 |
+from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
|
|
17 |
+ |
|
18 |
+ |
|
19 |
def context_extract_request_metadata(context):
    """Reads the RequestMetadata header out of an RPC's invocation metadata.

    The invocation metadata entries of `context` are scanned in order and
    the first one whose key equals `REQUEST_METADATA_HEADER_NAME` is
    parsed into a `RequestMetadata` message.

    Args:
        context (grpc.ServicerContext): Context for a RPC call.

    Returns:
        A `RequestMetadata` proto built from the header value, or None if
        no such header was sent.
    """
    header_entry = None
    for candidate in context.invocation_metadata():
        if candidate.key == REQUEST_METADATA_HEADER_NAME:
            header_entry = candidate
            break

    if not header_entry:
        return None

    parsed = remote_execution_pb2.RequestMetadata()
    parsed.ParseFromString(header_entry.value)
    return parsed
|
... | ... | @@ -54,9 +54,8 @@ class Scheduler: |
54 | 54 |
|
55 | 55 |
self.__queue = []
|
56 | 56 |
|
57 |
- self._is_instrumented = monitor
|
|
58 |
- |
|
59 |
- if self._is_instrumented:
|
|
57 |
+ self._is_instrumented = False
|
|
58 |
+ if monitor:
|
|
60 | 59 |
self.activate_monitoring()
|
61 | 60 |
|
62 | 61 |
# --- Public API ---
|
... | ... | @@ -87,7 +86,7 @@ class Scheduler: |
87 | 86 |
|
88 | 87 |
Args:
|
89 | 88 |
job_name (str): name of the job to subscribe to.
|
90 |
- peer (str): a unique string identifying the client.
|
|
89 |
+ peer (Peer): object that represents the client
|
|
91 | 90 |
message_queue (queue.Queue): the event queue to register.
|
92 | 91 |
|
93 | 92 |
Returns:
|
... | ... | @@ -114,7 +113,7 @@ class Scheduler: |
114 | 113 |
|
115 | 114 |
Args:
|
116 | 115 |
operation_name (str): name of the operation to subscribe to.
|
117 |
- peer (str): a unique string identifying the client.
|
|
116 |
+ peer (Peer): an object that represents the client.
|
|
118 | 117 |
message_queue (queue.Queue): the event queue to register.
|
119 | 118 |
|
120 | 119 |
Returns:
|
... | ... | @@ -137,7 +136,7 @@ class Scheduler: |
137 | 136 |
|
138 | 137 |
Args:
|
139 | 138 |
operation_name (str): name of the operation to unsubscribe from.
|
140 |
- peer (str): a unique string identifying the client.
|
|
139 |
+ peer (Peer): object that represents the client
|
|
141 | 140 |
|
142 | 141 |
Raises:
|
143 | 142 |
NotFoundError: If no operation with `operation_name` exists.
|