Phil Dawson pushed to branch phil/712 at BuildStream / buildstream
Commits:
- 9e60c9ba by Phil Dawson at 2018-11-27T15:22:47Z
- 7666f161 by Phil Dawson at 2018-11-27T15:22:47Z
- 391bda38 by Phil Dawson at 2018-11-27T15:22:47Z
- 3b63a8fc by Phil Dawson at 2018-11-27T15:22:47Z

8 changed files:
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- setup.py
Changes:
=====================================
buildstream/_scheduler/jobs/cachesizejob.py
=====================================
@@ -39,6 +39,3 @@ class CacheSizeJob(Job):
 
     def child_process_data(self):
         return {}
-
-    def key(self):
-        return (100, 'cache-size')

=====================================
buildstream/_scheduler/jobs/cleanupjob.py
=====================================
@@ -32,6 +32,3 @@ class CleanupJob(Job):
     def parent_complete(self, success, result):
         if success:
             self._artifacts.set_cache_size(result)
-
-    def key(self):
-        return (0, 'cleanup')

=====================================
buildstream/_scheduler/jobs/elementjob.py
=====================================
@@ -22,13 +22,6 @@ from ..._message import Message, MessageType
 
 from .job import Job
 
-_ACTIONS = {
-    "Build": 10,
-    "Fetch": 20,
-    "Pull": 30,
-    "Push": 40,
-    "Track": 50,
-}
 
 # ElementJob()
 #
@@ -120,6 +113,3 @@ class ElementJob(Job):
             data['workspace'] = workspace.to_dict()
 
         return data
-
-    def key(self):
-        return (_ACTIONS.get(self.action_name, 100), self._element.name)

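For context, the removed key() methods and the _ACTIONS table fed a sortedcontainers.SortedList in the scheduler (also removed below), keeping waiting jobs ordered by a (priority, name) tuple with the lowest key first. A minimal sketch of that prior ordering, using an illustrative stub rather than the real Job class:

# Sketch of the ordering this branch removes; StubJob is illustrative only.
from sortedcontainers import SortedList

class StubJob:
    def __init__(self, priority, name):
        self._priority = priority
        self._name = name

    def key(self):
        # Lower tuples sort first: Build (10) sorted ahead of Fetch (20), etc.
        return (self._priority, self._name)

waiting = SortedList(key=lambda job: job.key())
waiting.add(StubJob(20, 'fetch-element.bst'))
waiting.add(StubJob(10, 'build-element.bst'))
assert waiting[0].key()[0] == 10   # the Build job comes out first in this sketch
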
=====================================
buildstream/_scheduler/jobs/job.py
=====================================
@@ -353,10 +353,6 @@ class Job():
     def child_process_data(self):
         return {}
 
-    def key(self):
-        raise ImplError("Job '{kind}' does not implement key()"
-                        .format(kind=type(self).__name__))
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################

=====================================
buildstream/_scheduler/queues/fetchqueue.py
=====================================
@@ -33,6 +33,7 @@ class FetchQueue(Queue):
     action_name = "Fetch"
     complete_name = "Fetched"
     resources = [ResourceType.DOWNLOAD]
+    high_priority = True
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)

=====================================
buildstream/_scheduler/queues/queue.py
=====================================
@@ -58,6 +58,7 @@ class Queue():
     action_name = None
     complete_name = None
     resources = []          # Resources this queues' jobs want
+    high_priority = False   # If jobs from this queue should be prioritised by the scheduler
 
     def __init__(self, scheduler):
 

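With this class attribute, a queue opts into priority scheduling declaratively, as FetchQueue does above. A hypothetical subclass (illustrative only, not part of this branch) would simply set the flag:

# Hypothetical queue opting into priority scheduling; the class itself is
# illustrative, the imports mirror how queue modules in this package import.
from .queue import Queue
from ..resources import ResourceType

class ExampleQueue(Queue):
    action_name = "Example"
    complete_name = "Examples done"
    resources = [ResourceType.DOWNLOAD]
    high_priority = True    # its ready jobs land in waiting_priority_jobs
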
=====================================
buildstream/_scheduler/scheduler.py
=====================================
@@ -25,7 +25,6 @@ from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
-from sortedcontainers import SortedList
 
 # Local imports
 from .resources import Resources, ResourceType

@@ -72,12 +71,13 @@ class Scheduler():
         #
         # Public members
         #
-        self.active_jobs = []                     # Jobs currently being run in the scheduler
-        self.waiting_jobs = SortedList([], key=lambda job: job.key())  # Jobs waiting for resources
-        self.queues = None                        # Exposed for the frontend to print summaries
-        self.context = context                    # The Context object shared with Queues
-        self.terminated = False                   # Whether the scheduler was asked to terminate or has terminated
-        self.suspended = False                    # Whether the scheduler is currently suspended
+        self.active_jobs = []                     # Jobs currently being run in the scheduler
+        self.waiting_jobs = []                    # Jobs waiting for resources
+        self.waiting_priority_jobs = []           # High priority jobs waiting for resources
+        self.queues = None                        # Exposed for the frontend to print summaries
+        self.context = context                    # The Context object shared with Queues
+        self.terminated = False                   # Whether the scheduler was asked to terminate or has terminated
+        self.suspended = False                    # Whether the scheduler is currently suspended
 
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None                          # Shared for Job access to observe the message queue

@@ -221,9 +221,11 @@ class Scheduler():
     # run as soon any other queueing jobs finish, provided sufficient
     # resources are available for them to run
     #
-    def schedule_jobs(self, jobs):
+    def schedule_jobs(self, jobs, priority_jobs):
+        for job in priority_jobs:
+            self.waiting_priority_jobs.append(job)
         for job in jobs:
-            self.waiting_jobs.add(job)
+            self.waiting_jobs.append(job)
 
     # job_completed():
     #

@@ -258,7 +260,7 @@ class Scheduler():
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        self.schedule_jobs([job], [])
 
     #######################################################
     #                  Local Private Methods              #

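Taken together, the two hunks above establish the new calling convention for schedule_jobs(): callers pass a batch of ordinary jobs and a batch of high-priority jobs, with single internal jobs (such as the cache-size/cleanup job) passing an empty priority batch. A hedged sketch of that convention, using a stub rather than the real Scheduler:

# Sketch of the new schedule_jobs() calling convention (StubScheduler is illustrative).
class StubScheduler:
    def __init__(self):
        self.waiting_jobs = []
        self.waiting_priority_jobs = []

    def schedule_jobs(self, jobs, priority_jobs):
        self.waiting_priority_jobs.extend(priority_jobs)
        self.waiting_jobs.extend(jobs)

scheduler = StubScheduler()
scheduler.schedule_jobs(['build-1'], ['fetch-1'])   # queue batches: ordinary, then priority
scheduler.schedule_jobs(['cleanup'], [])            # single internal job, no priority batch
assert scheduler.waiting_priority_jobs == ['fetch-1']
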
@@ -270,22 +272,27 @@ class Scheduler():
     # automatically when Scheduler.run() is called initially,
     #
     def _sched(self):
-        for job in self.waiting_jobs:
-            self._resources.reserve_exclusive_resources(job)
+        def allocate_resources_and_spawn_jobs(job_list):
+            for job in job_list:
+                self._resources.reserve_exclusive_resources(job)
+
+            for job in job_list:
+                if not self._resources.reserve_job_resources(job):
+                    continue
 
-        for job in self.waiting_jobs:
-            if not self._resources.reserve_job_resources(job):
-                continue
+                job.spawn()
+                job_list.remove(job)
+                self.active_jobs.append(job)
 
-            job.spawn()
-            self.waiting_jobs.remove(job)
-            self.active_jobs.append(job)
+                if self._job_start_callback:
+                    self._job_start_callback(job)
 
-            if self._job_start_callback:
-                self._job_start_callback(job)
+        # Process jobs from the high priority list first
+        allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
+        allocate_resources_and_spawn_jobs(self.waiting_jobs)
 
         # If nothings ticking, time to bail out
-        if not self.active_jobs and not self.waiting_jobs:
+        if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
             self.loop.stop()
 
     # _schedule_queue_jobs()

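The reworked _sched() drains both lists with the same nested helper, giving the high priority list first claim on free resources. Below is a self-contained sketch of that two-tier pattern with stubbed jobs and resources (all names here are illustrative, not BuildStream API); unlike the branch, the sketch iterates over a snapshot of each list so that removing a spawned entry cannot cause the loop to skip its neighbour:

# Standalone sketch of two-tier scheduling with stubbed jobs and resources.
class StubResources:
    def __init__(self, slots):
        self.slots = slots          # crude model of available job slots

    def reserve(self, job):
        if self.slots > 0:
            self.slots -= 1
            return True
        return False

def drain(job_list, active, resources):
    # Iterate over a snapshot so in-place removal cannot skip entries.
    for job in list(job_list):
        if not resources.reserve(job):
            continue
        job_list.remove(job)
        active.append(job)

active = []
resources = StubResources(slots=2)
waiting_priority = ['fetch-1', 'fetch-2']   # e.g. jobs from a high_priority queue
waiting = ['build-1']

# High priority jobs get first pick of the available slots.
drain(waiting_priority, active, resources)
drain(waiting, active, resources)

assert active == ['fetch-1', 'fetch-2'] and waiting == ['build-1']

The snapshot is a choice made only in this sketch; the branch itself removes from the list it is iterating over, matching the behaviour of the SortedList-based code it replaces.
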
@@ -299,6 +306,7 @@ class Scheduler():
     #
     def _schedule_queue_jobs(self):
         ready = []
+        ready_priority = []
         process_queues = True
 
         while self._queue_jobs and process_queues:

@@ -323,16 +331,19 @@ class Scheduler():
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            ready.extend(chain.from_iterable(
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
-            ))
+
+            for queue in reversed(self.queues):
+                if queue.high_priority:
+                    ready_priority.extend(queue.pop_ready_jobs())
+                else:
+                    ready.extend(queue.pop_ready_jobs())
 
             # pop_ready_jobs() may have skipped jobs, adding them to
             # the done_queue. Pull these skipped elements forward to
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        self.schedule_jobs(ready)
+        self.schedule_jobs(ready, ready_priority)
         self._sched()
 
     # _run_cleanup()

=====================================
setup.py
=====================================
@@ -343,7 +343,6 @@ setup(name='BuildStream',
         'jinja2 >= 2.10',
         'protobuf >= 3.5',
         'grpcio >= 1.10',
-        'sortedcontainers >= 1.5.7',
       ],
       entry_points=bst_install_entry_points,
       tests_require=dev_requires,
