Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
-
daab6b01
by Martin Blanchard at 2018-10-30T17:07:58Z
6 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
Changes:
| ... | ... | @@ -21,7 +21,7 @@ An instance of the Remote Execution Service. |
| 21 | 21 |
|
| 22 | 22 |
import logging
|
| 23 | 23 |
|
| 24 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
| 24 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
|
|
| 25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
| 26 | 26 |
|
| 27 | 27 |
|
| ... | ... | @@ -35,7 +35,7 @@ class ExecutionInstance: |
| 35 | 35 |
def register_instance_with_server(self, instance_name, server):
|
| 36 | 36 |
server.add_execution_instance(self, instance_name)
|
| 37 | 37 |
|
| 38 |
- def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
| 38 |
+ def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
|
|
| 39 | 39 |
""" Sends a job for execution.
|
| 40 | 40 |
Queues an action and creates an Operation instance to be associated with
|
| 41 | 41 |
this action.
|
| ... | ... | @@ -48,24 +48,24 @@ class ExecutionInstance: |
| 48 | 48 |
|
| 49 | 49 |
job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
|
| 50 | 50 |
|
| 51 |
- if message_queue is not None:
|
|
| 52 |
- job.register_client(message_queue)
|
|
| 51 |
+ if peer is not None and message_queue is not None:
|
|
| 52 |
+ job.register_client(peer, message_queue)
|
|
| 53 | 53 |
|
| 54 | 54 |
return job.operation
|
| 55 | 55 |
|
| 56 |
- def register_message_client(self, name, queue):
|
|
| 56 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
| 57 | 57 |
try:
|
| 58 |
- self._scheduler.register_client(name, queue)
|
|
| 58 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
| 59 | 59 |
|
| 60 |
- except KeyError:
|
|
| 61 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
| 60 |
+ except NotFoundError:
|
|
| 61 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
| 62 | 62 |
|
| 63 |
- def unregister_message_client(self, name, queue):
|
|
| 63 |
+ def unregister_message_client(self, job_name, peer):
|
|
| 64 | 64 |
try:
|
| 65 |
- self._scheduler.unregister_client(name, queue)
|
|
| 65 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
| 66 | 66 |
|
| 67 |
- except KeyError:
|
|
| 68 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
| 67 |
+ except NotFoundError:
|
|
| 68 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
| 69 | 69 |
|
| 70 | 70 |
def stream_operation_updates(self, message_queue, operation_name):
|
| 71 | 71 |
operation = message_queue.get()
|
| ... | ... | @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 47 | 47 |
instance = self._get_instance(request.instance_name)
|
| 48 | 48 |
operation = instance.execute(request.action_digest,
|
| 49 | 49 |
request.skip_cache_lookup,
|
| 50 |
- message_queue)
|
|
| 50 |
+ peer=context.peer(),
|
|
| 51 |
+ message_queue=message_queue)
|
|
| 51 | 52 |
|
| 52 | 53 |
context.add_callback(partial(instance.unregister_message_client,
|
| 53 |
- operation.name, message_queue))
|
|
| 54 |
+ operation.name, context.peer()))
|
|
| 54 | 55 |
|
| 55 | 56 |
instanced_op_name = "{}/{}".format(request.instance_name,
|
| 56 | 57 |
operation.name)
|
| ... | ... | @@ -88,10 +89,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 88 | 89 |
operation_name = names[-1]
|
| 89 | 90 |
instance = self._get_instance(instance_name)
|
| 90 | 91 |
|
| 91 |
- instance.register_message_client(operation_name, message_queue)
|
|
| 92 |
+ instance.register_message_client(operation_name,
|
|
| 93 |
+ context.peer(), message_queue)
|
|
| 92 | 94 |
|
| 93 | 95 |
context.add_callback(partial(instance.unregister_message_client,
|
| 94 |
- operation_name, message_queue))
|
|
| 96 |
+ operation_name, context.peer()))
|
|
| 95 | 97 |
|
| 96 | 98 |
for operation in instance.stream_operation_updates(message_queue,
|
| 97 | 99 |
operation_name):
|
| ... | ... | @@ -46,7 +46,7 @@ class Job: |
| 46 | 46 |
|
| 47 | 47 |
self._action.CopyFrom(action)
|
| 48 | 48 |
self._do_not_cache = self._action.do_not_cache
|
| 49 |
- self._operation_update_queues = []
|
|
| 49 |
+ self._operation_update_queues = {}
|
|
| 50 | 50 |
self._operation.name = self._name
|
| 51 | 51 |
self._operation.done = False
|
| 52 | 52 |
self._n_tries = 0
|
| ... | ... | @@ -113,22 +113,26 @@ class Job: |
| 113 | 113 |
def __ne__(self, other):
|
| 114 | 114 |
return not self.__eq__(other)
|
| 115 | 115 |
|
| 116 |
- def register_client(self, queue):
|
|
| 117 |
- """Subscribes to the job's :class:`Operation` stage change events.
|
|
| 116 |
+ def register_client(self, peer, message_queue):
|
|
| 117 |
+ """Subscribes to the job's :class:`Operation` stage changes.
|
|
| 118 | 118 |
|
| 119 | 119 |
Args:
|
| 120 |
- queue (queue.Queue): the event queue to register.
|
|
| 120 |
+ peer (str): a unique string identifying the client.
|
|
| 121 |
+ message_queue (queue.Queue): the event queue to register.
|
|
| 121 | 122 |
"""
|
| 122 |
- self._operation_update_queues.append(queue)
|
|
| 123 |
- queue.put(self._operation)
|
|
| 123 |
+ if peer not in self._operation_update_queues:
|
|
| 124 |
+ self._operation_update_queues[peer] = message_queue
|
|
| 124 | 125 |
|
| 125 |
- def unregister_client(self, queue):
|
|
| 126 |
- """Unsubscribes to the job's :class:`Operation` stage change events.
|
|
| 126 |
+ message_queue.put(self._operation)
|
|
| 127 |
+ |
|
| 128 |
+ def unregister_client(self, peer):
|
|
| 129 |
+ """Unsubscribes to the job's :class:`Operation` stage change.
|
|
| 127 | 130 |
|
| 128 | 131 |
Args:
|
| 129 |
- queue (queue.Queue): the event queue to unregister.
|
|
| 132 |
+ peer (str): a unique string identifying the client.
|
|
| 130 | 133 |
"""
|
| 131 |
- self._operation_update_queues.remove(queue)
|
|
| 134 |
+ if peer not in self._operation_update_queues:
|
|
| 135 |
+ del self._operation_update_queues[peer]
|
|
| 132 | 136 |
|
| 133 | 137 |
def set_cached_result(self, action_result):
|
| 134 | 138 |
"""Allows specifying an action result form the action cache for the job.
|
| ... | ... | @@ -224,5 +228,5 @@ class Job: |
| 224 | 228 |
|
| 225 | 229 |
self._operation.metadata.Pack(self.__operation_metadata)
|
| 226 | 230 |
|
| 227 |
- for queue in self._operation_update_queues:
|
|
| 231 |
+ for queue in self._operation_update_queues.values():
|
|
| 228 | 232 |
queue.put(self._operation)
|
| ... | ... | @@ -57,13 +57,12 @@ class OperationsInstance: |
| 57 | 57 |
|
| 58 | 58 |
return response
|
| 59 | 59 |
|
| 60 |
- def delete_operation(self, name):
|
|
| 60 |
+ def delete_operation(self, job_name, peer):
|
|
| 61 | 61 |
try:
|
| 62 |
- # TODO: Unregister the caller client
|
|
| 63 |
- pass
|
|
| 62 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
| 64 | 63 |
|
| 65 | 64 |
except NotFoundError:
|
| 66 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
| 65 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
| 67 | 66 |
|
| 68 | 67 |
def register_message_client(self, job_name, peer, message_queue):
|
| 69 | 68 |
try:
|
| ... | ... | @@ -93,7 +93,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
| 93 | 93 |
instance = self._get_instance(instance_name)
|
| 94 | 94 |
|
| 95 | 95 |
operation_name = self._parse_operation_name(name)
|
| 96 |
- instance.delete_operation(operation_name)
|
|
| 96 |
+ instance.delete_operation(operation_name, context.peer())
|
|
| 97 | 97 |
|
| 98 | 98 |
except InvalidArgumentError as e:
|
| 99 | 99 |
self.logger.error(e)
|
| ... | ... | @@ -37,12 +37,40 @@ class Scheduler: |
| 37 | 37 |
self.__jobs_by_name = {}
|
| 38 | 38 |
self.__queue = deque()
|
| 39 | 39 |
|
| 40 |
- def register_client(self, job_name, queue):
|
|
| 41 |
- self.__jobs_by_name[job_name].register_client(queue)
|
|
| 40 |
+ def register_client(self, job_name, peer, message_queue):
|
|
| 41 |
+ """Subscribes to one of the job's :class:`Operation` stage changes.
|
|
| 42 | 42 |
|
| 43 |
- def unregister_client(self, job_name, queue):
|
|
| 44 |
- job = self.__jobs_by_name[job_name]
|
|
| 45 |
- job.unregister_client(queue)
|
|
| 43 |
+ Args:
|
|
| 44 |
+ job_name (str): name of the job subscribe to.
|
|
| 45 |
+ peer (str): a unique string identifying the client.
|
|
| 46 |
+ message_queue (queue.Queue): the event queue to register.
|
|
| 47 |
+ |
|
| 48 |
+ Raises:
|
|
| 49 |
+ NotFoundError: If no job with `job_name` exists.
|
|
| 50 |
+ """
|
|
| 51 |
+ try:
|
|
| 52 |
+ job = self.__jobs_by_name[job_name]
|
|
| 53 |
+ except KeyError:
|
|
| 54 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
| 55 |
+ |
|
| 56 |
+ job.register_client(peer, message_queue)
|
|
| 57 |
+ |
|
| 58 |
+ def unregister_client(self, job_name, peer):
|
|
| 59 |
+ """Unsubscribes to one of the job's :class:`Operation` stage change.
|
|
| 60 |
+ |
|
| 61 |
+ Args:
|
|
| 62 |
+ job_name (str): name of the job to unsubscribe from.
|
|
| 63 |
+ peer (str): a unique string identifying the client.
|
|
| 64 |
+ |
|
| 65 |
+ Raises:
|
|
| 66 |
+ NotFoundError: If no job with `job_name` exists.
|
|
| 67 |
+ """
|
|
| 68 |
+ try:
|
|
| 69 |
+ job = self.__jobs_by_name[job_name]
|
|
| 70 |
+ except KeyError:
|
|
| 71 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
| 72 |
+ |
|
| 73 |
+ job.unregister_client(peer)
|
|
| 46 | 74 |
|
| 47 | 75 |
if job.n_clients == 0 and job.operation.done:
|
| 48 | 76 |
del self.__jobs_by_action[job.action_digest]
|
