Martin Blanchard pushed to branch mablanch/117-job-scheduler-refactoring at BuildGrid / buildgrid
Commits:
- 6f303c53 by Martin Blanchard at 2018-10-17T14:28:07Z
5 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
Changes:
| ... | ... | @@ -48,7 +48,10 @@ class ExecutionInstance: |
| 48 | 48 |
if not action:
|
| 49 | 49 |
raise FailedPreconditionError("Could not get action from storage.")
|
| 50 | 50 |
|
| 51 |
- job = Job(action_digest, action.do_not_cache, message_queue)
|
|
| 51 |
+ job = Job(action, action_digest)
|
|
| 52 |
+ if message_queue is not None:
|
|
| 53 |
+ job.register_client(message_queue)
|
|
| 54 |
+ |
|
| 52 | 55 |
self.logger.info("Operation name: [{}]".format(job.name))
|
| 53 | 56 |
|
| 54 | 57 |
self._scheduler.queue_job(job, skip_cache_lookup)
|
| ... | ... | @@ -50,11 +50,12 @@ class LeaseState(Enum): |
| 50 | 50 |
|
| 51 | 51 |
class Job:
|
| 52 | 52 |
|
| 53 |
- def __init__(self, action_digest, do_not_cache=False, message_queue=None):
|
|
| 53 |
+ def __init__(self, action, action_digest):
|
|
| 54 | 54 |
self.logger = logging.getLogger(__name__)
|
| 55 | 55 |
|
| 56 | 56 |
self._name = str(uuid.uuid4())
|
| 57 |
- self._do_not_cache = do_not_cache
|
|
| 57 |
+ self._do_not_cache = False
|
|
| 58 |
+ self._action = remote_execution_pb2.Action()
|
|
| 58 | 59 |
self._operation = operations_pb2.Operation()
|
| 59 | 60 |
self._lease = None
|
| 60 | 61 |
self._n_tries = 0
|
| ... | ... | @@ -65,13 +66,12 @@ class Job: |
| 65 | 66 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
| 66 | 67 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
| 67 | 68 |
|
| 69 |
+ self._action.CopyFrom(action)
|
|
| 70 |
+ self._do_not_cache = self._action.do_not_cache
|
|
| 68 | 71 |
self._operation_update_queues = []
|
| 69 | 72 |
self._operation.name = self._name
|
| 70 | 73 |
self._operation.done = False
|
| 71 | 74 |
|
| 72 |
- if message_queue is not None:
|
|
| 73 |
- self.register_client(message_queue)
|
|
| 74 |
- |
|
| 75 | 75 |
@property
|
| 76 | 76 |
def name(self):
|
| 77 | 77 |
return self._name
|
| ... | ... | @@ -80,6 +80,10 @@ class Job: |
| 80 | 80 |
def do_not_cache(self):
|
| 81 | 81 |
return self._do_not_cache
|
| 82 | 82 |
|
| 83 |
+ @property
|
|
| 84 |
+ def action(self):
|
|
| 85 |
+ return self._action
|
|
| 86 |
+ |
|
| 83 | 87 |
@property
|
| 84 | 88 |
def action_digest(self):
|
| 85 | 89 |
return self.__operation_metadata.action_digest
|
| ... | ... | @@ -157,6 +161,10 @@ class Job: |
| 157 | 161 |
if self._lease.state == LeaseState.COMPLETED.value:
|
| 158 | 162 |
action_result = remote_execution_pb2.ActionResult()
|
| 159 | 163 |
|
| 164 |
+ # TODO: Make a distinction between build and bot failures!
|
|
| 165 |
+ if status.code != 0:
|
|
| 166 |
+ self._do_not_cache = True
|
|
| 167 |
+ |
|
| 160 | 168 |
if result is not None:
|
| 161 | 169 |
assert result.Is(action_result.DESCRIPTOR)
|
| 162 | 170 |
result.Unpack(action_result)
|
| ... | ... | @@ -86,9 +86,8 @@ class Scheduler: |
| 86 | 86 |
else:
|
| 87 | 87 |
job.update_lease_state(lease_state, status=lease_status, result=lease_result)
|
| 88 | 88 |
|
| 89 |
- if not job.do_not_cache and self._action_cache is not None:
|
|
| 90 |
- if not job.lease.status.code:
|
|
| 91 |
- self._action_cache.update_action_result(job.action_digest, job.action_result)
|
|
| 89 |
+ if self._action_cache is not None and not job.do_not_cache:
|
|
| 90 |
+ self._action_cache.update_action_result(job.action_digest, job.action_result)
|
|
| 92 | 91 |
|
| 93 | 92 |
job.update_operation_stage(OperationStage.COMPLETED)
|
| 94 | 93 |
|
| ... | ... | @@ -137,7 +137,7 @@ def test_update_leases_with_work(bot_session, context, instance): |
| 137 | 137 |
bot_session=bot_session)
|
| 138 | 138 |
|
| 139 | 139 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
| 140 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
| 140 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
| 141 | 141 |
|
| 142 | 142 |
response = instance.CreateBotSession(request, context)
|
| 143 | 143 |
|
| ... | ... | @@ -159,7 +159,7 @@ def test_update_leases_work_complete(bot_session, context, instance): |
| 159 | 159 |
|
| 160 | 160 |
# Inject work
|
| 161 | 161 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
| 162 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
| 162 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
| 163 | 163 |
|
| 164 | 164 |
request = bots_pb2.UpdateBotSessionRequest(name=response.name,
|
| 165 | 165 |
bot_session=response)
|
| ... | ... | @@ -188,7 +188,7 @@ def test_work_rejected_by_bot(bot_session, context, instance): |
| 188 | 188 |
bot_session=bot_session)
|
| 189 | 189 |
# Inject work
|
| 190 | 190 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
| 191 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
| 191 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
| 192 | 192 |
|
| 193 | 193 |
# Simulated the severed binding between client and server
|
| 194 | 194 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
| ... | ... | @@ -210,7 +210,7 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance): |
| 210 | 210 |
bot_session=bot_session)
|
| 211 | 211 |
# Inject work
|
| 212 | 212 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
| 213 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
| 213 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
| 214 | 214 |
|
| 215 | 215 |
# Simulated the severed binding between client and server
|
| 216 | 216 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
| ... | ... | @@ -231,7 +231,7 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance): |
| 231 | 231 |
bot_session=bot_session)
|
| 232 | 232 |
# Inject work
|
| 233 | 233 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
| 234 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
| 234 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
| 235 | 235 |
|
| 236 | 236 |
# Simulated the severed binding between client and server
|
| 237 | 237 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
| ... | ... | @@ -258,7 +258,7 @@ def test_work_active_to_active(bot_session, context, instance): |
| 258 | 258 |
bot_session=bot_session)
|
| 259 | 259 |
# Inject work
|
| 260 | 260 |
action_digest = remote_execution_pb2.Digest(hash='gaff')
|
| 261 |
- _inject_work(instance._instances[""]._scheduler, action_digest)
|
|
| 261 |
+ _inject_work(instance._instances[""]._scheduler, action_digest=action_digest)
|
|
| 262 | 262 |
|
| 263 | 263 |
# Simulated the severed binding between client and server
|
| 264 | 264 |
response = copy.deepcopy(instance.CreateBotSession(request, context))
|
| ... | ... | @@ -280,8 +280,10 @@ def test_post_bot_event_temp(context, instance): |
| 280 | 280 |
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
| 281 | 281 |
|
| 282 | 282 |
|
| 283 |
-def _inject_work(scheduler, action_digest=None):
|
|
| 283 |
+def _inject_work(scheduler, action=None, action_digest=None):
|
|
| 284 |
+ if not action:
|
|
| 285 |
+ action = remote_execution_pb2.Action()
|
|
| 284 | 286 |
if not action_digest:
|
| 285 | 287 |
action_digest = remote_execution_pb2.Digest()
|
| 286 |
- j = job.Job(action_digest, False)
|
|
| 288 |
+ j = job.Job(action, action_digest)
|
|
| 287 | 289 |
scheduler.queue_job(j, True)
|
| ... | ... | @@ -105,7 +105,7 @@ def test_no_action_digest_in_storage(instance, context): |
| 105 | 105 |
|
| 106 | 106 |
|
| 107 | 107 |
def test_wait_execution(instance, controller, context):
|
| 108 |
- j = job.Job(action_digest, None)
|
|
| 108 |
+ j = job.Job(action, action_digest)
|
|
| 109 | 109 |
j._operation.done = True
|
| 110 | 110 |
|
| 111 | 111 |
request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
|
