Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
-
daab6b01
by Martin Blanchard at 2018-10-30T17:07:58Z
6 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
Changes:
... | ... | @@ -21,7 +21,7 @@ An instance of the Remote Execution Service. |
21 | 21 |
|
22 | 22 |
import logging
|
23 | 23 |
|
24 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
24 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
|
|
25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
26 | 26 |
|
27 | 27 |
|
... | ... | @@ -35,7 +35,7 @@ class ExecutionInstance: |
35 | 35 |
def register_instance_with_server(self, instance_name, server):
|
36 | 36 |
server.add_execution_instance(self, instance_name)
|
37 | 37 |
|
38 |
- def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
38 |
+ def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
|
|
39 | 39 |
""" Sends a job for execution.
|
40 | 40 |
Queues an action and creates an Operation instance to be associated with
|
41 | 41 |
this action.
|
... | ... | @@ -48,24 +48,24 @@ class ExecutionInstance: |
48 | 48 |
|
49 | 49 |
job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
|
50 | 50 |
|
51 |
- if message_queue is not None:
|
|
52 |
- job.register_client(message_queue)
|
|
51 |
+ if peer is not None and message_queue is not None:
|
|
52 |
+ job.register_client(peer, message_queue)
|
|
53 | 53 |
|
54 | 54 |
return job.operation
|
55 | 55 |
|
56 |
- def register_message_client(self, name, queue):
|
|
56 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
57 | 57 |
try:
|
58 |
- self._scheduler.register_client(name, queue)
|
|
58 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
59 | 59 |
|
60 |
- except KeyError:
|
|
61 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
60 |
+ except NotFoundError:
|
|
61 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
62 | 62 |
|
63 |
- def unregister_message_client(self, name, queue):
|
|
63 |
+ def unregister_message_client(self, job_name, peer):
|
|
64 | 64 |
try:
|
65 |
- self._scheduler.unregister_client(name, queue)
|
|
65 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
66 | 66 |
|
67 |
- except KeyError:
|
|
68 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
67 |
+ except NotFoundError:
|
|
68 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
69 | 69 |
|
70 | 70 |
def stream_operation_updates(self, message_queue, operation_name):
|
71 | 71 |
operation = message_queue.get()
|
... | ... | @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
47 | 47 |
instance = self._get_instance(request.instance_name)
|
48 | 48 |
operation = instance.execute(request.action_digest,
|
49 | 49 |
request.skip_cache_lookup,
|
50 |
- message_queue)
|
|
50 |
+ peer=context.peer(),
|
|
51 |
+ message_queue=message_queue)
|
|
51 | 52 |
|
52 | 53 |
context.add_callback(partial(instance.unregister_message_client,
|
53 |
- operation.name, message_queue))
|
|
54 |
+ operation.name, context.peer()))
|
|
54 | 55 |
|
55 | 56 |
instanced_op_name = "{}/{}".format(request.instance_name,
|
56 | 57 |
operation.name)
|
... | ... | @@ -88,10 +89,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
88 | 89 |
operation_name = names[-1]
|
89 | 90 |
instance = self._get_instance(instance_name)
|
90 | 91 |
|
91 |
- instance.register_message_client(operation_name, message_queue)
|
|
92 |
+ instance.register_message_client(operation_name,
|
|
93 |
+ context.peer(), message_queue)
|
|
92 | 94 |
|
93 | 95 |
context.add_callback(partial(instance.unregister_message_client,
|
94 |
- operation_name, message_queue))
|
|
96 |
+ operation_name, context.peer()))
|
|
95 | 97 |
|
96 | 98 |
for operation in instance.stream_operation_updates(message_queue,
|
97 | 99 |
operation_name):
|
... | ... | @@ -46,7 +46,7 @@ class Job: |
46 | 46 |
|
47 | 47 |
self._action.CopyFrom(action)
|
48 | 48 |
self._do_not_cache = self._action.do_not_cache
|
49 |
- self._operation_update_queues = []
|
|
49 |
+ self._operation_update_queues = {}
|
|
50 | 50 |
self._operation.name = self._name
|
51 | 51 |
self._operation.done = False
|
52 | 52 |
self._n_tries = 0
|
... | ... | @@ -113,22 +113,26 @@ class Job: |
113 | 113 |
def __ne__(self, other):
|
114 | 114 |
return not self.__eq__(other)
|
115 | 115 |
|
116 |
- def register_client(self, queue):
|
|
117 |
- """Subscribes to the job's :class:`Operation` stage change events.
|
|
116 |
+ def register_client(self, peer, message_queue):
|
|
117 |
+ """Subscribes to the job's :class:`Operation` stage changes.
|
|
118 | 118 |
|
119 | 119 |
Args:
|
120 |
- queue (queue.Queue): the event queue to register.
|
|
120 |
+ peer (str): a unique string identifying the client.
|
|
121 |
+ message_queue (queue.Queue): the event queue to register.
|
|
121 | 122 |
"""
|
122 |
- self._operation_update_queues.append(queue)
|
|
123 |
- queue.put(self._operation)
|
|
123 |
+ if peer not in self._operation_update_queues:
|
|
124 |
+ self._operation_update_queues[peer] = message_queue
|
|
124 | 125 |
|
125 |
- def unregister_client(self, queue):
|
|
126 |
- """Unsubscribes to the job's :class:`Operation` stage change events.
|
|
126 |
+ message_queue.put(self._operation)
|
|
127 |
+ |
|
128 |
+ def unregister_client(self, peer):
|
|
129
+ """Unsubscribes from the job's :class:`Operation` stage changes.
|
|
127 | 130 |
|
128 | 131 |
Args:
|
129 |
- queue (queue.Queue): the event queue to unregister.
|
|
132 |
+ peer (str): a unique string identifying the client.
|
|
130 | 133 |
"""
|
131 |
- self._operation_update_queues.remove(queue)
|
|
134 |
+ if peer in self._operation_update_queues:
|
|
135 |
+ del self._operation_update_queues[peer]
|
|
132 | 136 |
|
133 | 137 |
def set_cached_result(self, action_result):
|
134 | 138 |
"""Allows specifying an action result from the action cache for the job.
|
... | ... | @@ -224,5 +228,5 @@ class Job: |
224 | 228 |
|
225 | 229 |
self._operation.metadata.Pack(self.__operation_metadata)
|
226 | 230 |
|
227 |
- for queue in self._operation_update_queues:
|
|
231 |
+ for queue in self._operation_update_queues.values():
|
|
228 | 232 |
queue.put(self._operation)
|
... | ... | @@ -57,13 +57,12 @@ class OperationsInstance: |
57 | 57 |
|
58 | 58 |
return response
|
59 | 59 |
|
60 |
- def delete_operation(self, name):
|
|
60 |
+ def delete_operation(self, job_name, peer):
|
|
61 | 61 |
try:
|
62 |
- # TODO: Unregister the caller client
|
|
63 |
- pass
|
|
62 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
64 | 63 |
|
65 | 64 |
except NotFoundError:
|
66 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
65 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
67 | 66 |
|
68 | 67 |
def register_message_client(self, job_name, peer, message_queue):
|
69 | 68 |
try:
|
... | ... | @@ -93,7 +93,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
93 | 93 |
instance = self._get_instance(instance_name)
|
94 | 94 |
|
95 | 95 |
operation_name = self._parse_operation_name(name)
|
96 |
- instance.delete_operation(operation_name)
|
|
96 |
+ instance.delete_operation(operation_name, context.peer())
|
|
97 | 97 |
|
98 | 98 |
except InvalidArgumentError as e:
|
99 | 99 |
self.logger.error(e)
|
... | ... | @@ -37,12 +37,40 @@ class Scheduler: |
37 | 37 |
self.__jobs_by_name = {}
|
38 | 38 |
self.__queue = deque()
|
39 | 39 |
|
40 |
- def register_client(self, job_name, queue):
|
|
41 |
- self.__jobs_by_name[job_name].register_client(queue)
|
|
40 |
+ def register_client(self, job_name, peer, message_queue):
|
|
41 |
+ """Subscribes to one of the job's :class:`Operation` stage changes.
|
|
42 | 42 |
|
43 |
- def unregister_client(self, job_name, queue):
|
|
44 |
- job = self.__jobs_by_name[job_name]
|
|
45 |
- job.unregister_client(queue)
|
|
43 |
+ Args:
|
|
44 |
+ job_name (str): name of the job to subscribe to.
|
|
45 |
+ peer (str): a unique string identifying the client.
|
|
46 |
+ message_queue (queue.Queue): the event queue to register.
|
|
47 |
+ |
|
48 |
+ Raises:
|
|
49 |
+ NotFoundError: If no job with `job_name` exists.
|
|
50 |
+ """
|
|
51 |
+ try:
|
|
52 |
+ job = self.__jobs_by_name[job_name]
|
|
53 |
+ except KeyError:
|
|
54 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
55 |
+ |
|
56 |
+ job.register_client(peer, message_queue)
|
|
57 |
+ |
|
58 |
+ def unregister_client(self, job_name, peer):
|
|
59
+ """Unsubscribes from one of the job's :class:`Operation` stage changes.
|
|
60 |
+ |
|
61 |
+ Args:
|
|
62 |
+ job_name (str): name of the job to unsubscribe from.
|
|
63 |
+ peer (str): a unique string identifying the client.
|
|
64 |
+ |
|
65 |
+ Raises:
|
|
66 |
+ NotFoundError: If no job with `job_name` exists.
|
|
67 |
+ """
|
|
68 |
+ try:
|
|
69 |
+ job = self.__jobs_by_name[job_name]
|
|
70 |
+ except KeyError:
|
|
71 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
72 |
+ |
|
73 |
+ job.unregister_client(peer)
|
|
46 | 74 |
|
47 | 75 |
if job.n_clients == 0 and job.operation.done:
|
48 | 76 |
del self.__jobs_by_action[job.action_digest]
|