Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
-
81eece16
by Martin Blanchard at 2018-10-31T16:05:09Z
-
c9aba4f5
by Finn at 2018-11-01T10:23:02Z
-
ebc3b0c4
by Finn at 2018-11-01T10:23:02Z
-
537c89ac
by Finn at 2018-11-01T13:31:19Z
-
af4189e3
by Finn at 2018-11-01T13:31:29Z
-
17174883
by Finn at 2018-11-01T13:31:29Z
-
77b95cdd
by Finn at 2018-11-01T13:31:29Z
-
cfd36752
by Finn at 2018-11-01T13:31:29Z
-
09f1f7cc
by Finn at 2018-11-01T13:31:29Z
-
129e2d71
by Martin Blanchard at 2018-11-01T17:03:22Z
-
634b3a80
by Martin Blanchard at 2018-11-01T17:07:00Z
-
825e7da3
by Martin Blanchard at 2018-11-01T17:07:50Z
12 changed files:
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/_exceptions.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
- setup.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
... | ... | @@ -155,6 +155,19 @@ def status(context, operation_name, json): |
155 | 155 |
click.echo(json_format.MessageToJson(operation))
|
156 | 156 |
|
157 | 157 |
|
158 |
+@cli.command('cancel', short_help="Cancel an operation.")
|
|
159 |
+@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
|
160 |
+@pass_context
|
|
161 |
+def cancel(context, operation_name):
|
|
162 |
+ context.logger.info("Cancelling an operation...")
|
|
163 |
+ stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
164 |
+ |
|
165 |
+ request = operations_pb2.CancelOperationRequest(name=operation_name)
|
|
166 |
+ |
|
167 |
+ stub.CancelOperation(request)
|
|
168 |
+ context.logger.info("Operation cancelled: [{}]".format(request))
|
|
169 |
+ |
|
170 |
+ |
|
158 | 171 |
@cli.command('list', short_help="List operations.")
|
159 | 172 |
@click.option('--json', is_flag=True, show_default=True,
|
160 | 173 |
help="Print operations list in JSON format.")
|
... | ... | @@ -52,6 +52,12 @@ class BotError(BgdError): |
52 | 52 |
super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
|
53 | 53 |
|
54 | 54 |
|
55 |
+class CancelledError(BgdError):
|
|
56 |
+ """The job was cancelled and any callers should be notified"""
|
|
57 |
+ def __init__(self, message, detail=None, reason=None):
|
|
58 |
+ super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
|
|
59 |
+ |
|
60 |
+ |
|
55 | 61 |
class InvalidArgumentError(BgdError):
|
56 | 62 |
"""A bad argument was passed, such as a name which doesn't exist."""
|
57 | 63 |
def __init__(self, message, detail=None, reason=None):
|
... | ... | @@ -21,11 +21,9 @@ An instance of the Remote Execution Service. |
21 | 21 |
|
22 | 22 |
import logging
|
23 | 23 |
|
24 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
24 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
|
|
25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
26 | 26 |
|
27 |
-from ..job import Job
|
|
28 |
- |
|
29 | 27 |
|
30 | 28 |
class ExecutionInstance:
|
31 | 29 |
|
... | ... | @@ -37,7 +35,7 @@ class ExecutionInstance: |
37 | 35 |
def register_instance_with_server(self, instance_name, server):
|
38 | 36 |
server.add_execution_instance(self, instance_name)
|
39 | 37 |
|
40 |
- def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
38 |
+ def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
|
|
41 | 39 |
""" Sends a job for execution.
|
42 | 40 |
Queues an action and creates an Operation instance to be associated with
|
43 | 41 |
this action.
|
... | ... | @@ -48,27 +46,26 @@ class ExecutionInstance: |
48 | 46 |
if not action:
|
49 | 47 |
raise FailedPreconditionError("Could not get action from storage.")
|
50 | 48 |
|
51 |
- job = Job(action, action_digest)
|
|
52 |
- if message_queue is not None:
|
|
53 |
- job.register_client(message_queue)
|
|
49 |
+ job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
|
|
54 | 50 |
|
55 |
- self._scheduler.queue_job(job, skip_cache_lookup)
|
|
51 |
+ if peer is not None and message_queue is not None:
|
|
52 |
+ job.register_client(peer, message_queue)
|
|
56 | 53 |
|
57 | 54 |
return job.operation
|
58 | 55 |
|
59 |
- def register_message_client(self, name, queue):
|
|
56 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
60 | 57 |
try:
|
61 |
- self._scheduler.register_client(name, queue)
|
|
58 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
62 | 59 |
|
63 |
- except KeyError:
|
|
64 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
60 |
+ except NotFoundError:
|
|
61 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
65 | 62 |
|
66 |
- def unregister_message_client(self, name, queue):
|
|
63 |
+ def unregister_message_client(self, job_name, peer):
|
|
67 | 64 |
try:
|
68 |
- self._scheduler.unregister_client(name, queue)
|
|
65 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
69 | 66 |
|
70 |
- except KeyError:
|
|
71 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
67 |
+ except NotFoundError:
|
|
68 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
72 | 69 |
|
73 | 70 |
def stream_operation_updates(self, message_queue, operation_name):
|
74 | 71 |
operation = message_queue.get()
|
... | ... | @@ -26,7 +26,7 @@ from functools import partial |
26 | 26 |
|
27 | 27 |
import grpc
|
28 | 28 |
|
29 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
29 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
|
|
30 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
31 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2
|
32 | 32 |
|
... | ... | @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
47 | 47 |
instance = self._get_instance(request.instance_name)
|
48 | 48 |
operation = instance.execute(request.action_digest,
|
49 | 49 |
request.skip_cache_lookup,
|
50 |
- message_queue)
|
|
50 |
+ peer=context.peer(),
|
|
51 |
+ message_queue=message_queue)
|
|
51 | 52 |
|
52 | 53 |
context.add_callback(partial(instance.unregister_message_client,
|
53 |
- operation.name, message_queue))
|
|
54 |
+ operation.name, context.peer()))
|
|
54 | 55 |
|
55 | 56 |
instanced_op_name = "{}/{}".format(request.instance_name,
|
56 | 57 |
operation.name)
|
... | ... | @@ -76,6 +77,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
76 | 77 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
77 | 78 |
yield operations_pb2.Operation()
|
78 | 79 |
|
80 |
+ except CancelledError as e:
|
|
81 |
+ self.logger.error(e)
|
|
82 |
+ context.set_details(str(e))
|
|
83 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
84 |
+ yield operations_pb2.Operation()
|
|
85 |
+ |
|
79 | 86 |
def WaitExecution(self, request, context):
|
80 | 87 |
try:
|
81 | 88 |
names = request.name.split("/")
|
... | ... | @@ -88,10 +95,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
88 | 95 |
operation_name = names[-1]
|
89 | 96 |
instance = self._get_instance(instance_name)
|
90 | 97 |
|
91 |
- instance.register_message_client(operation_name, message_queue)
|
|
98 |
+ instance.register_message_client(operation_name,
|
|
99 |
+ context.peer(), message_queue)
|
|
92 | 100 |
|
93 | 101 |
context.add_callback(partial(instance.unregister_message_client,
|
94 |
- operation_name, message_queue))
|
|
102 |
+ operation_name, context.peer()))
|
|
95 | 103 |
|
96 | 104 |
for operation in instance.stream_operation_updates(message_queue,
|
97 | 105 |
operation_name):
|
... | ... | @@ -106,6 +114,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
106 | 114 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
107 | 115 |
yield operations_pb2.Operation()
|
108 | 116 |
|
117 |
+ except CancelledError as e:
|
|
118 |
+ self.logger.error(e)
|
|
119 |
+ context.set_details(str(e))
|
|
120 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
121 |
+ yield operations_pb2.Operation()
|
|
122 |
+ |
|
109 | 123 |
def _get_instance(self, name):
|
110 | 124 |
try:
|
111 | 125 |
return self._instances[name]
|
... | ... | @@ -19,33 +19,40 @@ import uuid |
19 | 19 |
from google.protobuf import timestamp_pb2
|
20 | 20 |
|
21 | 21 |
from buildgrid._enums import LeaseState, OperationStage
|
22 |
+from buildgrid._exceptions import CancelledError
|
|
22 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
23 | 24 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
24 | 25 |
from buildgrid._protos.google.longrunning import operations_pb2
|
26 |
+from buildgrid._protos.google.rpc import code_pb2
|
|
25 | 27 |
|
26 | 28 |
|
27 | 29 |
class Job:
|
28 | 30 |
|
29 |
- def __init__(self, action, action_digest):
|
|
31 |
+ def __init__(self, action, action_digest, priority=0):
|
|
30 | 32 |
self.logger = logging.getLogger(__name__)
|
31 | 33 |
|
32 | 34 |
self._name = str(uuid.uuid4())
|
35 |
+ self._priority = priority
|
|
33 | 36 |
self._action = remote_execution_pb2.Action()
|
34 | 37 |
self._operation = operations_pb2.Operation()
|
35 | 38 |
self._lease = None
|
36 | 39 |
|
37 | 40 |
self.__execute_response = None
|
38 | 41 |
self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
42 |
+ |
|
39 | 43 |
self.__queued_timestamp = timestamp_pb2.Timestamp()
|
40 | 44 |
self.__worker_start_timestamp = timestamp_pb2.Timestamp()
|
41 | 45 |
self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
|
42 | 46 |
|
47 |
+ self.__operation_cancelled = False
|
|
48 |
+ self.__lease_cancelled = False
|
|
49 |
+ |
|
43 | 50 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
44 | 51 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
45 | 52 |
|
46 | 53 |
self._action.CopyFrom(action)
|
47 | 54 |
self._do_not_cache = self._action.do_not_cache
|
48 |
- self._operation_update_queues = []
|
|
55 |
+ self._operation_update_queues = {}
|
|
49 | 56 |
self._operation.name = self._name
|
50 | 57 |
self._operation.done = False
|
51 | 58 |
self._n_tries = 0
|
... | ... | @@ -54,6 +61,10 @@ class Job: |
54 | 61 |
def name(self):
|
55 | 62 |
return self._name
|
56 | 63 |
|
64 |
+ @property
|
|
65 |
+ def priority(self):
|
|
66 |
+ return self._priority
|
|
67 |
+ |
|
57 | 68 |
@property
|
58 | 69 |
def do_not_cache(self):
|
59 | 70 |
return self._do_not_cache
|
... | ... | @@ -100,22 +111,34 @@ class Job: |
100 | 111 |
def n_clients(self):
|
101 | 112 |
return len(self._operation_update_queues)
|
102 | 113 |
|
103 |
- def register_client(self, queue):
|
|
104 |
- """Subscribes to the job's :class:`Operation` stage change events.
|
|
114 |
+ def __eq__(self, other):
|
|
115 |
+ if isinstance(other, Job):
|
|
116 |
+ return self.name == other.name
|
|
117 |
+ return False
|
|
118 |
+ |
|
119 |
+ def __ne__(self, other):
|
|
120 |
+ return not self.__eq__(other)
|
|
121 |
+ |
|
122 |
+ def register_client(self, peer, message_queue):
|
|
123 |
+ """Subscribes to the job's :class:`Operation` stage changes.
|
|
105 | 124 |
|
106 | 125 |
Args:
|
107 |
- queue (queue.Queue): the event queue to register.
|
|
126 |
+ peer (str): a unique string identifying the client.
|
|
127 |
+ message_queue (queue.Queue): the event queue to register.
|
|
108 | 128 |
"""
|
109 |
- self._operation_update_queues.append(queue)
|
|
110 |
- queue.put(self._operation)
|
|
129 |
+ if peer not in self._operation_update_queues:
|
|
130 |
+ self._operation_update_queues[peer] = message_queue
|
|
111 | 131 |
|
112 |
- def unregister_client(self, queue):
|
|
113 |
- """Unsubscribes to the job's :class:`Operation` stage change events.
|
|
132 |
+ message_queue.put(self._operation)
|
|
133 |
+ |
|
134 |
+ def unregister_client(self, peer):
|
|
135 |
+ """Unsubscribes from the job's :class:`Operation` stage changes.
|
|
114 | 136 |
|
115 | 137 |
Args:
|
116 |
- queue (queue.Queue): the event queue to unregister.
|
|
138 |
+ peer (str): a unique string identifying the client.
|
|
117 | 139 |
"""
|
118 |
- self._operation_update_queues.remove(queue)
|
|
140 |
+ if peer not in self._operation_update_queues:
|
|
141 |
+ del self._operation_update_queues[peer]
|
|
119 | 142 |
|
120 | 143 |
def set_cached_result(self, action_result):
|
121 | 144 |
"""Allows specifying an action result from the action cache for the job.
|
... | ... | @@ -130,7 +153,9 @@ class Job: |
130 | 153 |
Only one :class:`Lease` can be emitted for a given job. This method
|
131 | 154 |
should only be used once, any further calls are ignored.
|
132 | 155 |
"""
|
133 |
- if self._lease is not None:
|
|
156 |
+ if self.__operation_cancelled:
|
|
157 |
+ return None
|
|
158 |
+ elif self._lease is not None:
|
|
134 | 159 |
return None
|
135 | 160 |
|
136 | 161 |
self._lease = bots_pb2.Lease()
|
... | ... | @@ -171,7 +196,7 @@ class Job: |
171 | 196 |
action_result = remote_execution_pb2.ActionResult()
|
172 | 197 |
|
173 | 198 |
# TODO: Make a distinction between build and bot failures!
|
174 |
- if status.code != 0:
|
|
199 |
+ if status.code != code_pb2.OK:
|
|
175 | 200 |
self._do_not_cache = True
|
176 | 201 |
|
177 | 202 |
if result is not None:
|
... | ... | @@ -188,6 +213,15 @@ class Job: |
188 | 213 |
self.__execute_response.cached_result = False
|
189 | 214 |
self.__execute_response.status.CopyFrom(status)
|
190 | 215 |
|
216 |
+ def cancel_lease(self):
|
|
217 |
+ """Triggers a job's :class:Lease cancellation.
|
|
218 |
+ |
|
219 |
+ This will not cancel the job's :class:Operation.
|
|
220 |
+ """
|
|
221 |
+ self.__lease_cancelled = True
|
|
222 |
+ if self._lease is not None:
|
|
223 |
+ self.update_lease_state(LeaseState.CANCELLED)
|
|
224 |
+ |
|
191 | 225 |
def update_operation_stage(self, stage):
|
192 | 226 |
"""Operates a stage transition for the job's :class:Operation.
|
193 | 227 |
|
... | ... | @@ -211,5 +245,22 @@ class Job: |
211 | 245 |
|
212 | 246 |
self._operation.metadata.Pack(self.__operation_metadata)
|
213 | 247 |
|
214 |
- for queue in self._operation_update_queues:
|
|
248 |
+ for queue in self._operation_update_queues.values():
|
|
215 | 249 |
queue.put(self._operation)
|
250 |
+ |
|
251 |
+ def cancel_operation(self):
|
|
252 |
+ """Triggers a job's :class:Operation cancellation.
|
|
253 |
+ |
|
254 |
+ This will also cancel any job's :class:Lease that may have been issued.
|
|
255 |
+ """
|
|
256 |
+ self.__operation_cancelled = True
|
|
257 |
+ if self._lease is not None:
|
|
258 |
+ self.cancel_lease()
|
|
259 |
+ |
|
260 |
+ self.__execute_response = remote_execution_pb2.ExecuteResponse()
|
|
261 |
+ self.__execute_response.status.code = code_pb2.CANCELLED
|
|
262 |
+ self.__execute_response.status.message = "Operation cancelled by client."
|
|
263 |
+ |
|
264 |
+ self.update_operation_stage(OperationStage.COMPLETED)
|
|
265 |
+ |
|
266 |
+ raise CancelledError("Operation cancelled: {}".format(self._name))
|
... | ... | @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service. |
21 | 21 |
|
22 | 22 |
import logging
|
23 | 23 |
|
24 |
-from buildgrid._exceptions import InvalidArgumentError
|
|
24 |
+from buildgrid._exceptions import InvalidArgumentError, NotFoundError
|
|
25 | 25 |
from buildgrid._protos.google.longrunning import operations_pb2
|
26 | 26 |
|
27 | 27 |
|
... | ... | @@ -34,14 +34,14 @@ class OperationsInstance: |
34 | 34 |
def register_instance_with_server(self, instance_name, server):
|
35 | 35 |
server.add_operations_instance(self, instance_name)
|
36 | 36 |
|
37 |
- def get_operation(self, name):
|
|
38 |
- job = self._scheduler.jobs.get(name)
|
|
37 |
+ def get_operation(self, job_name):
|
|
38 |
+ try:
|
|
39 |
+ operation = self._scheduler.get_job_operation(job_name)
|
|
39 | 40 |
|
40 |
- if job is None:
|
|
41 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
41 |
+ except NotFoundError:
|
|
42 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
42 | 43 |
|
43 |
- else:
|
|
44 |
- return job.operation
|
|
44 |
+ return operation
|
|
45 | 45 |
|
46 | 46 |
def list_operations(self, list_filter, page_size, page_token):
|
47 | 47 |
# TODO: Pages
|
... | ... | @@ -57,34 +57,16 @@ class OperationsInstance: |
57 | 57 |
|
58 | 58 |
return response
|
59 | 59 |
|
60 |
- def delete_operation(self, name):
|
|
61 |
- try:
|
|
62 |
- self._scheduler.jobs.pop(name)
|
|
63 |
- |
|
64 |
- except KeyError:
|
|
65 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
66 |
- |
|
67 |
- def register_message_client(self, name, queue):
|
|
60 |
+ def delete_operation(self, job_name, peer):
|
|
68 | 61 |
try:
|
69 |
- self._scheduler.register_client(name, queue)
|
|
62 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
70 | 63 |
|
71 |
- except KeyError:
|
|
72 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
64 |
+ except NotFoundError:
|
|
65 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
73 | 66 |
|
74 |
- def unregister_message_client(self, name, queue):
|
|
67 |
+ def cancel_operation(self, job_name):
|
|
75 | 68 |
try:
|
76 |
- self._scheduler.unregister_client(name, queue)
|
|
77 |
- |
|
78 |
- except KeyError:
|
|
79 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
80 |
- |
|
81 |
- def stream_operation_updates(self, message_queue, operation_name):
|
|
82 |
- operation = message_queue.get()
|
|
83 |
- while not operation.done:
|
|
84 |
- yield operation
|
|
85 |
- operation = message_queue.get()
|
|
86 |
- yield operation
|
|
69 |
+ self._scheduler.cancel_job_operation(job_name)
|
|
87 | 70 |
|
88 |
- def cancel_operation(self, name):
|
|
89 |
- # TODO: Cancel leases
|
|
90 |
- raise NotImplementedError("Cancelled operations not supported")
|
|
71 |
+ except NotFoundError:
|
|
72 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
... | ... | @@ -25,7 +25,7 @@ import grpc |
25 | 25 |
|
26 | 26 |
from google.protobuf.empty_pb2 import Empty
|
27 | 27 |
|
28 |
-from buildgrid._exceptions import InvalidArgumentError
|
|
28 |
+from buildgrid._exceptions import CancelledError, InvalidArgumentError
|
|
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
30 | 30 |
|
31 | 31 |
|
... | ... | @@ -93,7 +93,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
93 | 93 |
instance = self._get_instance(instance_name)
|
94 | 94 |
|
95 | 95 |
operation_name = self._parse_operation_name(name)
|
96 |
- instance.delete_operation(operation_name)
|
|
96 |
+ instance.delete_operation(operation_name, context.peer())
|
|
97 | 97 |
|
98 | 98 |
except InvalidArgumentError as e:
|
99 | 99 |
self.logger.error(e)
|
... | ... | @@ -112,10 +112,10 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
112 | 112 |
operation_name = self._parse_operation_name(name)
|
113 | 113 |
instance.cancel_operation(operation_name)
|
114 | 114 |
|
115 |
- except NotImplementedError as e:
|
|
115 |
+ except CancelledError as e:
|
|
116 | 116 |
self.logger.error(e)
|
117 | 117 |
context.set_details(str(e))
|
118 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
118 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
119 | 119 |
|
120 | 120 |
except InvalidArgumentError as e:
|
121 | 121 |
self.logger.error(e)
|
... | ... | @@ -23,7 +23,7 @@ from collections import deque |
23 | 23 |
|
24 | 24 |
from buildgrid._exceptions import NotFoundError
|
25 | 25 |
|
26 |
-from .job import OperationStage, LeaseState
|
|
26 |
+from .job import Job, OperationStage, LeaseState
|
|
27 | 27 |
|
28 | 28 |
|
29 | 29 |
class Scheduler:
|
... | ... | @@ -32,28 +32,97 @@ class Scheduler: |
32 | 32 |
|
33 | 33 |
def __init__(self, action_cache=None):
|
34 | 34 |
self._action_cache = action_cache
|
35 |
- self.jobs = {}
|
|
36 |
- self.queue = deque()
|
|
37 | 35 |
|
38 |
- def register_client(self, job_name, queue):
|
|
39 |
- self.jobs[job_name].register_client(queue)
|
|
36 |
+ self.__jobs_by_action = {}
|
|
37 |
+ self.__jobs_by_name = {}
|
|
38 |
+ self.__queue = deque()
|
|
40 | 39 |
|
41 |
- def unregister_client(self, job_name, queue):
|
|
42 |
- self.jobs[job_name].unregister_client(queue)
|
|
40 |
+ def register_client(self, job_name, peer, message_queue):
|
|
41 |
+ """Subscribes to one of the job's :class:`Operation` stage changes.
|
|
43 | 42 |
|
44 |
- if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
|
|
45 |
- del self.jobs[job_name]
|
|
43 |
+ Args:
|
|
44 |
+ job_name (str): name of the job to subscribe to.
|
|
45 |
+ peer (str): a unique string identifying the client.
|
|
46 |
+ message_queue (queue.Queue): the event queue to register.
|
|
47 |
+ |
|
48 |
+ Raises:
|
|
49 |
+ NotFoundError: If no job with `job_name` exists.
|
|
50 |
+ """
|
|
51 |
+ try:
|
|
52 |
+ job = self.__jobs_by_name[job_name]
|
|
53 |
+ except KeyError:
|
|
54 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
55 |
+ |
|
56 |
+ job.register_client(peer, message_queue)
|
|
57 |
+ |
|
58 |
+ def unregister_client(self, job_name, peer):
|
|
59 |
+ """Unsubscribes from one of the job's :class:`Operation` stage changes.
|
|
60 |
+ |
|
61 |
+ Args:
|
|
62 |
+ job_name (str): name of the job to unsubscribe from.
|
|
63 |
+ peer (str): a unique string identifying the client.
|
|
64 |
+ |
|
65 |
+ Raises:
|
|
66 |
+ NotFoundError: If no job with `job_name` exists.
|
|
67 |
+ """
|
|
68 |
+ try:
|
|
69 |
+ job = self.__jobs_by_name[job_name]
|
|
70 |
+ except KeyError:
|
|
71 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
72 |
+ |
|
73 |
+ job.unregister_client(peer)
|
|
74 |
+ |
|
75 |
+ if job.n_clients == 0 and job.operation.done:
|
|
76 |
+ del self.__jobs_by_action[job.action_digest]
|
|
77 |
+ del self.__jobs_by_name[job.name]
|
|
78 |
+ |
|
79 |
+ def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
|
|
80 |
+ """Inserts a newly created job into the execution queue.
|
|
81 |
+ |
|
82 |
+ Args:
|
|
83 |
+ action (Action): the given action to queue for execution.
|
|
84 |
+ action_digest (Digest): the digest of the given action.
|
|
85 |
+ priority (int): the execution job's priority.
|
|
86 |
+ skip_cache_lookup (bool): whether or not to look for pre-computed
|
|
87 |
+ result for the given action.
|
|
88 |
+ """
|
|
89 |
+ def __queue_job(queue, new_job):
|
|
90 |
+ index = 0
|
|
91 |
+ for queued_job in reversed(queue):
|
|
92 |
+ if new_job.priority < queued_job.priority:
|
|
93 |
+ index += 1
|
|
94 |
+ else:
|
|
95 |
+ break
|
|
96 |
+ |
|
97 |
+ index = len(queue) - index
|
|
98 |
+ |
|
99 |
+ queue.insert(index, new_job)
|
|
100 |
+ |
|
101 |
+ if action_digest.hash in self.__jobs_by_action:
|
|
102 |
+ job = self.__jobs_by_action[action_digest.hash]
|
|
103 |
+ |
|
104 |
+ if priority < job.priority:
|
|
105 |
+ job.priority = priority
|
|
106 |
+ |
|
107 |
+ if job in self.__queue:
|
|
108 |
+ self.__queue.remove(job)
|
|
109 |
+ __queue_job(self.__queue, job)
|
|
110 |
+ |
|
111 |
+ return job
|
|
112 |
+ |
|
113 |
+ job = Job(action, action_digest, priority=priority)
|
|
46 | 114 |
|
47 |
- def queue_job(self, job, skip_cache_lookup=False):
|
|
48 |
- self.jobs[job.name] = job
|
|
115 |
+ self.__jobs_by_action[job.action_digest.hash] = job
|
|
116 |
+ self.__jobs_by_name[job.name] = job
|
|
49 | 117 |
|
50 | 118 |
operation_stage = None
|
51 | 119 |
if self._action_cache is not None and not skip_cache_lookup:
|
52 | 120 |
try:
|
53 | 121 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
122 |
+ |
|
54 | 123 |
except NotFoundError:
|
55 | 124 |
operation_stage = OperationStage.QUEUED
|
56 |
- self.queue.append(job)
|
|
125 |
+ __queue_job(self.__queue, job)
|
|
57 | 126 |
|
58 | 127 |
else:
|
59 | 128 |
job.set_cached_result(action_result)
|
... | ... | @@ -61,23 +130,25 @@ class Scheduler: |
61 | 130 |
|
62 | 131 |
else:
|
63 | 132 |
operation_stage = OperationStage.QUEUED
|
64 |
- self.queue.append(job)
|
|
133 |
+ __queue_job(self.__queue, job)
|
|
65 | 134 |
|
66 | 135 |
job.update_operation_stage(operation_stage)
|
67 | 136 |
|
137 |
+ return job
|
|
138 |
+ |
|
68 | 139 |
def retry_job(self, job_name):
|
69 |
- if job_name in self.jobs:
|
|
70 |
- job = self.jobs[job_name]
|
|
140 |
+ if job_name in self.__jobs_by_name:
|
|
141 |
+ job = self.__jobs_by_name[job_name]
|
|
71 | 142 |
if job.n_tries >= self.MAX_N_TRIES:
|
72 | 143 |
# TODO: Decide what to do with these jobs
|
73 | 144 |
job.update_operation_stage(OperationStage.COMPLETED)
|
74 | 145 |
# TODO: Mark these jobs as done
|
75 | 146 |
else:
|
76 | 147 |
job.update_operation_stage(OperationStage.QUEUED)
|
77 |
- self.queue.appendleft(job)
|
|
148 |
+ self.__queue.appendleft(job)
|
|
78 | 149 |
|
79 | 150 |
def list_jobs(self):
|
80 |
- return self.jobs.values()
|
|
151 |
+ return self.__jobs_by_name.values()
|
|
81 | 152 |
|
82 | 153 |
def request_job_leases(self, worker_capabilities):
|
83 | 154 |
"""Generates a list of the highest priority leases to be run.
|
... | ... | @@ -87,14 +158,17 @@ class Scheduler: |
87 | 158 |
worker properties, configuration and state at the time of the
|
88 | 159 |
request.
|
89 | 160 |
"""
|
90 |
- if not self.queue:
|
|
161 |
+ if not self.__queue:
|
|
91 | 162 |
return []
|
92 | 163 |
|
93 |
- job = self.queue.popleft()
|
|
164 |
+ job = self.__queue.popleft()
|
|
94 | 165 |
# For now, one lease at a time:
|
95 | 166 |
lease = job.create_lease()
|
96 | 167 |
|
97 |
- return [lease]
|
|
168 |
+ if lease:
|
|
169 |
+ return [lease]
|
|
170 |
+ |
|
171 |
+ return None
|
|
98 | 172 |
|
99 | 173 |
def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
|
100 | 174 |
"""Requests a state transition for a job's current :class:Lease.
|
... | ... | @@ -107,7 +181,7 @@ class Scheduler: |
107 | 181 |
lease_result (google.protobuf.Any): the lease execution result, only
|
108 | 182 |
required if `lease_state` is `COMPLETED`.
|
109 | 183 |
"""
|
110 |
- job = self.jobs[job_name]
|
|
184 |
+ job = self.__jobs_by_name[job_name]
|
|
111 | 185 |
|
112 | 186 |
if lease_state == LeaseState.PENDING:
|
113 | 187 |
job.update_lease_state(LeaseState.PENDING)
|
... | ... | @@ -127,9 +201,48 @@ class Scheduler: |
127 | 201 |
job.update_operation_stage(OperationStage.COMPLETED)
|
128 | 202 |
|
129 | 203 |
def get_job_lease(self, job_name):
|
130 |
- """Returns the lease associated to job, if any have been emitted yet."""
|
|
131 |
- return self.jobs[job_name].lease
|
|
204 |
+ """Returns the lease associated to job, if any have been emitted yet.
|
|
205 |
+ |
|
206 |
+ Args:
|
|
207 |
+ job_name (str): name of the job to query.
|
|
208 |
+ |
|
209 |
+ Raises:
|
|
210 |
+ NotFoundError: If no job with `job_name` exists.
|
|
211 |
+ """
|
|
212 |
+ try:
|
|
213 |
+ job = self.__jobs_by_name[job_name]
|
|
214 |
+ except KeyError:
|
|
215 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
216 |
+ |
|
217 |
+ return job.lease
|
|
218 |
+ |
|
219 |
+ def cancel_job_operation(self, job_name):
|
|
220 |
+ """Cancels the underlying operation of a given job.
|
|
221 |
+ |
|
222 |
+ This will also cancel any job's lease that may have been issued.
|
|
223 |
+ |
|
224 |
+ Args:
|
|
225 |
+ job_name (str): name of the job holding the operation to cancel.
|
|
226 |
+ """
|
|
227 |
+ try:
|
|
228 |
+ job = self.__jobs_by_name[job_name]
|
|
229 |
+ except KeyError:
|
|
230 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
231 |
+ |
|
232 |
+ job.cancel_operation()
|
|
132 | 233 |
|
133 | 234 |
def get_job_operation(self, job_name):
|
134 |
- """Returns the operation associated to job."""
|
|
135 |
- return self.jobs[job_name].operation
|
|
235 |
+ """Returns the operation associated to job.
|
|
236 |
+ |
|
237 |
+ Args:
|
|
238 |
+ job_name (str): name of the job to query.
|
|
239 |
+ |
|
240 |
+ Raises:
|
|
241 |
+ NotFoundError: If no job with `job_name` exists.
|
|
242 |
+ """
|
|
243 |
+ try:
|
|
244 |
+ job = self.__jobs_by_name[job_name]
|
|
245 |
+ except KeyError:
|
|
246 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
247 |
+ |
|
248 |
+ return job.operation
|
... | ... | @@ -116,7 +116,7 @@ setup( |
116 | 116 |
'protobuf',
|
117 | 117 |
'grpcio',
|
118 | 118 |
'Click',
|
119 |
- 'pyaml',
|
|
119 |
+ 'PyYAML',
|
|
120 | 120 |
'boto3 < 1.8.0',
|
121 | 121 |
'botocore < 1.11.0',
|
122 | 122 |
],
|
... | ... | @@ -26,7 +26,6 @@ import pytest |
26 | 26 |
|
27 | 27 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
28 | 28 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
29 |
-from buildgrid.server import job
|
|
30 | 29 |
from buildgrid.server.controller import ExecutionController
|
31 | 30 |
from buildgrid.server.job import LeaseState
|
32 | 31 |
from buildgrid.server.bots import service
|
... | ... | @@ -283,7 +282,8 @@ def test_post_bot_event_temp(context, instance): |
283 | 282 |
def _inject_work(scheduler, action=None, action_digest=None):
|
284 | 283 |
if not action:
|
285 | 284 |
action = remote_execution_pb2.Action()
|
285 |
+ |
|
286 | 286 |
if not action_digest:
|
287 | 287 |
action_digest = remote_execution_pb2.Digest()
|
288 |
- j = job.Job(action, action_digest)
|
|
289 |
- scheduler.queue_job(j, True)
|
|
288 |
+ |
|
289 |
+ scheduler.queue_job(action, action_digest, skip_cache_lookup=True)
|
... | ... | @@ -106,13 +106,13 @@ def test_no_action_digest_in_storage(instance, context): |
106 | 106 |
|
107 | 107 |
|
108 | 108 |
def test_wait_execution(instance, controller, context):
|
109 |
- j = job.Job(action, action_digest)
|
|
109 |
+ j = controller.execution_instance._scheduler.queue_job(action,
|
|
110 |
+ action_digest,
|
|
111 |
+ skip_cache_lookup=True)
|
|
110 | 112 |
j._operation.done = True
|
111 | 113 |
|
112 | 114 |
request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
|
113 | 115 |
|
114 |
- controller.execution_instance._scheduler.jobs[j.name] = j
|
|
115 |
- |
|
116 | 116 |
action_result_any = any_pb2.Any()
|
117 | 117 |
action_result = remote_execution_pb2.ActionResult()
|
118 | 118 |
action_result_any.Pack(action_result)
|
... | ... | @@ -24,6 +24,7 @@ import grpc |
24 | 24 |
from grpc._server import _Context
|
25 | 25 |
import pytest
|
26 | 26 |
|
27 |
+from buildgrid._enums import OperationStage
|
|
27 | 28 |
from buildgrid._exceptions import InvalidArgumentError
|
28 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2
|
... | ... | @@ -173,7 +174,7 @@ def test_list_operations_with_result(instance, controller, execute_request, cont |
173 | 174 |
output_file = remote_execution_pb2.OutputFile(path='unicorn')
|
174 | 175 |
action_result.output_files.extend([output_file])
|
175 | 176 |
|
176 |
- controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
|
|
177 |
+ controller.operations_instance._scheduler.request_job_leases({})
|
|
177 | 178 |
controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
|
178 | 179 |
LeaseState.COMPLETED,
|
179 | 180 |
lease_status=status_pb2.Status(),
|
... | ... | @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context): |
236 | 237 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
237 | 238 |
|
238 | 239 |
|
239 |
-def test_cancel_operation(instance, context):
|
|
240 |
+def test_cancel_operation(instance, controller, execute_request, context):
|
|
241 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
242 |
+ execute_request.skip_cache_lookup)
|
|
243 |
+ |
|
240 | 244 |
request = operations_pb2.CancelOperationRequest()
|
241 |
- request.name = "{}/{}".format(instance_name, "runner")
|
|
245 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
246 |
+ |
|
242 | 247 |
instance.CancelOperation(request, context)
|
243 | 248 |
|
244 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
249 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
|
|
250 |
+ |
|
251 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
252 |
+ response = instance.ListOperations(request, context)
|
|
253 |
+ |
|
254 |
+ assert len(response.operations) is 1
|
|
255 |
+ |
|
256 |
+ for operation in response.operations:
|
|
257 |
+ operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
258 |
+ operation.metadata.Unpack(operation_metadata)
|
|
259 |
+ assert operation_metadata.stage == OperationStage.COMPLETED.value
|
|
245 | 260 |
|
246 | 261 |
|
247 | 262 |
def test_cancel_operation_blank(blank_instance, context):
|
... | ... | @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context): |
249 | 264 |
request.name = "runner"
|
250 | 265 |
blank_instance.CancelOperation(request, context)
|
251 | 266 |
|
252 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
267 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
253 | 268 |
|
254 | 269 |
|
255 | 270 |
def test_cancel_operation_instance_fail(instance, context):
|