Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
-
158760b1
by Martin Blanchard at 2018-10-29T17:13:26Z
-
1c106889
by Martin Blanchard at 2018-10-30T09:47:53Z
-
5c3d1f04
by Martin Blanchard at 2018-10-30T11:13:26Z
-
adccb84c
by Martin Blanchard at 2018-10-30T13:08:06Z
6 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/scheduler.py
- setup.py
Changes:
... | ... | @@ -21,11 +21,9 @@ An instance of the Remote Execution Service. |
21 | 21 |
|
22 | 22 |
import logging
|
23 | 23 |
|
24 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
24 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
|
|
25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
26 | 26 |
|
27 |
-from ..job import Job
|
|
28 |
- |
|
29 | 27 |
|
30 | 28 |
class ExecutionInstance:
|
31 | 29 |
|
... | ... | @@ -37,7 +35,7 @@ class ExecutionInstance: |
37 | 35 |
def register_instance_with_server(self, instance_name, server):
|
38 | 36 |
server.add_execution_instance(self, instance_name)
|
39 | 37 |
|
40 |
- def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
38 |
+ def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
|
|
41 | 39 |
""" Sends a job for execution.
|
42 | 40 |
Queues an action and creates an Operation instance to be associated with
|
43 | 41 |
this action.
|
... | ... | @@ -48,28 +46,27 @@ class ExecutionInstance: |
48 | 46 |
if not action:
|
49 | 47 |
raise FailedPreconditionError("Could not get action from storage.")
|
50 | 48 |
|
51 |
- job = Job(action, action_digest)
|
|
52 |
- if message_queue is not None:
|
|
53 |
- job.register_client(message_queue)
|
|
49 |
+ job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
|
|
54 | 50 |
|
55 | 51 |
self.logger.info("Operation name: [{}]".format(job.name))
|
56 | 52 |
|
57 |
- self._scheduler.queue_job(job, skip_cache_lookup)
|
|
53 |
+ if peer is not None and message_queue is not None:
|
|
54 |
+ job.register_client(peer, message_queue)
|
|
58 | 55 |
|
59 | 56 |
return job.operation
|
60 | 57 |
|
61 |
- def register_message_client(self, name, queue):
|
|
58 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
62 | 59 |
try:
|
63 |
- self._scheduler.register_client(name, queue)
|
|
60 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
64 | 61 |
|
65 |
- except KeyError:
|
|
62 |
+ except NotFoundError:
|
|
66 | 63 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
67 | 64 |
|
68 |
- def unregister_message_client(self, name, queue):
|
|
65 |
+ def unregister_message_client(self, job_name, peer):
|
|
69 | 66 |
try:
|
70 |
- self._scheduler.unregister_client(name, queue)
|
|
67 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
71 | 68 |
|
72 |
- except KeyError:
|
|
69 |
+ except NotFoundError:
|
|
73 | 70 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
74 | 71 |
|
75 | 72 |
def stream_operation_updates(self, message_queue, operation_name):
|
... | ... | @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
47 | 47 |
instance = self._get_instance(request.instance_name)
|
48 | 48 |
operation = instance.execute(request.action_digest,
|
49 | 49 |
request.skip_cache_lookup,
|
50 |
- message_queue)
|
|
50 |
+ peer=context.peer(),
|
|
51 |
+ message_queue=message_queue)
|
|
51 | 52 |
|
52 | 53 |
context.add_callback(partial(instance.unregister_message_client,
|
53 |
- operation.name, message_queue))
|
|
54 |
+ operation.name, context.peer()))
|
|
54 | 55 |
|
55 | 56 |
yield from instance.stream_operation_updates(message_queue,
|
56 | 57 |
operation.name)
|
... | ... | @@ -26,10 +26,11 @@ from buildgrid._protos.google.longrunning import operations_pb2 |
26 | 26 |
|
27 | 27 |
class Job:
|
28 | 28 |
|
29 |
- def __init__(self, action, action_digest):
|
|
29 |
+ def __init__(self, action, action_digest, priority=0):
|
|
30 | 30 |
self.logger = logging.getLogger(__name__)
|
31 | 31 |
|
32 | 32 |
self._name = str(uuid.uuid4())
|
33 |
+ self._priority = priority
|
|
33 | 34 |
self._action = remote_execution_pb2.Action()
|
34 | 35 |
self._operation = operations_pb2.Operation()
|
35 | 36 |
self._lease = None
|
... | ... | @@ -45,7 +46,7 @@ class Job: |
45 | 46 |
|
46 | 47 |
self._action.CopyFrom(action)
|
47 | 48 |
self._do_not_cache = self._action.do_not_cache
|
48 |
- self._operation_update_queues = []
|
|
49 |
+ self._operation_update_queues = {}
|
|
49 | 50 |
self._operation.name = self._name
|
50 | 51 |
self._operation.done = False
|
51 | 52 |
self._n_tries = 0
|
... | ... | @@ -54,6 +55,10 @@ class Job: |
54 | 55 |
def name(self):
|
55 | 56 |
return self._name
|
56 | 57 |
|
58 |
+ @property
|
|
59 |
+ def priority(self):
|
|
60 |
+ return self._priority
|
|
61 |
+ |
|
57 | 62 |
@property
|
58 | 63 |
def do_not_cache(self):
|
59 | 64 |
return self._do_not_cache
|
... | ... | @@ -100,22 +105,26 @@ class Job: |
100 | 105 |
def n_clients(self):
|
101 | 106 |
return len(self._operation_update_queues)
|
102 | 107 |
|
103 |
- def register_client(self, queue):
|
|
104 |
- """Subscribes to the job's :class:`Operation` stage change events.
|
|
108 |
+ def register_client(self, peer, message_queue):
|
|
109 |
+ """Subscribes to the job's :class:`Operation` stage changes.
|
|
105 | 110 |
|
106 | 111 |
Args:
|
107 |
- queue (queue.Queue): the event queue to register.
|
|
112 |
+ peer (str): a unique string identifying the client.
|
|
113 |
+ message_queue (queue.Queue): the event queue to register.
|
|
108 | 114 |
"""
|
109 |
- self._operation_update_queues.append(queue)
|
|
110 |
- queue.put(self._operation)
|
|
115 |
+ if peer not in self._operation_update_queues:
|
|
116 |
+ self._operation_update_queues[peer] = message_queue
|
|
117 |
+ |
|
118 |
+ message_queue.put(self._operation)
|
|
111 | 119 |
|
112 |
- def unregister_client(self, queue):
|
|
113 |
- """Unsubscribes to the job's :class:`Operation` stage change events.
|
|
120 |
+ def unregister_client(self, peer):
|
|
121 |
+ """Unsubscribes from the job's :class:`Operation` stage changes.
|
|
114 | 122 |
|
115 | 123 |
Args:
|
116 |
- queue (queue.Queue): the event queue to unregister.
|
|
124 |
+ peer (str): a unique string identifying the client.
|
|
117 | 125 |
"""
|
118 |
- self._operation_update_queues.remove(queue)
|
|
126 |
+ if peer not in self._operation_update_queues:
|
|
127 |
+ del self._operation_update_queues[peer]
|
|
119 | 128 |
|
120 | 129 |
def set_cached_result(self, action_result):
|
121 | 130 |
"""Allows specifying an action result from the action cache for the job.
|
... | ... | @@ -211,5 +220,5 @@ class Job: |
211 | 220 |
|
212 | 221 |
self._operation.metadata.Pack(self.__operation_metadata)
|
213 | 222 |
|
214 |
- for queue in self._operation_update_queues:
|
|
223 |
+ for queue in self._operation_update_queues.values():
|
|
215 | 224 |
queue.put(self._operation)
|
... | ... | @@ -58,18 +58,18 @@ class OperationsInstance: |
58 | 58 |
except KeyError:
|
59 | 59 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
60 | 60 |
|
61 |
- def register_message_client(self, name, queue):
|
|
61 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
62 | 62 |
try:
|
63 |
- self._scheduler.register_client(name, queue)
|
|
63 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
64 | 64 |
|
65 |
- except KeyError:
|
|
65 |
+ except NotFoundError:
|
|
66 | 66 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
67 | 67 |
|
68 |
- def unregister_message_client(self, name, queue):
|
|
68 |
+ def unregister_message_client(self, job_name, peer):
|
|
69 | 69 |
try:
|
70 |
- self._scheduler.unregister_client(name, queue)
|
|
70 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
71 | 71 |
|
72 |
- except KeyError:
|
|
72 |
+ except NotFoundError:
|
|
73 | 73 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
74 | 74 |
|
75 | 75 |
def stream_operation_updates(self, message_queue, operation_name):
|
... | ... | @@ -23,7 +23,7 @@ from collections import deque |
23 | 23 |
|
24 | 24 |
from buildgrid._exceptions import NotFoundError
|
25 | 25 |
|
26 |
-from .job import OperationStage, LeaseState
|
|
26 |
+from .job import Job, OperationStage, LeaseState
|
|
27 | 27 |
|
28 | 28 |
|
29 | 29 |
class Scheduler:
|
... | ... | @@ -32,25 +32,76 @@ class Scheduler: |
32 | 32 |
|
33 | 33 |
def __init__(self, action_cache=None):
|
34 | 34 |
self._action_cache = action_cache
|
35 |
- self.jobs = {}
|
|
35 |
+ self.__jobs_by_action = {}
|
|
36 |
+ self.__jobs_by_name = {}
|
|
36 | 37 |
self.queue = deque()
|
37 | 38 |
|
38 |
- def register_client(self, job_name, queue):
|
|
39 |
- self.jobs[job_name].register_client(queue)
|
|
39 |
+ def register_client(self, job_name, peer, message_queue):
|
|
40 |
+ """Subscribes to one of the job's :class:`Operation` stage changes.
|
|
40 | 41 |
|
41 |
- def unregister_client(self, job_name, queue):
|
|
42 |
- self.jobs[job_name].unregister_client(queue)
|
|
42 |
+ Args:
|
|
43 |
+ job_name (str): name of the job to subscribe to.
|
|
44 |
+ peer (str): a unique string identifying the client.
|
|
45 |
+ message_queue (queue.Queue): the event queue to register.
|
|
46 |
+ |
|
47 |
+ Raises:
|
|
48 |
+ NotFoundError: If no job with `job_name` exists.
|
|
49 |
+ """
|
|
50 |
+ try:
|
|
51 |
+ self.__jobs_by_name[job_name].register_client(peer, message_queue)
|
|
52 |
+ |
|
53 |
+ except KeyError:
|
|
54 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
55 |
+ |
|
56 |
+ def unregister_client(self, job_name, peer):
|
|
57 |
+ """Unsubscribes from one of the job's :class:`Operation` stage changes.
|
|
58 |
+ |
|
59 |
+ Args:
|
|
60 |
+ job_name (str): name of the job to unsubscribe from.
|
|
61 |
+ peer (str): a unique string identifying the client.
|
|
62 |
+ |
|
63 |
+ Raises:
|
|
64 |
+ NotFoundError: If no job with `job_name` exists.
|
|
65 |
+ """
|
|
66 |
+ try:
|
|
67 |
+ self.__jobs_by_name[job_name].unregister_client(peer)
|
|
68 |
+ |
|
69 |
+ except KeyError:
|
|
70 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
43 | 71 |
|
44 |
- if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
|
|
45 |
- del self.jobs[job_name]
|
|
72 |
+ if (self.__jobs_by_name[job_name].n_clients == 0 and
|
|
73 |
+ self.__jobs_by_name[job_name].operation.done):
|
|
74 |
+ del self.__jobs_by_name[job_name]
|
|
46 | 75 |
|
47 |
- def queue_job(self, job, skip_cache_lookup=False):
|
|
48 |
- self.jobs[job.name] = job
|
|
76 |
+ def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
|
|
77 |
+ """Inserts a newly created job into the execution queue.
|
|
78 |
+ |
|
79 |
+ Args:
|
|
80 |
+ action (Action): the given action to queue for execution.
|
|
81 |
+ action_digest (Digest): the digest of the given action.
|
|
82 |
+ priority (int): the execution job's priority.
|
|
83 |
+ skip_cache_lookup (bool): whether or not to look for pre-computed
|
|
84 |
+ result for the given action.
|
|
85 |
+ """
|
|
86 |
+ if action_digest.hash in self.__jobs_by_action:
|
|
87 |
+ job = self.__jobs_by_action[action_digest.hash]
|
|
88 |
+ |
|
89 |
+ if priority < job.priority:
|
|
90 |
+ #TODO: We need to requeue here
|
|
91 |
+ job.priority = priority
|
|
92 |
+ |
|
93 |
+ return job
|
|
94 |
+ |
|
95 |
+ job = Job(action, action_digest, priority=priority)
|
|
96 |
+ |
|
97 |
+ self.__jobs_by_action[job.action_digest.hash] = job
|
|
98 |
+ self.__jobs_by_name[job.name] = job
|
|
49 | 99 |
|
50 | 100 |
operation_stage = None
|
51 | 101 |
if self._action_cache is not None and not skip_cache_lookup:
|
52 | 102 |
try:
|
53 | 103 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
104 |
+ |
|
54 | 105 |
except NotFoundError:
|
55 | 106 |
operation_stage = OperationStage.QUEUED
|
56 | 107 |
self.queue.append(job)
|
... | ... | @@ -65,9 +116,11 @@ class Scheduler: |
65 | 116 |
|
66 | 117 |
job.update_operation_stage(operation_stage)
|
67 | 118 |
|
119 |
+ return job
|
|
120 |
+ |
|
68 | 121 |
def retry_job(self, job_name):
|
69 |
- if job_name in self.jobs:
|
|
70 |
- job = self.jobs[job_name]
|
|
122 |
+ if job_name in self.__jobs_by_name:
|
|
123 |
+ job = self.__jobs_by_name[job_name]
|
|
71 | 124 |
if job.n_tries >= self.MAX_N_TRIES:
|
72 | 125 |
# TODO: Decide what to do with these jobs
|
73 | 126 |
job.update_operation_stage(OperationStage.COMPLETED)
|
... | ... | @@ -77,7 +130,7 @@ class Scheduler: |
77 | 130 |
self.queue.appendleft(job)
|
78 | 131 |
|
79 | 132 |
def list_jobs(self):
|
80 |
- return self.jobs.values()
|
|
133 |
+ return self.__jobs_by_name.values()
|
|
81 | 134 |
|
82 | 135 |
def request_job_leases(self, worker_capabilities):
|
83 | 136 |
"""Generates a list of the highest priority leases to be run.
|
... | ... | @@ -107,7 +160,7 @@ class Scheduler: |
107 | 160 |
lease_result (google.protobuf.Any): the lease execution result, only
|
108 | 161 |
required if `lease_state` is `COMPLETED`.
|
109 | 162 |
"""
|
110 |
- job = self.jobs[job_name]
|
|
163 |
+ job = self.__jobs_by_name[job_name]
|
|
111 | 164 |
|
112 | 165 |
if lease_state == LeaseState.PENDING:
|
113 | 166 |
job.update_lease_state(LeaseState.PENDING)
|
... | ... | @@ -128,8 +181,8 @@ class Scheduler: |
128 | 181 |
|
129 | 182 |
def get_job_lease(self, job_name):
|
130 | 183 |
"""Returns the lease associated to job, if any have been emitted yet."""
|
131 |
- return self.jobs[job_name].lease
|
|
184 |
+ return self.__jobs_by_name[job_name].lease
|
|
132 | 185 |
|
133 | 186 |
def get_job_operation(self, job_name):
|
134 | 187 |
"""Returns the operation associated to job."""
|
135 |
- return self.jobs[job_name].operation
|
|
188 |
+ return self.__jobs_by_name[job_name].operation
|
... | ... | @@ -86,7 +86,7 @@ def get_cmdclass(): |
86 | 86 |
return cmdclass
|
87 | 87 |
|
88 | 88 |
tests_require = [
|
89 |
- 'coverage == 4.4.0',
|
|
89 |
+ 'coverage >= 4.5.0',
|
|
90 | 90 |
'moto',
|
91 | 91 |
'pep8',
|
92 | 92 |
'psutil',
|