finnball pushed to branch finn/async at BuildGrid / buildgrid
Commits:
6 changed files:
- CONTRIBUTING.rst
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/execution_service.py
Changes:
... | ... | @@ -9,8 +9,23 @@ We welcome contributions in the form of bug fixes or feature additions / enhance |
9 | 9 |
|
10 | 10 |
Any major feature additions should be raised as a proposal on the `Mailing List <https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid/>`_ to be discussed, and then eventually followed up with an issue here on gitlab. We recommend that you propose the feature in advance of commencing work. We are also on irc, but do not have our own dedicated channel - you can find us on #buildstream on GIMPNet and #bazel on freenode.
|
11 | 11 |
|
12 |
-The author of any patch is expected to take ownership of that code and is to support it for a reasonable
|
|
13 |
-time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced.
|
|
12 |
+The author of any patch is expected to take ownership of that code and is to support it for a reasonable time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced. More on this below in 'Granting Committer Access'.
|
|
13 |
+ |
|
14 |
+Granting Committer Access
|
|
15 |
+-------------------------
|
|
16 |
+ |
|
17 |
+We'll hand out commit access to anyone who has successfully landed a single patch to the code base. Please request this via irc or the mailing list.
|
|
18 |
+ |
|
19 |
+This of course relies on contributors being responsive and showing willingness to address problems after landing branches; provided that is the case, there should not be any problems here.
|
|
20 |
+ |
|
21 |
+What we are expecting of committers here in general is basically to
|
|
22 |
+escalate the review in cases of uncertainty:
|
|
23 |
+ |
|
24 |
+* If the patch/branch is very trivial (obvious few line changes or typos etc), and you are confident of the change, there is no need for review.
|
|
25 |
+ |
|
26 |
+* If the patch/branch is non-trivial, please obtain a review from another committer who is familiar with the area which the branch affects. An approval from someone who is not the patch author will be needed before any merge.
|
|
27 |
+ |
|
28 |
+We don't have any detailed policy for "bad actors", but will of course handle things on a case-by-case basis - commit access should not result in commit wars or be used as a tool to subvert the project when disagreements arise; such incidents (if any) would surely lead to temporary suspension of commit rights.
|
|
14 | 29 |
|
15 | 30 |
Patch Submissions
|
16 | 31 |
-----------------
|
... | ... | @@ -34,12 +34,12 @@ class ExecutionInstance(): |
34 | 34 |
self.logger = logging.getLogger(__name__)
|
35 | 35 |
self._scheduler = scheduler
|
36 | 36 |
|
37 |
- def execute(self, action_digest, skip_cache_lookup):
|
|
37 |
+ def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
38 | 38 |
""" Sends a job for execution.
|
39 | 39 |
Queues an action and creates an Operation instance to be associated with
|
40 | 40 |
this action.
|
41 | 41 |
"""
|
42 |
- job = Job(action_digest)
|
|
42 |
+ job = Job(action_digest, message_queue)
|
|
43 | 43 |
self.logger.info("Operation name: {}".format(job.name))
|
44 | 44 |
|
45 | 45 |
if not skip_cache_lookup:
|
... | ... | @@ -70,3 +70,15 @@ class ExecutionInstance(): |
70 | 70 |
def cancel_operation(self, name):
|
71 | 71 |
# TODO: Cancel leases
|
72 | 72 |
raise NotImplementedError("Cancelled operations not supported")
|
73 |
+ |
|
74 |
+ def register_message_client(self, name, queue):
|
|
75 |
+ try:
|
|
76 |
+ self._scheduler.register_client(name, queue)
|
|
77 |
+ except KeyError:
|
|
78 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
|
79 |
+ |
|
80 |
+ def unregister_message_client(self, name, queue):
|
|
81 |
+ try:
|
|
82 |
+ self._scheduler.unregister_client(name, queue)
|
|
83 |
+ except KeyError:
|
|
84 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
... | ... | @@ -22,10 +22,9 @@ ExecutionService |
22 | 22 |
Serves remote execution requests.
|
23 | 23 |
"""
|
24 | 24 |
|
25 |
-import copy
|
|
26 | 25 |
import grpc
|
27 | 26 |
import logging
|
28 |
-import time
|
|
27 |
+import queue
|
|
29 | 28 |
|
30 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
31 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
... | ... | @@ -35,21 +34,27 @@ from ._exceptions import InvalidArgumentError |
35 | 34 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
36 | 35 |
|
37 | 36 |
def __init__(self, instance):
|
38 |
- self._instance = instance
|
|
39 | 37 |
self.logger = logging.getLogger(__name__)
|
38 |
+ self._instance = instance
|
|
40 | 39 |
|
41 | 40 |
def Execute(self, request, context):
|
42 | 41 |
# Ignore request.instance_name for now
|
43 | 42 |
# Have only one instance
|
44 | 43 |
try:
|
44 |
+ message_queue = queue.Queue()
|
|
45 | 45 |
operation = self._instance.execute(request.action_digest,
|
46 |
- request.skip_cache_lookup)
|
|
46 |
+ request.skip_cache_lookup,
|
|
47 |
+ message_queue)
|
|
47 | 48 |
|
48 |
- yield from self._stream_operation_updates(operation.name)
|
|
49 |
+ remove_client = lambda : self._remove_client(operation.name, message_queue)
|
|
50 |
+ context.add_callback(remove_client)
|
|
51 |
+ |
|
52 |
+ yield from self._stream_operation_updates(message_queue,
|
|
53 |
+ operation.name)
|
|
49 | 54 |
|
50 | 55 |
except InvalidArgumentError as e:
|
51 | 56 |
self.logger.error(e)
|
52 |
- context.set_details(str(e))
|
|
57 |
+ context.set_details(sxtr(e))
|
|
53 | 58 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
54 | 59 |
|
55 | 60 |
except NotImplementedError as e:
|
... | ... | @@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
59 | 64 |
|
60 | 65 |
def WaitExecution(self, request, context):
|
61 | 66 |
try:
|
62 |
- yield from self._stream_operation_updates(request.name)
|
|
67 |
+ message_queue = queue.Queue()
|
|
68 |
+ operation_name = request.name
|
|
69 |
+ |
|
70 |
+ self._instance.register_message_client(operation_name, message_queue)
|
|
71 |
+ |
|
72 |
+ remove_client = lambda : self._remove_client(operation_name, message_queue)
|
|
73 |
+ context.add_callback(remove_client)
|
|
74 |
+ |
|
75 |
+ yield from self._stream_operation_updates(message_queue,
|
|
76 |
+ operation_name)
|
|
63 | 77 |
|
64 | 78 |
except InvalidArgumentError as e:
|
65 | 79 |
self.logger.error(e)
|
66 | 80 |
context.set_details(str(e))
|
67 | 81 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
68 | 82 |
|
69 |
- def _stream_operation_updates(self, name):
|
|
70 |
- stream_previous = None
|
|
71 |
- while True:
|
|
72 |
- stream = self._instance.get_operation(name)
|
|
73 |
- if stream != stream_previous:
|
|
74 |
- yield stream
|
|
75 |
- if stream.done == True: break
|
|
76 |
- stream_previous = copy.deepcopy(stream)
|
|
77 |
- time.sleep(1)
|
|
83 |
+ def _remove_client(self, operation_name, message_queue):
|
|
84 |
+ self._instance.unregister_message_client(operation_name, message_queue)
|
|
85 |
+ |
|
86 |
+ def _stream_operation_updates(self, message_queue, operation_name):
|
|
87 |
+ operation = message_queue.get()
|
|
88 |
+ while not operation.done:
|
|
89 |
+ yield operation
|
|
90 |
+ operation = message_queue.get()
|
|
91 |
+ yield operation
|
... | ... | @@ -51,9 +51,8 @@ class LeaseState(Enum): |
51 | 51 |
|
52 | 52 |
class Job():
|
53 | 53 |
|
54 |
- def __init__(self, action):
|
|
55 |
- self.action = action
|
|
56 |
- self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
|
|
54 |
+ def __init__(self, action_digest, message_queue=None):
|
|
55 |
+ self.action_digest = action_digest
|
|
57 | 56 |
self.execute_stage = ExecuteStage.UNKNOWN
|
58 | 57 |
self.lease = None
|
59 | 58 |
self.logger = logging.getLogger(__name__)
|
... | ... | @@ -62,10 +61,24 @@ class Job(): |
62 | 61 |
|
63 | 62 |
self._n_tries = 0
|
64 | 63 |
self._operation = operations_pb2.Operation(name = self.name)
|
64 |
+ self._operation_update_queues = []
|
|
65 |
+ |
|
66 |
+ if message_queue is not None:
|
|
67 |
+ self.register_client(message_queue)
|
|
68 |
+ |
|
69 |
+ def check_job_finished(self):
|
|
70 |
+ if not self._operation_update_queues:
|
|
71 |
+ return self._operation.done
|
|
72 |
+ return False
|
|
73 |
+ |
|
74 |
+ def register_client(self, queue):
|
|
75 |
+ self._operation_update_queues.append(queue)
|
|
76 |
+ |
|
77 |
+ def unregister_client(self, queue):
|
|
78 |
+ self._operation_update_queues.remove(queue)
|
|
65 | 79 |
|
66 | 80 |
def get_operation(self):
|
67 | 81 |
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
68 |
- |
|
69 | 82 |
if self.result is not None:
|
70 | 83 |
self._operation.done = True
|
71 | 84 |
response = ExecuteResponse()
|
... | ... | @@ -81,10 +94,10 @@ class Job(): |
81 | 94 |
return meta
|
82 | 95 |
|
83 | 96 |
def create_lease(self):
|
84 |
- action = self._pack_any(self.action)
|
|
97 |
+ action_digest = self._pack_any(self.action_digest)
|
|
85 | 98 |
|
86 | 99 |
lease = bots_pb2.Lease(id = self.name,
|
87 |
- payload = action,
|
|
100 |
+ payload = action_digest,
|
|
88 | 101 |
state = LeaseState.PENDING.value)
|
89 | 102 |
self.lease = lease
|
90 | 103 |
return lease
|
... | ... | @@ -92,6 +105,11 @@ class Job(): |
92 | 105 |
def get_operations(self):
|
93 | 106 |
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
|
94 | 107 |
|
108 |
+ def update_execute_stage(self, stage):
|
|
109 |
+ self.execute_stage = stage
|
|
110 |
+ for queue in self._operation_update_queues:
|
|
111 |
+ queue.put(self.get_operation())
|
|
112 |
+ |
|
95 | 113 |
def _pack_any(self, pack):
|
96 | 114 |
any = any_pb2.Any()
|
97 | 115 |
any.Pack(pack)
|
... | ... | @@ -35,8 +35,17 @@ class Scheduler(): |
35 | 35 |
self.jobs = {}
|
36 | 36 |
self.queue = deque()
|
37 | 37 |
|
38 |
+ def register_client(self, name, queue):
|
|
39 |
+ self.jobs[name].register_client(queue)
|
|
40 |
+ |
|
41 |
+ def unregister_client(self, name, queue):
|
|
42 |
+ job = self.jobs[name]
|
|
43 |
+ job.unregister_client(queue)
|
|
44 |
+ if job.check_job_finished():
|
|
45 |
+ del self.jobs[name]
|
|
46 |
+ |
|
38 | 47 |
def append_job(self, job):
|
39 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
48 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
40 | 49 |
self.jobs[job.name] = job
|
41 | 50 |
self.queue.append(job)
|
42 | 51 |
|
... | ... | @@ -45,9 +54,9 @@ class Scheduler(): |
45 | 54 |
|
46 | 55 |
if job.n_tries >= self.MAX_N_TRIES:
|
47 | 56 |
# TODO: Decide what to do with these jobs
|
48 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
57 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
49 | 58 |
else:
|
50 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
59 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
51 | 60 |
job.n_tries += 1
|
52 | 61 |
self.queue.appendleft(job)
|
53 | 62 |
|
... | ... | @@ -56,15 +65,14 @@ class Scheduler(): |
56 | 65 |
def create_job(self):
|
57 | 66 |
if len(self.queue) > 0:
|
58 | 67 |
job = self.queue.popleft()
|
59 |
- job.execute_stage = ExecuteStage.EXECUTING
|
|
68 |
+ job.update_execute_stage(ExecuteStage.EXECUTING)
|
|
60 | 69 |
self.jobs[job.name] = job
|
61 | 70 |
return job
|
62 |
- return None
|
|
63 | 71 |
|
64 | 72 |
def job_complete(self, name, result):
|
65 | 73 |
job = self.jobs[name]
|
66 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
67 | 74 |
job.result = result
|
75 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
68 | 76 |
self.jobs[name] = job
|
69 | 77 |
|
70 | 78 |
def get_operations(self):
|
... | ... | @@ -122,3 +130,7 @@ class Scheduler(): |
122 | 130 |
if state == LeaseState.PENDING.value or \
|
123 | 131 |
state == LeaseState.ACTIVE.value:
|
124 | 132 |
self.retry_job(name)
|
133 |
+ |
|
134 |
+ def _update_execute_stage(self, job, stage):
|
|
135 |
+ job.update_execute_stage(stage)
|
|
136 |
+ return job
|
... | ... | @@ -72,14 +72,17 @@ def test_wait_execution(instance, context): |
72 | 72 |
action_digest = remote_execution_pb2.Digest()
|
73 | 73 |
action_digest.hash = 'zhora'
|
74 | 74 |
|
75 |
- execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
|
|
76 |
- action_digest = action_digest,
|
|
77 |
- skip_cache_lookup = True)
|
|
78 |
- execution_response = next(instance.Execute(execution_request, context))
|
|
75 |
+ j = job.Job(action_digest, None)
|
|
76 |
+ j._operation.done = True
|
|
79 | 77 |
|
78 |
+ request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
|
|
80 | 79 |
|
81 |
- request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
|
|
80 |
+ instance._instance._scheduler.jobs[j.name] = j
|
|
82 | 81 |
|
83 |
- response = next(instance.WaitExecution(request, context))
|
|
82 |
+ action_result_any = any_pb2.Any()
|
|
83 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
84 |
+ action_result_any.Pack(action_result)
|
|
84 | 85 |
|
85 |
- assert response == execution_response
|
|
86 |
+ instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
|
|
87 |
+ |
|
88 |
+ response = instance.WaitExecution(request, context)
|