finnball pushed to branch finn/async at BuildGrid / buildgrid
Commits:
-
4c708a2d
by finn at 2018-08-03T08:14:52Z
5 changed files:
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/execution_service.py
Changes:
| ... | ... | @@ -34,12 +34,12 @@ class ExecutionInstance(): |
| 34 | 34 |
self.logger = logging.getLogger(__name__)
|
| 35 | 35 |
self._scheduler = scheduler
|
| 36 | 36 |
|
| 37 |
- def execute(self, action_digest, skip_cache_lookup):
|
|
| 37 |
+ def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
| 38 | 38 |
""" Sends a job for execution.
|
| 39 | 39 |
Queues an action and creates an Operation instance to be associated with
|
| 40 | 40 |
this action.
|
| 41 | 41 |
"""
|
| 42 |
- job = Job(action_digest)
|
|
| 42 |
+ job = Job(action_digest, message_queue)
|
|
| 43 | 43 |
self.logger.info("Operation name: {}".format(job.name))
|
| 44 | 44 |
|
| 45 | 45 |
if not skip_cache_lookup:
|
| ... | ... | @@ -70,3 +70,15 @@ class ExecutionInstance(): |
| 70 | 70 |
def cancel_operation(self, name):
    """Cancel the operation called *name* (not supported yet).

    Raises:
        NotImplementedError: always.
    """
    # TODO: Cancel leases
    message = "Cancelled operations not supported"
    raise NotImplementedError(message)
|
| 73 |
+ |
|
| 74 |
def register_message_client(self, name, queue):
    """Subscribe *queue* to updates of the operation called *name*.

    Args:
        name: operation (job) name to follow.
        queue: queue that will receive the operation updates.

    Raises:
        InvalidArgumentError: if the scheduler's lookup of *name* raises
            KeyError, i.e. no such operation is known.
    """
    try:
        self._scheduler.register_client(name, queue)
    except KeyError as err:
        # Chain the lookup failure so the original cause stays in tracebacks.
        raise InvalidArgumentError("Operation name does not exist: {}".format(name)) from err
|
|
| 79 |
+ |
|
| 80 |
def unregister_message_client(self, name, queue):
    """Stop delivering updates of the operation called *name* to *queue*.

    Args:
        name: operation (job) name the queue was subscribed to.
        queue: the previously registered queue.

    Raises:
        InvalidArgumentError: if the scheduler's lookup of *name* raises
            KeyError, i.e. no such operation is known.
    """
    try:
        self._scheduler.unregister_client(name, queue)
    except KeyError as err:
        # Chain the lookup failure so the original cause stays in tracebacks.
        raise InvalidArgumentError("Operation name does not exist: {}".format(name)) from err
|
| ... | ... | @@ -22,10 +22,9 @@ ExecutionService |
| 22 | 22 |
Serves remote execution requests.
|
| 23 | 23 |
"""
|
| 24 | 24 |
|
| 25 |
-import copy
|
|
| 26 | 25 |
import grpc
|
| 27 | 26 |
import logging
|
| 28 |
-import time
|
|
| 27 |
+import queue
|
|
| 29 | 28 |
|
| 30 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
| 31 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
| ... | ... | @@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError |
| 35 | 34 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
| 36 | 35 |
|
| 37 | 36 |
def __init__(self, instance):
    """Wrap *instance*, the execution instance that requests are delegated to."""
    self._instance = instance
    self.logger = logging.getLogger(__name__)
|
|
| 40 | 39 |
|
| 41 | 40 |
def Execute(self, request, context):
|
| 42 | 41 |
# Ignore request.instance_name for now
|
| 43 | 42 |
# Have only one instance
|
| 44 | 43 |
try:
|
| 44 |
+ message_queue = queue.Queue()
|
|
| 45 | 45 |
operation = self._instance.execute(request.action_digest,
|
| 46 |
- request.skip_cache_lookup)
|
|
| 46 |
+ request.skip_cache_lookup,
|
|
| 47 |
+ message_queue)
|
|
| 47 | 48 |
|
| 48 |
- yield from self._stream_operation_updates(operation.name)
|
|
| 49 |
+ remove_client = lambda : self._remove_client(operation.name, message_queue)
|
|
| 50 |
+ context.add_callback(remove_client)
|
|
| 51 |
+ |
|
| 52 |
+ yield from self._stream_operation_updates(message_queue,
|
|
| 53 |
+ operation.name)
|
|
| 49 | 54 |
|
| 50 | 55 |
except InvalidArgumentError as e:
|
| 51 | 56 |
self.logger.error(e)
|
| ... | ... | @@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 59 | 64 |
|
| 60 | 65 |
def WaitExecution(self, request, context):
    """Stream updates of an existing operation until it reports done.

    A fresh queue is subscribed to the operation named in *request*. A
    callback registered on *context* unsubscribes it again when the RPC
    terminates. If registration raises InvalidArgumentError (unknown
    operation name), the error is logged and reported to the client as
    INVALID_ARGUMENT, and nothing is streamed.
    """
    operation_name = request.name
    message_queue = queue.Queue()

    # Only registration can raise InvalidArgumentError; keep the try narrow.
    try:
        self._instance.register_message_client(operation_name, message_queue)
    except InvalidArgumentError as e:
        self.logger.error(e)
        context.set_details(str(e))
        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        return

    def remove_client():
        # Runs on RPC termination so the job stops feeding a dead queue.
        self._remove_client(operation_name, message_queue)

    context.add_callback(remove_client)

    yield from self._stream_operation_updates(message_queue, operation_name)
|
| 68 | 82 |
|
| 69 |
- def _stream_operation_updates(self, name):
|
|
| 70 |
- stream_previous = None
|
|
| 71 |
- while True:
|
|
| 72 |
- stream = self._instance.get_operation(name)
|
|
| 73 |
- if stream != stream_previous:
|
|
| 74 |
- yield stream
|
|
| 75 |
- if stream.done == True: break
|
|
| 76 |
- stream_previous = copy.deepcopy(stream)
|
|
| 77 |
- time.sleep(1)
|
|
| 83 |
def _remove_client(self, operation_name, message_queue):
    """Unsubscribe *message_queue* from *operation_name* (RPC-termination callback)."""
    instance = self._instance
    instance.unregister_message_client(operation_name, message_queue)
|
|
| 85 |
+ |
|
| 86 |
def _stream_operation_updates(self, message_queue, operation_name):
    """Yield operations taken from *message_queue* until one reports done.

    Blocks on the queue between messages. The done operation itself is
    yielded last. *operation_name* is not used by the loop.
    """
    while True:
        update = message_queue.get()
        yield update
        if update.done:
            return
|
| ... | ... | @@ -51,9 +51,8 @@ class LeaseState(Enum): |
| 51 | 51 |
|
| 52 | 52 |
class Job():
|
| 53 | 53 |
|
| 54 |
- def __init__(self, action):
|
|
| 55 |
- self.action = action
|
|
| 56 |
- self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
|
|
| 54 |
+ def __init__(self, action_digest, message_queue=None):
|
|
| 55 |
+ self.action_digest = action_digest
|
|
| 57 | 56 |
self.execute_stage = ExecuteStage.UNKNOWN
|
| 58 | 57 |
self.lease = None
|
| 59 | 58 |
self.logger = logging.getLogger(__name__)
|
| ... | ... | @@ -62,10 +61,24 @@ class Job(): |
| 62 | 61 |
|
| 63 | 62 |
self._n_tries = 0
|
| 64 | 63 |
self._operation = operations_pb2.Operation(name = self.name)
|
| 64 |
+ self._operation_update_queues = []
|
|
| 65 |
+ |
|
| 66 |
+ if message_queue is not None:
|
|
| 67 |
+ self.register_client(message_queue)
|
|
| 68 |
+ |
|
| 69 |
def check_job_finished(self):
    """Return True only when no client is subscribed and the operation is done."""
    if self._operation_update_queues:
        # Subscribed clients still need this job.
        return False
    return self._operation.done
|
|
| 73 |
+ |
|
| 74 |
def register_client(self, queue):
    """Subscribe *queue* to this job's operation updates."""
    subscribers = self._operation_update_queues
    subscribers.append(queue)
|
|
| 76 |
+ |
|
| 77 |
def unregister_client(self, queue):
    """Unsubscribe *queue*; raises ValueError if it was never subscribed."""
    subscribers = self._operation_update_queues
    subscribers.remove(queue)
|
|
| 65 | 79 |
|
| 66 | 80 |
def get_operation(self):
|
| 67 | 81 |
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
| 68 |
- |
|
| 69 | 82 |
if self.result is not None:
|
| 70 | 83 |
self._operation.done = True
|
| 71 | 84 |
response = ExecuteResponse()
|
| ... | ... | @@ -81,10 +94,10 @@ class Job(): |
| 81 | 94 |
return meta
|
| 82 | 95 |
|
| 83 | 96 |
def create_lease(self):
    """Create a PENDING lease for this job, store it on ``self.lease`` and return it.

    The lease id is the job name. The payload is the action digest packed
    into an ``Any`` message by ``_pack_any``.
    """
    payload = self._pack_any(self.action_digest)
    self.lease = bots_pb2.Lease(id=self.name,
                                payload=payload,
                                state=LeaseState.PENDING.value)
    return self.lease
|
| ... | ... | @@ -92,6 +105,11 @@ class Job(): |
| 92 | 105 |
def get_operations(self):
    """Return a ListOperationsResponse containing this job's single operation."""
    operation = self.get_operation()
    return operations_pb2.ListOperationsResponse(operations=[operation])
|
| 94 | 107 |
|
| 108 |
def update_execute_stage(self, stage):
    """Set the job's execute stage to *stage* and notify subscribed clients.

    Every queue registered through ``register_client`` receives a copy of
    the job's current operation. The operation is built once per update,
    not once per client. It is not built at all when nobody is subscribed.

    Args:
        stage: the new ``ExecuteStage`` value.
    """
    self.execute_stage = stage
    if not self._operation_update_queues:
        return
    operation = self.get_operation()
    # get_operation() returns an Operation message (get_operations stores it
    # in a ListOperationsResponse). Enqueue a copy so later in-place updates
    # of the job's operation cannot change a message still waiting in a
    # client's queue. NOTE(review): assumes get_operation() may hand back the
    # job's shared internal message; the copy is redundant but harmless if not.
    snapshot = type(operation)()
    snapshot.CopyFrom(operation)
    for message_queue in self._operation_update_queues:
        message_queue.put(snapshot)
|
|
| 112 |
+ |
|
| 95 | 113 |
def _pack_any(self, pack):
|
| 96 | 114 |
any = any_pb2.Any()
|
| 97 | 115 |
any.Pack(pack)
|
| ... | ... | @@ -35,8 +35,17 @@ class Scheduler(): |
| 35 | 35 |
self.jobs = {}
|
| 36 | 36 |
self.queue = deque()
|
| 37 | 37 |
|
| 38 |
def register_client(self, name, queue):
    """Subscribe *queue* to updates of the job called *name*.

    The job's current operation is pushed onto *queue* immediately. This
    matters for late subscribers (e.g. WaitExecution) to a job whose stage
    will not change again, such as an already-completed job. Without it,
    the client would block forever on an empty queue waiting for an update
    that never comes.

    Raises:
        KeyError: if no job named *name* is tracked by this scheduler.
    """
    job = self.jobs[name]
    job.register_client(queue)
    operation = job.get_operation()
    # Enqueue a copy so later in-place updates of the job's operation
    # cannot alter the message while it waits in the queue.
    snapshot = type(operation)()
    snapshot.CopyFrom(operation)
    queue.put(snapshot)
|
|
| 40 |
+ |
|
| 41 |
def unregister_client(self, name, queue):
    """Unsubscribe *queue* from the job called *name*.

    When ``job.check_job_finished()`` then reports True, the scheduler
    stops tracking the job.

    Raises:
        KeyError: if no job named *name* is tracked.
    """
    job = self.jobs[name]
    job.unregister_client(queue)
    if not job.check_job_finished():
        return
    del self.jobs[name]
|
|
| 46 |
+ |
|
| 38 | 47 |
def append_job(self, job):
    """Mark *job* QUEUED, track it by name and add it to the back of the queue."""
    job.update_execute_stage(ExecuteStage.QUEUED)
    name = job.name
    self.jobs[name] = job
    self.queue.append(job)
|
| 42 | 51 |
|
| ... | ... | @@ -45,9 +54,9 @@ class Scheduler(): |
| 45 | 54 |
|
| 46 | 55 |
if job.n_tries >= self.MAX_N_TRIES:
|
| 47 | 56 |
# TODO: Decide what to do with these jobs
|
| 48 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
| 57 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
| 49 | 58 |
else:
|
| 50 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
| 59 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
| 51 | 60 |
job.n_tries += 1
|
| 52 | 61 |
self.queue.appendleft(job)
|
| 53 | 62 |
|
| ... | ... | @@ -56,15 +65,14 @@ class Scheduler(): |
| 56 | 65 |
def create_job(self):
    """Pop the next queued job, mark it EXECUTING and return it.

    Returns None when no job is waiting.
    """
    if not self.queue:
        return None
    job = self.queue.popleft()
    job.update_execute_stage(ExecuteStage.EXECUTING)
    self.jobs[job.name] = job
    return job
|
|
| 63 | 71 |
|
| 64 | 72 |
def job_complete(self, name, result):
    """Store *result* on the job called *name* and mark it COMPLETED.

    The result is set before the stage update so the operation pushed to
    subscribed clients already reflects it.

    Raises:
        KeyError: if no job named *name* is tracked.
    """
    job = self.jobs[name]
    job.result = result
    # Do not write ``job`` back into ``self.jobs`` afterwards: it is already
    # there. Also, the COMPLETED update may let the last client unsubscribe,
    # in which case unregister_client deletes the finished job; re-inserting
    # it here would resurrect a job nothing will ever clean up.
    job.update_execute_stage(ExecuteStage.COMPLETED)
|
| 69 | 77 |
|
| 70 | 78 |
def get_operations(self):
|
| ... | ... | @@ -122,3 +130,7 @@ class Scheduler(): |
| 122 | 130 |
if state == LeaseState.PENDING.value or \
|
| 123 | 131 |
state == LeaseState.ACTIVE.value:
|
| 124 | 132 |
self.retry_job(name)
|
| 133 |
+ |
|
| 134 |
def _update_execute_stage(self, job, stage):
    """Apply *stage* to *job* through its update_execute_stage and return the job."""
    update = job.update_execute_stage
    update(stage)
    return job
|
| ... | ... | @@ -69,17 +69,22 @@ def test_execute(skip_cache_lookup, instance, context): |
| 69 | 69 |
assert result.done is False
|
| 70 | 70 |
|
| 71 | 71 |
def test_wait_execution(instance, context):
    # WaitExecution subscribes its queue only when the returned generator is
    # first advanced. A stage update reaches only the queues subscribed at that
    # moment. Completing the job before subscribing can therefore leave
    # next(response) blocked on an empty queue. So advance the generator on a
    # helper thread, and complete the job once the subscription is visible.
    import threading
    import time

    action_digest = remote_execution_pb2.Digest()
    action_digest.hash = 'zhora'

    j = job.Job(action_digest, None)
    j._operation.done = True

    scheduler = instance._instance._scheduler
    scheduler.jobs[j.name] = j

    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    response = instance.WaitExecution(request, context)

    received = []
    consumer = threading.Thread(target=lambda: received.append(next(response)),
                                daemon=True)
    consumer.start()

    # Bounded wait for the generator to subscribe its queue to the job.
    deadline = time.monotonic() + 5
    while not j._operation_update_queues and time.monotonic() < deadline:
        time.sleep(0.01)
    assert j._operation_update_queues, "WaitExecution never subscribed to the job"

    scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)

    consumer.join(timeout=5)
    assert not consumer.is_alive(), "WaitExecution did not stream an update"
    assert received
    assert received[0].name == j.name
    assert received[0].done
|
