finnball pushed to branch finn/async at BuildGrid / buildgrid
Commits:
-
01c6fbc3
by finn at 2018-08-02T11:54:47Z
4 changed files:
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
Changes:
... | ... | @@ -34,12 +34,12 @@ class ExecutionInstance(): |
34 | 34 |
self.logger = logging.getLogger(__name__)
|
35 | 35 |
self._scheduler = scheduler
|
36 | 36 |
|
37 |
- def execute(self, action_digest, skip_cache_lookup):
|
|
37 |
+ def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
38 | 38 |
""" Sends a job for execution.
|
39 | 39 |
Queues an action and creates an Operation instance to be associated with
|
40 | 40 |
this action.
|
41 | 41 |
"""
|
42 |
- job = Job(action_digest)
|
|
42 |
+ job = Job(action_digest, message_queue)
|
|
43 | 43 |
self.logger.info("Operation name: {}".format(job.name))
|
44 | 44 |
|
45 | 45 |
if not skip_cache_lookup:
|
... | ... | @@ -70,3 +70,15 @@ class ExecutionInstance(): |
70 | 70 |
def cancel_operation(self, name):
|
71 | 71 |
# TODO: Cancel leases
|
72 | 72 |
raise NotImplementedError("Cancelled operations not supported")
|
73 |
+ |
|
74 |
+ def register_message_client(self, name, queue):
|
|
75 |
+ try:
|
|
76 |
+ self._scheduler.register_client(name, queue)
|
|
77 |
+ except KeyError:
|
|
78 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
|
79 |
+ |
|
80 |
+ def unregister_message_client(self, name, queue):
|
|
81 |
+ try:
|
|
82 |
+ self._scheduler.unregister_client(name, queue)
|
|
83 |
+ except KeyError:
|
|
84 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
... | ... | @@ -25,6 +25,7 @@ Serves remote execution requests. |
25 | 25 |
import copy
|
26 | 26 |
import grpc
|
27 | 27 |
import logging
|
28 |
+import queue
|
|
28 | 29 |
import time
|
29 | 30 |
|
30 | 31 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
... | ... | @@ -35,21 +36,27 @@ from ._exceptions import InvalidArgumentError |
35 | 36 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
36 | 37 |
|
37 | 38 |
def __init__(self, instance):
|
38 |
- self._instance = instance
|
|
39 | 39 |
self.logger = logging.getLogger(__name__)
|
40 |
+ self._instance = instance
|
|
40 | 41 |
|
41 | 42 |
def Execute(self, request, context):
|
42 | 43 |
# Ignore request.instance_name for now
|
43 | 44 |
# Have only one instance
|
44 | 45 |
try:
|
46 |
+ message_queue = queue.Queue()
|
|
45 | 47 |
operation = self._instance.execute(request.action_digest,
|
46 |
- request.skip_cache_lookup)
|
|
48 |
+ request.skip_cache_lookup,
|
|
49 |
+ message_queue)
|
|
47 | 50 |
|
48 |
- yield from self._stream_operation_updates(operation.name)
|
|
51 |
+ remove_client = lambda : self._remove_client(operation.name, message_queue)
|
|
52 |
+ context.add_callback(remove_client)
|
|
53 |
+ |
|
54 |
+ yield from self._stream_operation_updates(message_queue,
|
|
55 |
+ operation.name)
|
|
49 | 56 |
|
50 | 57 |
except InvalidArgumentError as e:
|
51 | 58 |
self.logger.error(e)
|
52 |
- context.set_details(str(e))
|
|
59 |
+ context.set_details(sxtr(e))
|
|
53 | 60 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
54 | 61 |
|
55 | 62 |
except NotImplementedError as e:
|
... | ... | @@ -59,19 +66,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
59 | 66 |
|
60 | 67 |
def WaitExecution(self, request, context):
|
61 | 68 |
try:
|
62 |
- yield from self._stream_operation_updates(request.name)
|
|
69 |
+ message_queue = queue.Queue()
|
|
70 |
+ operation_name = request.name
|
|
71 |
+ |
|
72 |
+ self._instance.register_message_client(operation_name, message_queue)
|
|
73 |
+ |
|
74 |
+ remove_client = lambda : self._remove_client(operation_name, message_queue)
|
|
75 |
+ context.add_callback(remove_client)
|
|
76 |
+ |
|
77 |
+ yield from self._stream_operation_updates(message_queue,
|
|
78 |
+ operation_name)
|
|
63 | 79 |
|
64 | 80 |
except InvalidArgumentError as e:
|
65 | 81 |
self.logger.error(e)
|
66 | 82 |
context.set_details(str(e))
|
67 | 83 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
68 | 84 |
|
69 |
- def _stream_operation_updates(self, name):
|
|
70 |
- stream_previous = None
|
|
71 |
- while True:
|
|
72 |
- stream = self._instance.get_operation(name)
|
|
73 |
- if stream != stream_previous:
|
|
74 |
- yield stream
|
|
75 |
- if stream.done == True: break
|
|
76 |
- stream_previous = copy.deepcopy(stream)
|
|
77 |
- time.sleep(1)
|
|
85 |
+ def _remove_client(self, operation_name, message_queue):
|
|
86 |
+ self._instance.unregister_message_client(operation_name, message_queue)
|
|
87 |
+ |
|
88 |
+ def _stream_operation_updates(self, message_queue, operation_name):
|
|
89 |
+ operation = message_queue.get()
|
|
90 |
+ while not operation.done:
|
|
91 |
+ yield operation
|
|
92 |
+ operation = message_queue.get()
|
|
93 |
+ yield operation
|
... | ... | @@ -51,9 +51,8 @@ class LeaseState(Enum): |
51 | 51 |
|
52 | 52 |
class Job():
|
53 | 53 |
|
54 |
- def __init__(self, action):
|
|
55 |
- self.action = action
|
|
56 |
- self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
|
|
54 |
+ def __init__(self, action_digest, message_queue=None):
|
|
55 |
+ self.action_digest = action_digest
|
|
57 | 56 |
self.execute_stage = ExecuteStage.UNKNOWN
|
58 | 57 |
self.lease = None
|
59 | 58 |
self.logger = logging.getLogger(__name__)
|
... | ... | @@ -62,10 +61,19 @@ class Job(): |
62 | 61 |
|
63 | 62 |
self._n_tries = 0
|
64 | 63 |
self._operation = operations_pb2.Operation(name = self.name)
|
64 |
+ self._operation_update_queues = []
|
|
65 |
+ |
|
66 |
+ if message_queue is not None:
|
|
67 |
+ self.register_client(message_queue)
|
|
68 |
+ |
|
69 |
+ def register_client(self, queue):
|
|
70 |
+ self._operation_update_queues.append(queue)
|
|
71 |
+ |
|
72 |
+ def unregister_client(self, queue):
|
|
73 |
+ self._operation_update_queues.remove(queue)
|
|
65 | 74 |
|
66 | 75 |
def get_operation(self):
|
67 | 76 |
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
68 |
- |
|
69 | 77 |
if self.result is not None:
|
70 | 78 |
self._operation.done = True
|
71 | 79 |
response = ExecuteResponse()
|
... | ... | @@ -81,10 +89,10 @@ class Job(): |
81 | 89 |
return meta
|
82 | 90 |
|
83 | 91 |
def create_lease(self):
|
84 |
- action = self._pack_any(self.action)
|
|
92 |
+ action_digest = self._pack_any(self.action_digest)
|
|
85 | 93 |
|
86 | 94 |
lease = bots_pb2.Lease(id = self.name,
|
87 |
- payload = action,
|
|
95 |
+ payload = action_digest,
|
|
88 | 96 |
state = LeaseState.PENDING.value)
|
89 | 97 |
self.lease = lease
|
90 | 98 |
return lease
|
... | ... | @@ -92,6 +100,11 @@ class Job(): |
92 | 100 |
def get_operations(self):
|
93 | 101 |
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
|
94 | 102 |
|
103 |
+ def update_execute_stage(self, stage):
|
|
104 |
+ self.execute_stage = stage
|
|
105 |
+ for queue in self._operation_update_queues:
|
|
106 |
+ queue.put(self.get_operation())
|
|
107 |
+ |
|
95 | 108 |
def _pack_any(self, pack):
|
96 | 109 |
any = any_pb2.Any()
|
97 | 110 |
any.Pack(pack)
|
... | ... | @@ -35,8 +35,14 @@ class Scheduler(): |
35 | 35 |
self.jobs = {}
|
36 | 36 |
self.queue = deque()
|
37 | 37 |
|
38 |
+ def register_client(self, name, queue):
|
|
39 |
+ self.jobs[name].register_client(queue)
|
|
40 |
+ |
|
41 |
+ def unregister_client(self, name, queue):
|
|
42 |
+ self.jobs[name].unregister_client(queue)
|
|
43 |
+ |
|
38 | 44 |
def append_job(self, job):
|
39 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
45 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
40 | 46 |
self.jobs[job.name] = job
|
41 | 47 |
self.queue.append(job)
|
42 | 48 |
|
... | ... | @@ -45,9 +51,9 @@ class Scheduler(): |
45 | 51 |
|
46 | 52 |
if job.n_tries >= self.MAX_N_TRIES:
|
47 | 53 |
# TODO: Decide what to do with these jobs
|
48 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
54 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
49 | 55 |
else:
|
50 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
56 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
51 | 57 |
job.n_tries += 1
|
52 | 58 |
self.queue.appendleft(job)
|
53 | 59 |
|
... | ... | @@ -56,15 +62,14 @@ class Scheduler(): |
56 | 62 |
def create_job(self):
|
57 | 63 |
if len(self.queue) > 0:
|
58 | 64 |
job = self.queue.popleft()
|
59 |
- job.execute_stage = ExecuteStage.EXECUTING
|
|
65 |
+ job.update_execute_stage(ExecuteStage.EXECUTING)
|
|
60 | 66 |
self.jobs[job.name] = job
|
61 | 67 |
return job
|
62 |
- return None
|
|
63 | 68 |
|
64 | 69 |
def job_complete(self, name, result):
|
65 | 70 |
job = self.jobs[name]
|
66 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
67 | 71 |
job.result = result
|
72 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
68 | 73 |
self.jobs[name] = job
|
69 | 74 |
|
70 | 75 |
def get_operations(self):
|
... | ... | @@ -122,3 +127,7 @@ class Scheduler(): |
122 | 127 |
if state == LeaseState.PENDING.value or \
|
123 | 128 |
state == LeaseState.ACTIVE.value:
|
124 | 129 |
self.retry_job(name)
|
130 |
+ |
|
131 |
+ def _update_execute_stage(self, job, stage):
|
|
132 |
+ job.update_execute_stage(stage)
|
|
133 |
+ return job
|