Tristan Van Berkom pushed to branch tristan/one-cache-size-job at BuildStream / buildstream
Commits:
-
5de42d43
by Tristan Van Berkom at 2019-01-07T18:00:37Z
-
059035b9
by Tristan Van Berkom at 2019-01-07T18:02:00Z
-
b83d1b1f
by Tristan Van Berkom at 2019-01-07T18:02:00Z
-
16a8816f
by Tristan Van Berkom at 2019-01-07T18:02:00Z
-
c2fc2a5e
by Tristan Van Berkom at 2019-01-07T18:02:00Z
13 changed files:
- buildstream/_frontend/app.py
- buildstream/_scheduler/__init__.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/queues/trackqueue.py
- buildstream/_scheduler/scheduler.py
Changes:
... | ... | @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages |
38 | 38 |
from .._stream import Stream
|
39 | 39 |
from .._versions import BST_FORMAT_VERSION
|
40 | 40 |
from .. import _yaml
|
41 |
-from .._scheduler import ElementJob
|
|
41 |
+from .._scheduler import ElementJob, JobStatus
|
|
42 | 42 |
|
43 | 43 |
# Import frontend assets
|
44 | 44 |
from . import Profile, LogLine, Status
|
... | ... | @@ -515,13 +515,13 @@ class App(): |
515 | 515 |
self._status.add_job(job)
|
516 | 516 |
self._maybe_render_status()
|
517 | 517 |
|
518 |
- def _job_completed(self, job, success):
|
|
518 |
+ def _job_completed(self, job, status):
|
|
519 | 519 |
self._status.remove_job(job)
|
520 | 520 |
self._maybe_render_status()
|
521 | 521 |
|
522 | 522 |
# Dont attempt to handle a failure if the user has already opted to
|
523 | 523 |
# terminate
|
524 |
- if not success and not self.stream.terminated:
|
|
524 |
+ if status == JobStatus.FAIL and not self.stream.terminated:
|
|
525 | 525 |
|
526 | 526 |
if isinstance(job, ElementJob):
|
527 | 527 |
element = job.element
|
... | ... | @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue |
26 | 26 |
from .queues.pullqueue import PullQueue
|
27 | 27 |
|
28 | 28 |
from .scheduler import Scheduler, SchedStatus
|
29 |
-from .jobs import ElementJob
|
|
29 |
+from .jobs import ElementJob, JobStatus
|
... | ... | @@ -20,3 +20,4 @@ |
20 | 20 |
from .elementjob import ElementJob
|
21 | 21 |
from .cachesizejob import CacheSizeJob
|
22 | 22 |
from .cleanupjob import CleanupJob
|
23 |
+from .job import JobStatus
|
... | ... | @@ -16,7 +16,7 @@ |
16 | 16 |
# Author:
|
17 | 17 |
# Tristan Daniël Maat <tristan maat codethink co uk>
|
18 | 18 |
#
|
19 |
-from .job import Job
|
|
19 |
+from .job import Job, JobStatus
|
|
20 | 20 |
|
21 | 21 |
|
22 | 22 |
class CacheSizeJob(Job):
|
... | ... | @@ -30,8 +30,8 @@ class CacheSizeJob(Job): |
30 | 30 |
def child_process(self):
|
31 | 31 |
return self._artifacts.compute_cache_size()
|
32 | 32 |
|
33 |
- def parent_complete(self, success, result):
|
|
34 |
- if success:
|
|
33 |
+ def parent_complete(self, status, result):
|
|
34 |
+ if status == JobStatus.OK:
|
|
35 | 35 |
self._artifacts.set_cache_size(result)
|
36 | 36 |
|
37 | 37 |
if self._complete_cb:
|
... | ... | @@ -16,7 +16,7 @@ |
16 | 16 |
# Author:
|
17 | 17 |
# Tristan Daniël Maat <tristan maat codethink co uk>
|
18 | 18 |
#
|
19 |
-from .job import Job
|
|
19 |
+from .job import Job, JobStatus
|
|
20 | 20 |
|
21 | 21 |
|
22 | 22 |
class CleanupJob(Job):
|
... | ... | @@ -29,6 +29,6 @@ class CleanupJob(Job): |
29 | 29 |
def child_process(self):
|
30 | 30 |
return self._artifacts.clean()
|
31 | 31 |
|
32 |
- def parent_complete(self, success, result):
|
|
33 |
- if success:
|
|
32 |
+ def parent_complete(self, status, result):
|
|
33 |
+ if status == JobStatus.OK:
|
|
34 | 34 |
self._artifacts.set_cache_size(result)
|
... | ... | @@ -60,7 +60,7 @@ from .job import Job |
60 | 60 |
# Args:
|
61 | 61 |
# job (Job): The job object which completed
|
62 | 62 |
# element (Element): The element passed to the Job() constructor
|
63 |
-# success (bool): True if the action_cb did not raise an exception
|
|
63 |
+# status (JobStatus): The job completion status (OK, FAIL or SKIPPED)
|
|
64 | 64 |
# result (object): The deserialized object returned by the `action_cb`, or None
|
65 | 65 |
# if `success` is False
|
66 | 66 |
#
|
... | ... | @@ -93,8 +93,8 @@ class ElementJob(Job): |
93 | 93 |
# Run the action
|
94 | 94 |
return self._action_cb(self._element)
|
95 | 95 |
|
96 |
- def parent_complete(self, success, result):
|
|
97 |
- self._complete_cb(self, self._element, success, self._result)
|
|
96 |
+ def parent_complete(self, status, result):
|
|
97 |
+ self._complete_cb(self, self._element, status, self._result)
|
|
98 | 98 |
|
99 | 99 |
def message(self, message_type, message, **kwargs):
|
100 | 100 |
args = dict(kwargs)
|
... | ... | @@ -41,6 +41,22 @@ RC_PERM_FAIL = 2 |
41 | 41 |
RC_SKIPPED = 3
|
42 | 42 |
|
43 | 43 |
|
44 |
+# JobStatus:
|
|
45 |
+#
|
|
46 |
+# The job completion status, passed back through the
|
|
47 |
+# complete callbacks.
|
|
48 |
+#
|
|
49 |
+class JobStatus():
|
|
50 |
+ # Job succeeded
|
|
51 |
+ OK = 0
|
|
52 |
+ |
|
53 |
+ # The job failed (a temporary or permanent error was raised)
|
|
54 |
+ FAIL = 1
|
|
55 |
+ |
|
56 |
+ # A SkipJob was raised
|
|
57 |
+ SKIPPED = 3
|
|
58 |
+ |
|
59 |
+ |
|
44 | 60 |
# Used to distinguish between status messages and return values
|
45 | 61 |
class Envelope():
|
46 | 62 |
def __init__(self, message_type, message):
|
... | ... | @@ -116,7 +132,6 @@ class Job(): |
116 | 132 |
self._max_retries = max_retries # Maximum number of automatic retries
|
117 | 133 |
self._result = None # Return value of child action in the parent
|
118 | 134 |
self._tries = 0 # Try count, for retryable jobs
|
119 |
- self._skipped_flag = False # Indicate whether the job was skipped.
|
|
120 | 135 |
self._terminated = False # Whether this job has been explicitly terminated
|
121 | 136 |
|
122 | 137 |
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
|
... | ... | @@ -273,18 +288,6 @@ class Job(): |
273 | 288 |
def set_task_id(self, task_id):
|
274 | 289 |
self._task_id = task_id
|
275 | 290 |
|
276 |
- # skipped
|
|
277 |
- #
|
|
278 |
- # This will evaluate to True if the job was skipped
|
|
279 |
- # during processing, or if it was forcefully terminated.
|
|
280 |
- #
|
|
281 |
- # Returns:
|
|
282 |
- # (bool): Whether the job should appear as skipped
|
|
283 |
- #
|
|
284 |
- @property
|
|
285 |
- def skipped(self):
|
|
286 |
- return self._skipped_flag or self._terminated
|
|
287 |
- |
|
288 | 291 |
#######################################################
|
289 | 292 |
# Abstract Methods #
|
290 | 293 |
#######################################################
|
... | ... | @@ -295,10 +298,10 @@ class Job(): |
295 | 298 |
# pass the result to the main thread.
|
296 | 299 |
#
|
297 | 300 |
# Args:
|
298 |
- # success (bool): Whether the job was successful.
|
|
301 |
+ # status (JobStatus): The job exit status
|
|
299 | 302 |
# result (any): The result returned by child_process().
|
300 | 303 |
#
|
301 |
- def parent_complete(self, success, result):
|
|
304 |
+ def parent_complete(self, status, result):
|
|
302 | 305 |
raise ImplError("Job '{kind}' does not implement parent_complete()"
|
303 | 306 |
.format(kind=type(self).__name__))
|
304 | 307 |
|
... | ... | @@ -562,16 +565,23 @@ class Job(): |
562 | 565 |
#
|
563 | 566 |
self._retry_flag = returncode == RC_FAIL
|
564 | 567 |
|
565 |
- # Set the flag to alert Queue that this job skipped.
|
|
566 |
- self._skipped_flag = returncode == RC_SKIPPED
|
|
567 |
- |
|
568 | 568 |
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
|
569 | 569 |
self.spawn()
|
570 | 570 |
return
|
571 | 571 |
|
572 |
- success = returncode in (RC_OK, RC_SKIPPED)
|
|
573 |
- self.parent_complete(success, self._result)
|
|
574 |
- self._scheduler.job_completed(self, success)
|
|
572 |
+ # Resolve the outward facing overall job completion status
|
|
573 |
+ #
|
|
574 |
+ if returncode == RC_OK:
|
|
575 |
+ status = JobStatus.OK
|
|
576 |
+ elif returncode == RC_SKIPPED:
|
|
577 |
+ status = JobStatus.SKIPPED
|
|
578 |
+ elif returncode in (RC_FAIL, RC_PERM_FAIL):
|
|
579 |
+ status = JobStatus.FAIL
|
|
580 |
+ else:
|
|
581 |
+ status = JobStatus.FAIL
|
|
582 |
+ |
|
583 |
+ self.parent_complete(status, self._result)
|
|
584 |
+ self._scheduler.job_completed(self, status)
|
|
575 | 585 |
|
576 | 586 |
# Force the deletion of the queue and process objects to try and clean up FDs
|
577 | 587 |
self._queue = self._process = None
|
... | ... | @@ -21,7 +21,7 @@ |
21 | 21 |
from datetime import timedelta
|
22 | 22 |
|
23 | 23 |
from . import Queue, QueueStatus
|
24 |
-from ..jobs import ElementJob
|
|
24 |
+from ..jobs import ElementJob, JobStatus
|
|
25 | 25 |
from ..resources import ResourceType
|
26 | 26 |
from ..._message import MessageType
|
27 | 27 |
|
... | ... | @@ -104,7 +104,7 @@ class BuildQueue(Queue): |
104 | 104 |
if artifacts.has_quota_exceeded():
|
105 | 105 |
self._scheduler.check_cache_size()
|
106 | 106 |
|
107 |
- def done(self, job, element, result, success):
|
|
107 |
+ def done(self, job, element, result, status):
|
|
108 | 108 |
|
109 | 109 |
# Inform element in main process that assembly is done
|
110 | 110 |
element._assemble_done()
|
... | ... | @@ -117,5 +117,5 @@ class BuildQueue(Queue): |
117 | 117 |
# artifact cache size for a successful build even though we know a
|
118 | 118 |
# failed build also grows the artifact cache size.
|
119 | 119 |
#
|
120 |
- if success:
|
|
120 |
+ if status == JobStatus.OK:
|
|
121 | 121 |
self._check_cache_size(job, element, result)
|
... | ... | @@ -24,6 +24,7 @@ from ... import Consistency |
24 | 24 |
# Local imports
|
25 | 25 |
from . import Queue, QueueStatus
|
26 | 26 |
from ..resources import ResourceType
|
27 |
+from ..jobs import JobStatus
|
|
27 | 28 |
|
28 | 29 |
|
29 | 30 |
# A queue which fetches element sources
|
... | ... | @@ -66,9 +67,9 @@ class FetchQueue(Queue): |
66 | 67 |
|
67 | 68 |
return QueueStatus.READY
|
68 | 69 |
|
69 |
- def done(self, _, element, result, success):
|
|
70 |
+ def done(self, _, element, result, status):
|
|
70 | 71 |
|
71 |
- if not success:
|
|
72 |
+ if status == JobStatus.FAIL:
|
|
72 | 73 |
return
|
73 | 74 |
|
74 | 75 |
element._update_state()
|
... | ... | @@ -21,6 +21,7 @@ |
21 | 21 |
# Local imports
|
22 | 22 |
from . import Queue, QueueStatus
|
23 | 23 |
from ..resources import ResourceType
|
24 |
+from ..jobs import JobStatus
|
|
24 | 25 |
from ..._exceptions import SkipJob
|
25 | 26 |
|
26 | 27 |
|
... | ... | @@ -54,9 +55,9 @@ class PullQueue(Queue): |
54 | 55 |
else:
|
55 | 56 |
return QueueStatus.SKIP
|
56 | 57 |
|
57 |
- def done(self, _, element, result, success):
|
|
58 |
+ def done(self, _, element, result, status):
|
|
58 | 59 |
|
59 |
- if not success:
|
|
60 |
+ if status == JobStatus.FAIL:
|
|
60 | 61 |
return
|
61 | 62 |
|
62 | 63 |
element._pull_done()
|
... | ... | @@ -64,4 +65,5 @@ class PullQueue(Queue): |
64 | 65 |
# Build jobs will check the "approximate" size first. Since we
|
65 | 66 |
# do not get an artifact size from pull jobs, we have to
|
66 | 67 |
# actually check the cache size.
|
67 |
- self._scheduler.check_cache_size()
|
|
68 |
+ if status == JobStatus.OK:
|
|
69 |
+ self._scheduler.check_cache_size()
|
... | ... | @@ -25,7 +25,7 @@ from enum import Enum |
25 | 25 |
import traceback
|
26 | 26 |
|
27 | 27 |
# Local imports
|
28 |
-from ..jobs import ElementJob
|
|
28 |
+from ..jobs import ElementJob, JobStatus
|
|
29 | 29 |
from ..resources import ResourceType
|
30 | 30 |
|
31 | 31 |
# BuildStream toplevel imports
|
... | ... | @@ -133,10 +133,9 @@ class Queue(): |
133 | 133 |
# job (Job): The job which completed processing
|
134 | 134 |
# element (Element): The element which completed processing
|
135 | 135 |
# result (any): The return value of the process() implementation
|
136 |
- # success (bool): True if the process() implementation did not
|
|
137 |
- # raise any exception
|
|
136 |
+ # status (JobStatus): The return status of the Job
|
|
138 | 137 |
#
|
139 |
- def done(self, job, element, result, success):
|
|
138 |
+ def done(self, job, element, result, status):
|
|
140 | 139 |
pass
|
141 | 140 |
|
142 | 141 |
#####################################################
|
... | ... | @@ -291,7 +290,7 @@ class Queue(): |
291 | 290 |
#
|
292 | 291 |
# See the Job object for an explanation of the call signature
|
293 | 292 |
#
|
294 |
- def _job_done(self, job, element, success, result):
|
|
293 |
+ def _job_done(self, job, element, status, result):
|
|
295 | 294 |
|
296 | 295 |
# Update values that need to be synchronized in the main task
|
297 | 296 |
# before calling any queue implementation
|
... | ... | @@ -301,7 +300,7 @@ class Queue(): |
301 | 300 |
# and determine if it should be considered as processed
|
302 | 301 |
# or skipped.
|
303 | 302 |
try:
|
304 |
- self.done(job, element, result, success)
|
|
303 |
+ self.done(job, element, result, status)
|
|
305 | 304 |
except BstError as e:
|
306 | 305 |
|
307 | 306 |
# Report error and mark as failed
|
... | ... | @@ -332,12 +331,10 @@ class Queue(): |
332 | 331 |
# All jobs get placed on the done queue for later processing.
|
333 | 332 |
self._done_queue.append(job)
|
334 | 333 |
|
335 |
- # A Job can be skipped whether or not it has failed,
|
|
336 |
- # we want to only bookkeep them as processed or failed
|
|
337 |
- # if they are not skipped.
|
|
338 |
- if job.skipped:
|
|
334 |
+ # These lists are for bookkeeping purposes for the UI and logging.
|
|
335 |
+ if status == JobStatus.SKIPPED:
|
|
339 | 336 |
self.skipped_elements.append(element)
|
340 |
- elif success:
|
|
337 |
+ elif status == JobStatus.OK:
|
|
341 | 338 |
self.processed_elements.append(element)
|
342 | 339 |
else:
|
343 | 340 |
self.failed_elements.append(element)
|
... | ... | @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup |
24 | 24 |
# Local imports
|
25 | 25 |
from . import Queue, QueueStatus
|
26 | 26 |
from ..resources import ResourceType
|
27 |
+from ..jobs import JobStatus
|
|
27 | 28 |
|
28 | 29 |
|
29 | 30 |
# A queue which tracks sources
|
... | ... | @@ -47,9 +48,9 @@ class TrackQueue(Queue): |
47 | 48 |
|
48 | 49 |
return QueueStatus.READY
|
49 | 50 |
|
50 |
- def done(self, _, element, result, success):
|
|
51 |
+ def done(self, _, element, result, status):
|
|
51 | 52 |
|
52 |
- if not success:
|
|
53 |
+ if status == JobStatus.FAIL:
|
|
53 | 54 |
return
|
54 | 55 |
|
55 | 56 |
# Set the new refs in the main process one by one as they complete
|
... | ... | @@ -38,6 +38,16 @@ class SchedStatus(): |
38 | 38 |
TERMINATED = 1
|
39 | 39 |
|
40 | 40 |
|
41 |
+# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
|
|
42 |
+# which we launch dynamically. They have the property of being
|
|
43 |
+# meaningless to queue if one is already queued, and it also
|
|
44 |
+# doesn't make sense to run them in parallel.
|
|
45 |
+#
|
|
46 |
+_ACTION_NAME_CLEANUP = 'cleanup'
|
|
47 |
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
|
|
48 |
+_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
|
|
49 |
+ |
|
50 |
+ |
|
41 | 51 |
# Scheduler()
|
42 | 52 |
#
|
43 | 53 |
# The scheduler operates on a list queues, each of which is meant to accomplish
|
... | ... | @@ -94,6 +104,15 @@ class Scheduler(): |
94 | 104 |
self._suspendtime = None
|
95 | 105 |
self._queue_jobs = True # Whether we should continue to queue jobs
|
96 | 106 |
|
107 |
+ # Whether our exclusive jobs, like 'cleanup', are currently already
|
|
108 |
+ # waiting or active.
|
|
109 |
+ #
|
|
110 |
+ # This is just a bit quicker than scanning the wait queue and active
|
|
111 |
+ # queue and comparing job action names.
|
|
112 |
+ #
|
|
113 |
+ self._exclusive_waiting = set()
|
|
114 |
+ self._exclusive_active = set()
|
|
115 |
+ |
|
97 | 116 |
self._resources = Resources(context.sched_builders,
|
98 | 117 |
context.sched_fetchers,
|
99 | 118 |
context.sched_pushers)
|
... | ... | @@ -211,19 +230,6 @@ class Scheduler(): |
211 | 230 |
starttime = timenow
|
212 | 231 |
return timenow - starttime
|
213 | 232 |
|
214 |
- # schedule_jobs()
|
|
215 |
- #
|
|
216 |
- # Args:
|
|
217 |
- # jobs ([Job]): A list of jobs to schedule
|
|
218 |
- #
|
|
219 |
- # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
|
|
220 |
- # run as soon any other queueing jobs finish, provided sufficient
|
|
221 |
- # resources are available for them to run
|
|
222 |
- #
|
|
223 |
- def schedule_jobs(self, jobs):
|
|
224 |
- for job in jobs:
|
|
225 |
- self.waiting_jobs.append(job)
|
|
226 |
- |
|
227 | 233 |
# job_completed():
|
228 | 234 |
#
|
229 | 235 |
# Called when a Job completes
|
... | ... | @@ -231,12 +237,14 @@ class Scheduler(): |
231 | 237 |
# Args:
|
232 | 238 |
# queue (Queue): The Queue holding a complete job
|
233 | 239 |
# job (Job): The completed Job
|
234 |
- # success (bool): Whether the Job completed with a success status
|
|
240 |
+ # status (JobStatus): The status of the completed job
|
|
235 | 241 |
#
|
236 |
- def job_completed(self, job, success):
|
|
242 |
+ def job_completed(self, job, status):
|
|
237 | 243 |
self._resources.clear_job_resources(job)
|
238 | 244 |
self.active_jobs.remove(job)
|
239 |
- self._job_complete_callback(job, success)
|
|
245 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
246 |
+ self._exclusive_active.remove(job.action_name)
|
|
247 |
+ self._job_complete_callback(job, status)
|
|
240 | 248 |
self._schedule_queue_jobs()
|
241 | 249 |
self._sched()
|
242 | 250 |
|
... | ... | @@ -246,18 +254,13 @@ class Scheduler(): |
246 | 254 |
# size is calculated, a cleanup job will be run automatically
|
247 | 255 |
# if needed.
|
248 | 256 |
#
|
249 |
- # FIXME: This should ensure that only one cache size job
|
|
250 |
- # is ever pending at a given time. If a cache size
|
|
251 |
- # job is already running, it is correct to queue
|
|
252 |
- # a new one, it is incorrect to have more than one
|
|
253 |
- # of these jobs pending at a given time, though.
|
|
254 |
- #
|
|
255 | 257 |
def check_cache_size(self):
|
256 |
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
|
|
258 |
+ job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
|
259 |
+ 'cache_size/cache_size',
|
|
257 | 260 |
resources=[ResourceType.CACHE,
|
258 | 261 |
ResourceType.PROCESS],
|
259 | 262 |
complete_cb=self._run_cleanup)
|
260 |
- self.schedule_jobs([job])
|
|
263 |
+ self._schedule_jobs([job])
|
|
261 | 264 |
|
262 | 265 |
#######################################################
|
263 | 266 |
# Local Private Methods #
|
... | ... | @@ -276,10 +279,19 @@ class Scheduler(): |
276 | 279 |
if not self._resources.reserve_job_resources(job):
|
277 | 280 |
continue
|
278 | 281 |
|
282 |
+ # Postpone these jobs if one is already running
|
|
283 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
|
|
284 |
+ job.action_name in self._exclusive_active:
|
|
285 |
+ continue
|
|
286 |
+ |
|
279 | 287 |
job.spawn()
|
280 | 288 |
self.waiting_jobs.remove(job)
|
281 | 289 |
self.active_jobs.append(job)
|
282 | 290 |
|
291 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
292 |
+ self._exclusive_waiting.remove(job.action_name)
|
|
293 |
+ self._exclusive_active.add(job.action_name)
|
|
294 |
+ |
|
283 | 295 |
if self._job_start_callback:
|
284 | 296 |
self._job_start_callback(job)
|
285 | 297 |
|
... | ... | @@ -287,6 +299,33 @@ class Scheduler(): |
287 | 299 |
if not self.active_jobs and not self.waiting_jobs:
|
288 | 300 |
self.loop.stop()
|
289 | 301 |
|
302 |
+ # _schedule_jobs()
|
|
303 |
+ #
|
|
304 |
+ # The main entry point for jobs to be scheduled.
|
|
305 |
+ #
|
|
306 |
+ # This is called either as a result of scanning the queues
|
|
307 |
+ # in _schedule_queue_jobs(), or directly by the Scheduler
|
|
308 |
+ # to insert special jobs like cleanups.
|
|
309 |
+ #
|
|
310 |
+ # Args:
|
|
311 |
+ # jobs ([Job]): A list of jobs to schedule
|
|
312 |
+ #
|
|
313 |
+ def _schedule_jobs(self, jobs):
|
|
314 |
+ for job in jobs:
|
|
315 |
+ |
|
316 |
+ # Special treatment of our redundant exclusive jobs
|
|
317 |
+ #
|
|
318 |
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
|
319 |
+ |
|
320 |
+ # Drop the job if one is already queued
|
|
321 |
+ if job.action_name in self._exclusive_waiting:
|
|
322 |
+ continue
|
|
323 |
+ |
|
324 |
+ # Mark this action type as queued
|
|
325 |
+ self._exclusive_waiting.add(job.action_name)
|
|
326 |
+ |
|
327 |
+ self.waiting_jobs.append(job)
|
|
328 |
+ |
|
290 | 329 |
# _schedule_queue_jobs()
|
291 | 330 |
#
|
292 | 331 |
# Ask the queues what jobs they want to schedule and schedule
|
... | ... | @@ -331,7 +370,7 @@ class Scheduler(): |
331 | 370 |
# the next queue and process them.
|
332 | 371 |
process_queues = any(q.dequeue_ready() for q in self.queues)
|
333 | 372 |
|
334 |
- self.schedule_jobs(ready)
|
|
373 |
+ self._schedule_jobs(ready)
|
|
335 | 374 |
self._sched()
|
336 | 375 |
|
337 | 376 |
# _run_cleanup()
|
... | ... | @@ -353,11 +392,11 @@ class Scheduler(): |
353 | 392 |
if not artifacts.has_quota_exceeded():
|
354 | 393 |
return
|
355 | 394 |
|
356 |
- job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
|
395 |
+ job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
|
|
357 | 396 |
resources=[ResourceType.CACHE,
|
358 | 397 |
ResourceType.PROCESS],
|
359 | 398 |
exclusive_resources=[ResourceType.CACHE])
|
360 |
- self.schedule_jobs([job])
|
|
399 |
+ self._schedule_jobs([job])
|
|
361 | 400 |
|
362 | 401 |
# _suspend_jobs()
|
363 | 402 |
#
|