Martin Blanchard pushed to branch mablanch/117-job-scheduler-refactoring at BuildGrid / buildgrid
Commits:
- 6f303c53 by Martin Blanchard at 2018-10-17T14:28:07Z
5 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
Changes:
... | ... | @@ -48,7 +48,10 @@ class ExecutionInstance: |
48 | 48 |
if not action:
|
49 | 49 |
raise FailedPreconditionError("Could not get action from storage.")
|
50 | 50 |
|
51 |
- job = Job(action_digest, action.do_not_cache, message_queue)
|
|
51 |
+ job = Job(action, action_digest)
|
|
52 |
+ if message_queue is not None:
|
|
53 |
+ job.register_client(message_queue)
|
|
54 |
+ |
|
52 | 55 |
self.logger.info("Operation name: [{}]".format(job.name))
|
53 | 56 |
|
54 | 57 |
self._scheduler.queue_job(job, skip_cache_lookup)
|
... | ... | @@ -50,11 +50,12 @@ class LeaseState(Enum): |
50 | 50 |
|
51 | 51 |
class Job:
|
52 | 52 |
|
53 |
- def __init__(self, action_digest, do_not_cache=False, message_queue=None):
|
|
53 |
+ def __init__(self, action, action_digest):
|
|
54 | 54 |
self.logger = logging.getLogger(__name__)
|
55 | 55 |
|
56 | 56 |
self._name = str(uuid.uuid4())
|
57 |
- self._do_not_cache = do_not_cache
|
|
57 |
+ self._do_not_cache = False
|
|
58 |
+ self._action = remote_execution_pb2.Action()
|
|
58 | 59 |
self._operation = operations_pb2.Operation()
|
59 | 60 |
self._lease = None
|
60 | 61 |
self._n_tries = 0
|
... | ... | @@ -65,13 +66,12 @@ class Job: |
65 | 66 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
66 | 67 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
67 | 68 |
|
69 |
+ self._action.CopyFrom(action)
|
|
70 |
+ self._do_not_cache = self._action.do_not_cache
|
|
68 | 71 |
self._operation_update_queues = []
|
69 | 72 |
self._operation.name = self._name
|
70 | 73 |
self._operation.done = False
|
71 | 74 |
|
72 |
- if message_queue is not None:
|
|
73 |
- self.register_client(message_queue)
|
|
74 |
- |
|
75 | 75 |
@property
|
76 | 76 |
def name(self):
|
77 | 77 |
return self._name
|
... | ... | @@ -80,6 +80,10 @@ class Job: |
80 | 80 |
def do_not_cache(self):
|
81 | 81 |
return self._do_not_cache
|
82 | 82 |
|
83 |
+ @property
|
|
84 |
+ def action(self):
|
|
85 |
+ return self._action
|
|
86 |
+ |
|
83 | 87 |
@property
|
84 | 88 |
def action_digest(self):
|
85 | 89 |
return self.__operation_metadata.action_digest
|
... | ... | @@ -157,6 +161,10 @@ class Job: |
157 | 161 |
if self._lease.state == LeaseState.COMPLETED.value:
|
158 | 162 |
action_result = remote_execution_pb2.ActionResult()
|
159 | 163 |
|
164 |
+ # TODO: Make a distinction between build and bot failures!
|
|
165 |
+ if status.code != 0:
|
|
166 |
+ self._do_not_cache = True
|
|
167 |
+ |
|
160 | 168 |
if result is not None:
|
161 | 169 |
assert result.Is(action_result.DESCRIPTOR)
|
162 | 170 |
result.Unpack(action_result)
|
... | ... | @@ -86,9 +86,8 @@ class Scheduler: |
86 | 86 |
else:
|
87 | 87 |
job.update_lease_state(lease_state, status=lease_status, result=lease_result)
|
88 | 88 |
|
89 |
- if not job.do_not_cache and self._action_cache is not None:
|
|
90 |
- if not job.lease.status.code:
|
|
91 |
- self._action_cache.update_action_result(job.action_digest, job.action_result)
|
|
89 |
+ if self._action_cache is not None and not job.do_not_cache:
|
|
90 |
+ self._action_cache.update_action_result(job.action_digest, job.action_result)
|
|
92 | 91 |
|
93 | 92 |
job.update_operation_stage(OperationStage.COMPLETED)
|
94 | 93 |
|
... | ... | @@ -137,7 +137,7 @@ def test_update_leases_with_work(bot_session, context, instance): |
137 | 137 |
bot_session=bot_session)
|
138 | 138 |
|
139 | 139 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
140 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
140 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
141 | 141 |
|
142 | 142 |
response = instance.CreateBotSession(request, context)
|
143 | 143 |
|
... | ... | @@ -159,7 +159,7 @@ def test_update_leases_work_complete(bot_session, context, instance): |
159 | 159 |
|
160 | 160 |
# Inject work
|
161 | 161 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
162 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
162 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
163 | 163 |
|
164 | 164 |
request = bots_pb2.UpdateBotSessionRequest(name=response.name,
|
165 | 165 |
bot_session=response)
|
... | ... | @@ -188,7 +188,7 @@ def test_work_rejected_by_bot(bot_session, context, instance): |
188 | 188 |
bot_session=bot_session)
|
189 | 189 |
# Inject work
|
190 | 190 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
191 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
191 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
192 | 192 |
|
193 | 193 |
# Simulated the severed binding between client and server
|
194 | 194 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -210,7 +210,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance): |
210 | 210 |
bot_session=bot_session)
|
211 | 211 |
# Inject work
|
212 | 212 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
213 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
213 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
214 | 214 |
|
215 | 215 |
# Simulated the severed binding between client and server
|
216 | 216 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -231,7 +231,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance): |
231 | 231 |
bot_session=bot_session)
|
232 | 232 |
# Inject work
|
233 | 233 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
234 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
234 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
235 | 235 |
|
236 | 236 |
# Simulated the severed binding between client and server
|
237 | 237 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -258,7 +258,7 @@ def test_work_active_to_active(bot_session, context, instance): |
258 | 258 |
bot_session=bot_session)
|
259 | 259 |
# Inject work
|
260 | 260 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
261 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
261 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
262 | 262 |
|
263 | 263 |
# Simulated the severed binding between client and server
|
264 | 264 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
... | ... | @@ -280,8 +280,10 @@ def test_post_bot_event_temp(context, instance): |
280 | 280 |
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
281 | 281 |
|
282 | 282 |
|
283 |
-def _inject_work(scheduler, action_digest=None):
|
|
283 |
+def _inject_work(scheduler, action=None, action_digest=None):
|
|
284 |
+ if not action:
|
|
285 |
+ action = remote_execution_pb2.Action()
|
|
284 | 286 |
if not action_digest:
|
285 | 287 |
action_digest = remote_execution_pb2.Digest()
|
286 |
- j = job.Job(action_digest, False)
|
|
288 |
+ j = job.Job(action, action_digest)
|
|
287 | 289 |
scheduler.queue_job(j, True)
|
... | ... | @@ -105,7 +105,7 @@ def test_no_action_digest_in_storage(instance, context): |
105 | 105 |
|
106 | 106 |
|
107 | 107 |
def test_wait_execution(instance, controller, context):
|
108 |
- j = job.Job(action_digest, None)
|
|
108 |
+ j = job.Job(action, action_digest)
|
|
109 | 109 |
j._operation.done = True
|
110 | 110 |
|
111 | 111 |
request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
|