Martin Blanchard pushed to branch mablanch/149-leases-cancellation-status at BuildGrid / buildgrid
Commits:
-
251d3951
by Martin Blanchard at 2019-01-07T15:26:18Z
-
c0795f6d
by Martin Blanchard at 2019-01-07T15:26:39Z
-
ae0a9e47
by Martin Blanchard at 2019-01-07T15:26:43Z
-
00c2e4e6
by Martin Blanchard at 2019-01-07T15:26:46Z
3 changed files:
Changes:
| ... | ... | @@ -83,7 +83,7 @@ class BotsInterface: |
| 83 | 83 |
self._check_bot_ids(bot_session.bot_id, name)
|
| 84 | 84 |
self._check_assigned_leases(bot_session)
|
| 85 | 85 |
|
| 86 |
- for lease in bot_session.leases:
|
|
| 86 |
+ for lease in list(bot_session.leases):
|
|
| 87 | 87 |
checked_lease = self._check_lease_state(lease)
|
| 88 | 88 |
if not checked_lease:
|
| 89 | 89 |
# TODO: Make sure we don't need this
|
| ... | ... | @@ -91,7 +91,10 @@ class BotsInterface: |
| 91 | 91 |
self._assigned_leases[name].remove(lease.id)
|
| 92 | 92 |
except KeyError:
|
| 93 | 93 |
pass
|
| 94 |
- lease.Clear()
|
|
| 94 |
+ |
|
| 95 |
+ self._scheduler.delete_job_lease(lease.id)
|
|
| 96 |
+ |
|
| 97 |
+ bot_session.leases.remove(lease)
|
|
| 95 | 98 |
|
| 96 | 99 |
self._request_leases(bot_session)
|
| 97 | 100 |
return bot_session
|
| ... | ... | @@ -117,7 +120,7 @@ class BotsInterface: |
| 117 | 120 |
|
| 118 | 121 |
try:
|
| 119 | 122 |
if self._scheduler.get_job_lease_cancelled(lease.id):
|
| 120 |
- lease.state.CopyFrom(LeaseState.CANCELLED.value)
|
|
| 123 |
+ lease.state = LeaseState.CANCELLED.value
|
|
| 121 | 124 |
return lease
|
| 122 | 125 |
except KeyError:
|
| 123 | 126 |
# Job does not exist, remove from bot.
|
| ... | ... | @@ -222,6 +222,13 @@ class Job: |
| 222 | 222 |
if self._lease is not None:
|
| 223 | 223 |
self.update_lease_state(LeaseState.CANCELLED)
|
| 224 | 224 |
|
| 225 |
+ def delete_lease(self):
|
|
| 226 |
+ """Discard the job's :class:Lease."""
|
|
| 227 |
+ self.__worker_start_timestamp.Clear()
|
|
| 228 |
+ self.__worker_completed_timestamp.Clear()
|
|
| 229 |
+ |
|
| 230 |
+ self._lease = None
|
|
| 231 |
+ |
|
| 225 | 232 |
def update_operation_stage(self, stage):
|
| 226 | 233 |
"""Operates a stage transition for the job's :class:Operation.
|
| 227 | 234 |
|
| ... | ... | @@ -62,18 +62,8 @@ class Scheduler: |
| 62 | 62 |
|
| 63 | 63 |
job.unregister_client(queue)
|
| 64 | 64 |
|
| 65 |
- if not job.n_clients and job.operation.done:
|
|
| 66 |
- del self.jobs[job_name]
|
|
| 67 |
- |
|
| 68 |
- if self._is_instrumented:
|
|
| 69 |
- self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
|
|
| 70 |
- self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
|
|
| 71 |
- self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
|
|
| 72 |
- self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
|
|
| 73 |
- |
|
| 74 |
- self.__leases_by_state[LeaseState.PENDING].discard(job_name)
|
|
| 75 |
- self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
|
|
| 76 |
- self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
|
|
| 65 |
+ if not job.n_clients and job.operation.done and not job.lease:
|
|
| 66 |
+ self._delete_job(job.name)
|
|
| 77 | 67 |
|
| 78 | 68 |
def queue_job(self, job, skip_cache_lookup=False):
|
| 79 | 69 |
self.jobs[job.name] = job
|
| ... | ... | @@ -199,6 +189,15 @@ class Scheduler: |
| 199 | 189 |
"""Returns true if the lease is cancelled"""
|
| 200 | 190 |
return self.jobs[job_name].lease_cancelled
|
| 201 | 191 |
|
| 192 |
+ def delete_job_lease(self, job_name):
|
|
| 193 |
+ """Discards the lease associated to a job."""
|
|
| 194 |
+ job = self.jobs[job_name]
|
|
| 195 |
+ |
|
| 196 |
+ self.jobs[job.name].delete_lease()
|
|
| 197 |
+ |
|
| 198 |
+ if not job.n_clients and job.operation.done:
|
|
| 199 |
+ self._delete_job(job.name)
|
|
| 200 |
+ |
|
| 202 | 201 |
def get_job_operation(self, job_name):
|
| 203 | 202 |
"""Returns the operation associated to job."""
|
| 204 | 203 |
return self.jobs[job_name].operation
|
| ... | ... | @@ -296,6 +295,20 @@ class Scheduler: |
| 296 | 295 |
|
| 297 | 296 |
# --- Private API ---
|
| 298 | 297 |
|
| 298 |
+ def _delete_job(self, job_name):
|
|
| 299 |
+ """Drops an entry from the internal list of jobs."""
|
|
| 300 |
+ del self.jobs[job_name]
|
|
| 301 |
+ |
|
| 302 |
+ if self._is_instrumented:
|
|
| 303 |
+ self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
|
|
| 304 |
+ self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
|
|
| 305 |
+ self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
|
|
| 306 |
+ self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
|
|
| 307 |
+ |
|
| 308 |
+ self.__leases_by_state[LeaseState.PENDING].discard(job_name)
|
|
| 309 |
+ self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
|
|
| 310 |
+ self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
|
|
| 311 |
+ |
|
| 299 | 312 |
def _update_job_operation_stage(self, job_name, operation_stage):
|
| 300 | 313 |
"""Requests a stage transition for the job's :class:Operations.
|
| 301 | 314 |
|
