Phil Dawson pushed to branch phil/712 at BuildStream / buildstream
Commits:
-
9e60c9ba
by Phil Dawson at 2018-11-27T15:22:47Z
-
7666f161
by Phil Dawson at 2018-11-27T15:22:47Z
-
391bda38
by Phil Dawson at 2018-11-27T15:22:47Z
-
3b63a8fc
by Phil Dawson at 2018-11-27T15:22:47Z
8 changed files:
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- setup.py
Changes:
... | ... | @@ -39,6 +39,3 @@ class CacheSizeJob(Job): |
39 | 39 |
|
40 | 40 |
def child_process_data(self):
|
41 | 41 |
return {}
|
42 |
- |
|
43 |
- def key(self):
|
|
44 |
- return (100, 'cache-size')
|
... | ... | @@ -32,6 +32,3 @@ class CleanupJob(Job): |
32 | 32 |
def parent_complete(self, success, result):
|
33 | 33 |
if success:
|
34 | 34 |
self._artifacts.set_cache_size(result)
|
35 |
- |
|
36 |
- def key(self):
|
|
37 |
- return (0, 'cleanup')
|
... | ... | @@ -22,13 +22,6 @@ from ..._message import Message, MessageType |
22 | 22 |
|
23 | 23 |
from .job import Job
|
24 | 24 |
|
25 |
-_ACTIONS = {
|
|
26 |
- "Build": 10,
|
|
27 |
- "Fetch": 20,
|
|
28 |
- "Pull": 30,
|
|
29 |
- "Push": 40,
|
|
30 |
- "Track": 50,
|
|
31 |
-}
|
|
32 | 25 |
|
33 | 26 |
# ElementJob()
|
34 | 27 |
#
|
... | ... | @@ -120,6 +113,3 @@ class ElementJob(Job): |
120 | 113 |
data['workspace'] = workspace.to_dict()
|
121 | 114 |
|
122 | 115 |
return data
|
123 |
- |
|
124 |
- def key(self):
|
|
125 |
- return (_ACTIONS.get(self.action_name, 100), self._element.name)
|
... | ... | @@ -353,10 +353,6 @@ class Job(): |
353 | 353 |
def child_process_data(self):
|
354 | 354 |
return {}
|
355 | 355 |
|
356 |
- def key(self):
|
|
357 |
- raise ImplError("Job '{kind}' does not implement key()"
|
|
358 |
- .format(kind=type(self).__name__))
|
|
359 |
- |
|
360 | 356 |
#######################################################
|
361 | 357 |
# Local Private Methods #
|
362 | 358 |
#######################################################
|
... | ... | @@ -33,6 +33,7 @@ class FetchQueue(Queue): |
33 | 33 |
action_name = "Fetch"
|
34 | 34 |
complete_name = "Fetched"
|
35 | 35 |
resources = [ResourceType.DOWNLOAD]
|
36 |
+ high_priority = True
|
|
36 | 37 |
|
37 | 38 |
def __init__(self, scheduler, skip_cached=False):
|
38 | 39 |
super().__init__(scheduler)
|
... | ... | @@ -58,6 +58,7 @@ class Queue(): |
58 | 58 |
action_name = None
|
59 | 59 |
complete_name = None
|
60 | 60 |
resources = [] # Resources this queues' jobs want
|
61 |
+ high_priority = False # If jobs from this queue should be prioritised by the scheduler
|
|
61 | 62 |
|
62 | 63 |
def __init__(self, scheduler):
|
63 | 64 |
|
... | ... | @@ -25,7 +25,6 @@ from itertools import chain |
25 | 25 |
import signal
|
26 | 26 |
import datetime
|
27 | 27 |
from contextlib import contextmanager
|
28 |
-from sortedcontainers import SortedList
|
|
29 | 28 |
|
30 | 29 |
# Local imports
|
31 | 30 |
from .resources import Resources, ResourceType
|
... | ... | @@ -72,12 +71,13 @@ class Scheduler(): |
72 | 71 |
#
|
73 | 72 |
# Public members
|
74 | 73 |
#
|
75 |
- self.active_jobs = [] # Jobs currently being run in the scheduler
|
|
76 |
- self.waiting_jobs = SortedList([], key=lambda job: job.key()) # Jobs waiting for resources
|
|
77 |
- self.queues = None # Exposed for the frontend to print summaries
|
|
78 |
- self.context = context # The Context object shared with Queues
|
|
79 |
- self.terminated = False # Whether the scheduler was asked to terminate or has terminated
|
|
80 |
- self.suspended = False # Whether the scheduler is currently suspended
|
|
74 |
+ self.active_jobs = [] # Jobs currently being run in the scheduler
|
|
75 |
+ self.waiting_jobs = [] # Jobs waiting for resources
|
|
76 |
+ self.waiting_priority_jobs = [] # High priority jobs waiting for resources
|
|
77 |
+ self.queues = None # Exposed for the frontend to print summaries
|
|
78 |
+ self.context = context # The Context object shared with Queues
|
|
79 |
+ self.terminated = False # Whether the scheduler was asked to terminate or has terminated
|
|
80 |
+ self.suspended = False # Whether the scheduler is currently suspended
|
|
81 | 81 |
|
82 | 82 |
# These are shared with the Job, but should probably be removed or made private in some way.
|
83 | 83 |
self.loop = None # Shared for Job access to observe the message queue
|
... | ... | @@ -221,9 +221,11 @@ class Scheduler(): |
221 | 221 |
# run as soon any other queueing jobs finish, provided sufficient
|
222 | 222 |
# resources are available for them to run
|
223 | 223 |
#
|
224 |
- def schedule_jobs(self, jobs):
|
|
224 |
+ def schedule_jobs(self, jobs, priority_jobs):
|
|
225 |
+ for job in priority_jobs:
|
|
226 |
+ self.waiting_priority_jobs.append(job)
|
|
225 | 227 |
for job in jobs:
|
226 |
- self.waiting_jobs.add(job)
|
|
228 |
+ self.waiting_jobs.append(job)
|
|
227 | 229 |
|
228 | 230 |
# job_completed():
|
229 | 231 |
#
|
... | ... | @@ -258,7 +260,7 @@ class Scheduler(): |
258 | 260 |
resources=[ResourceType.CACHE,
|
259 | 261 |
ResourceType.PROCESS],
|
260 | 262 |
complete_cb=self._run_cleanup)
|
261 |
- self.schedule_jobs([job])
|
|
263 |
+ self.schedule_jobs([job], [])
|
|
262 | 264 |
|
263 | 265 |
#######################################################
|
264 | 266 |
# Local Private Methods #
|
... | ... | @@ -270,22 +272,27 @@ class Scheduler(): |
270 | 272 |
# automatically when Scheduler.run() is called initially,
|
271 | 273 |
#
|
272 | 274 |
def _sched(self):
|
273 |
- for job in self.waiting_jobs:
|
|
274 |
- self._resources.reserve_exclusive_resources(job)
|
|
275 |
+ def allocate_resources_and_spawn_jobs(job_list):
|
|
276 |
+ for job in job_list:
|
|
277 |
+ self._resources.reserve_exclusive_resources(job)
|
|
278 |
+ |
|
279 |
+ for job in job_list:
|
|
280 |
+ if not self._resources.reserve_job_resources(job):
|
|
281 |
+ continue
|
|
275 | 282 |
|
276 |
- for job in self.waiting_jobs:
|
|
277 |
- if not self._resources.reserve_job_resources(job):
|
|
278 |
- continue
|
|
283 |
+ job.spawn()
|
|
284 |
+ job_list.remove(job)
|
|
285 |
+ self.active_jobs.append(job)
|
|
279 | 286 |
|
280 |
- job.spawn()
|
|
281 |
- self.waiting_jobs.remove(job)
|
|
282 |
- self.active_jobs.append(job)
|
|
287 |
+ if self._job_start_callback:
|
|
288 |
+ self._job_start_callback(job)
|
|
283 | 289 |
|
284 |
- if self._job_start_callback:
|
|
285 |
- self._job_start_callback(job)
|
|
290 |
+ # Process jobs from the high priority list first
|
|
291 |
+ allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
|
|
292 |
+ allocate_resources_and_spawn_jobs(self.waiting_jobs)
|
|
286 | 293 |
|
287 | 294 |
# If nothings ticking, time to bail out
|
288 |
- if not self.active_jobs and not self.waiting_jobs:
|
|
295 |
+ if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
|
|
289 | 296 |
self.loop.stop()
|
290 | 297 |
|
291 | 298 |
# _schedule_queue_jobs()
|
... | ... | @@ -299,6 +306,7 @@ class Scheduler(): |
299 | 306 |
#
|
300 | 307 |
def _schedule_queue_jobs(self):
|
301 | 308 |
ready = []
|
309 |
+ ready_priority = []
|
|
302 | 310 |
process_queues = True
|
303 | 311 |
|
304 | 312 |
while self._queue_jobs and process_queues:
|
... | ... | @@ -323,16 +331,19 @@ class Scheduler(): |
323 | 331 |
# to fetch tasks for elements which failed to pull, and
|
324 | 332 |
# thus need all the pulls to complete before ever starting
|
325 | 333 |
# a build
|
326 |
- ready.extend(chain.from_iterable(
|
|
327 |
- queue.pop_ready_jobs() for queue in reversed(self.queues)
|
|
328 |
- ))
|
|
334 |
+ |
|
335 |
+ for queue in reversed(self.queues):
|
|
336 |
+ if queue.high_priority:
|
|
337 |
+ ready_priority.extend(queue.pop_ready_jobs())
|
|
338 |
+ else:
|
|
339 |
+ ready.extend(queue.pop_ready_jobs())
|
|
329 | 340 |
|
330 | 341 |
# pop_ready_jobs() may have skipped jobs, adding them to
|
331 | 342 |
# the done_queue. Pull these skipped elements forward to
|
332 | 343 |
# the next queue and process them.
|
333 | 344 |
process_queues = any(q.dequeue_ready() for q in self.queues)
|
334 | 345 |
|
335 |
- self.schedule_jobs(ready)
|
|
346 |
+ self.schedule_jobs(ready, ready_priority)
|
|
336 | 347 |
self._sched()
|
337 | 348 |
|
338 | 349 |
# _run_cleanup()
|
... | ... | @@ -343,7 +343,6 @@ setup(name='BuildStream', |
343 | 343 |
'jinja2 >= 2.10',
|
344 | 344 |
'protobuf >= 3.5',
|
345 | 345 |
'grpcio >= 1.10',
|
346 |
- 'sortedcontainers >= 1.5.7',
|
|
347 | 346 |
],
|
348 | 347 |
entry_points=bst_install_entry_points,
|
349 | 348 |
tests_require=dev_requires,
|