finnball pushed to branch finn/async at BuildGrid / buildgrid
Commits:
- 3ce3c083 by finn at 2018-08-02T08:16:53Z
4 changed files:
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
Changes:
| ... | ... | @@ -34,12 +34,12 @@ class ExecutionInstance(): |
| 34 | 34 |
self.logger = logging.getLogger(__name__)
|
| 35 | 35 |
self._scheduler = scheduler
|
| 36 | 36 |
|
| 37 |
- def execute(self, action_digest, skip_cache_lookup):
|
|
| 37 |
+ def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
| 38 | 38 |
""" Sends a job for execution.
|
| 39 | 39 |
Queues an action and creates an Operation instance to be associated with
|
| 40 | 40 |
this action.
|
| 41 | 41 |
"""
|
| 42 |
- job = Job(action_digest)
|
|
| 42 |
+ job = Job(action_digest, message_queue)
|
|
| 43 | 43 |
self.logger.info("Operation name: {}".format(job.name))
|
| 44 | 44 |
|
| 45 | 45 |
if not skip_cache_lookup:
|
| ... | ... | @@ -70,3 +70,9 @@ class ExecutionInstance(): |
| 70 | 70 |
def cancel_operation(self, name):
|
| 71 | 71 |
# TODO: Cancel leases
|
| 72 | 72 |
raise NotImplementedError("Cancelled operations not supported")
|
| 73 |
+ |
|
| 74 |
+ def register_message_client(self, name, queue):
|
|
| 75 |
+ try:
|
|
| 76 |
+ self._scheduler.register_client(name, queue)
|
|
| 77 |
+ except KeyError:
|
|
| 78 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
| ... | ... | @@ -25,6 +25,7 @@ Serves remote execution requests. |
| 25 | 25 |
import copy
|
| 26 | 26 |
import grpc
|
| 27 | 27 |
import logging
|
| 28 |
+import queue
|
|
| 28 | 29 |
import time
|
| 29 | 30 |
|
| 30 | 31 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| ... | ... | @@ -42,10 +43,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 42 | 43 |
# Ignore request.instance_name for now
|
| 43 | 44 |
# Have only one instance
|
| 44 | 45 |
try:
|
| 45 |
- operation = self._instance.execute(request.action_digest,
|
|
| 46 |
- request.skip_cache_lookup)
|
|
| 47 |
- |
|
| 48 |
- yield from self._stream_operation_updates(operation.name)
|
|
| 46 |
+ message_queue = queue.Queue()
|
|
| 47 |
+ self._instance.execute(request.action_digest,
|
|
| 48 |
+ request.skip_cache_lookup,
|
|
| 49 |
+ message_queue)
|
|
| 50 |
+ yield from self._stream_operation_updates(message_queue)
|
|
| 49 | 51 |
|
| 50 | 52 |
except InvalidArgumentError as e:
|
| 51 | 53 |
self.logger.error(e)
|
| ... | ... | @@ -59,19 +61,18 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 59 | 61 |
|
| 60 | 62 |
def WaitExecution(self, request, context):
|
| 61 | 63 |
try:
|
| 62 |
- yield from self._stream_operation_updates(request.name)
|
|
| 64 |
+ message_queue = queue.Queue()
|
|
| 65 |
+ self._instance.register_message_client(request.name, message_queue)
|
|
| 66 |
+ yield from self._stream_operation_updates(message_queue)
|
|
| 63 | 67 |
|
| 64 | 68 |
except InvalidArgumentError as e:
|
| 65 | 69 |
self.logger.error(e)
|
| 66 | 70 |
context.set_details(str(e))
|
| 67 | 71 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 68 | 72 |
|
| 69 |
- def _stream_operation_updates(self, name):
|
|
| 70 |
- stream_previous = None
|
|
| 71 |
- while True:
|
|
| 72 |
- stream = self._instance.get_operation(name)
|
|
| 73 |
- if stream != stream_previous:
|
|
| 74 |
- yield stream
|
|
| 75 |
- if stream.done == True: break
|
|
| 76 |
- stream_previous = copy.deepcopy(stream)
|
|
| 77 |
- time.sleep(1)
|
|
| 73 |
+ def _stream_operation_updates(self, queue):
|
|
| 74 |
+ operation = queue.get()
|
|
| 75 |
+ while not operation.done:
|
|
| 76 |
+ yield operation
|
|
| 77 |
+ operation = queue.get()
|
|
| 78 |
+ yield operation
|
| ... | ... | @@ -51,9 +51,8 @@ class LeaseState(Enum): |
| 51 | 51 |
|
| 52 | 52 |
class Job():
|
| 53 | 53 |
|
| 54 |
- def __init__(self, action):
|
|
| 55 |
- self.action = action
|
|
| 56 |
- self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
|
|
| 54 |
+ def __init__(self, action_digest, message_queue=None):
|
|
| 55 |
+ self.action_digest = action_digest
|
|
| 57 | 56 |
self.execute_stage = ExecuteStage.UNKNOWN
|
| 58 | 57 |
self.lease = None
|
| 59 | 58 |
self.logger = logging.getLogger(__name__)
|
| ... | ... | @@ -62,10 +61,16 @@ class Job(): |
| 62 | 61 |
|
| 63 | 62 |
self._n_tries = 0
|
| 64 | 63 |
self._operation = operations_pb2.Operation(name = self.name)
|
| 64 |
+ self._operation_update_queues = []
|
|
| 65 |
+ |
|
| 66 |
+ if message_queue is not None:
|
|
| 67 |
+ self.register_client(message_queue)
|
|
| 68 |
+ |
|
| 69 |
+ def register_client(self, queue):
|
|
| 70 |
+ self._operation_update_queues.append(queue)
|
|
| 65 | 71 |
|
| 66 | 72 |
def get_operation(self):
|
| 67 | 73 |
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
| 68 |
- |
|
| 69 | 74 |
if self.result is not None:
|
| 70 | 75 |
self._operation.done = True
|
| 71 | 76 |
response = ExecuteResponse()
|
| ... | ... | @@ -81,10 +86,10 @@ class Job(): |
| 81 | 86 |
return meta
|
| 82 | 87 |
|
| 83 | 88 |
def create_lease(self):
|
| 84 |
- action = self._pack_any(self.action)
|
|
| 89 |
+ action_digest = self._pack_any(self.action_digest)
|
|
| 85 | 90 |
|
| 86 | 91 |
lease = bots_pb2.Lease(id = self.name,
|
| 87 |
- payload = action,
|
|
| 92 |
+ payload = action_digest,
|
|
| 88 | 93 |
state = LeaseState.PENDING.value)
|
| 89 | 94 |
self.lease = lease
|
| 90 | 95 |
return lease
|
| ... | ... | @@ -92,6 +97,11 @@ class Job(): |
| 92 | 97 |
def get_operations(self):
|
| 93 | 98 |
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
|
| 94 | 99 |
|
| 100 |
+ def update_execute_stage(self, stage):
|
|
| 101 |
+ self.execute_stage = stage
|
|
| 102 |
+ for queue in self._operation_update_queues:
|
|
| 103 |
+ queue.put(self.get_operation())
|
|
| 104 |
+ |
|
| 95 | 105 |
def _pack_any(self, pack):
|
| 96 | 106 |
any = any_pb2.Any()
|
| 97 | 107 |
any.Pack(pack)
|
| ... | ... | @@ -35,8 +35,11 @@ class Scheduler(): |
| 35 | 35 |
self.jobs = {}
|
| 36 | 36 |
self.queue = deque()
|
| 37 | 37 |
|
| 38 |
+ def register_client(self, name, queue):
|
|
| 39 |
+ self.jobs[name].register_client(queue)
|
|
| 40 |
+ |
|
| 38 | 41 |
def append_job(self, job):
|
| 39 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
| 42 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
| 40 | 43 |
self.jobs[job.name] = job
|
| 41 | 44 |
self.queue.append(job)
|
| 42 | 45 |
|
| ... | ... | @@ -45,9 +48,9 @@ class Scheduler(): |
| 45 | 48 |
|
| 46 | 49 |
if job.n_tries >= self.MAX_N_TRIES:
|
| 47 | 50 |
# TODO: Decide what to do with these jobs
|
| 48 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
| 51 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
| 49 | 52 |
else:
|
| 50 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
| 53 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
| 51 | 54 |
job.n_tries += 1
|
| 52 | 55 |
self.queue.appendleft(job)
|
| 53 | 56 |
|
| ... | ... | @@ -56,15 +59,14 @@ class Scheduler(): |
| 56 | 59 |
def create_job(self):
|
| 57 | 60 |
if len(self.queue) > 0:
|
| 58 | 61 |
job = self.queue.popleft()
|
| 59 |
- job.execute_stage = ExecuteStage.EXECUTING
|
|
| 62 |
+ job.update_execute_stage(ExecuteStage.EXECUTING)
|
|
| 60 | 63 |
self.jobs[job.name] = job
|
| 61 | 64 |
return job
|
| 62 |
- return None
|
|
| 63 | 65 |
|
| 64 | 66 |
def job_complete(self, name, result):
|
| 65 | 67 |
job = self.jobs[name]
|
| 66 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
| 67 | 68 |
job.result = result
|
| 69 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
| 68 | 70 |
self.jobs[name] = job
|
| 69 | 71 |
|
| 70 | 72 |
def get_operations(self):
|
| ... | ... | @@ -122,3 +124,7 @@ class Scheduler(): |
| 122 | 124 |
if state == LeaseState.PENDING.value or \
|
| 123 | 125 |
state == LeaseState.ACTIVE.value:
|
| 124 | 126 |
self.retry_job(name)
|
| 127 |
+ |
|
| 128 |
+ def _update_execute_stage(self, job, stage):
|
|
| 129 |
+ job.update_execute_stage(stage)
|
|
| 130 |
+ return job
|
