Phil Dawson pushed to branch phil/712 at BuildStream / buildstream
Commits:
- 
9e60c9ba
by Phil Dawson at 2018-11-27T15:22:47Z
 - 
7666f161
by Phil Dawson at 2018-11-27T15:22:47Z
 - 
391bda38
by Phil Dawson at 2018-11-27T15:22:47Z
 - 
3b63a8fc
by Phil Dawson at 2018-11-27T15:22:47Z
 
8 changed files:
- buildstream/_scheduler/jobs/cachesizejob.py
 - buildstream/_scheduler/jobs/cleanupjob.py
 - buildstream/_scheduler/jobs/elementjob.py
 - buildstream/_scheduler/jobs/job.py
 - buildstream/_scheduler/queues/fetchqueue.py
 - buildstream/_scheduler/queues/queue.py
 - buildstream/_scheduler/scheduler.py
 - setup.py
 
Changes:
| ... | ... | @@ -39,6 +39,3 @@ class CacheSizeJob(Job): | 
| 39 | 39 | 
 | 
| 40 | 40 | 
     def child_process_data(self):
 | 
| 41 | 41 | 
         return {}
 | 
| 42 | 
-  | 
|
| 43 | 
-    def key(self):
 | 
|
| 44 | 
-        return (100, 'cache-size')
 | 
| ... | ... | @@ -32,6 +32,3 @@ class CleanupJob(Job): | 
| 32 | 32 | 
     def parent_complete(self, success, result):
 | 
| 33 | 33 | 
         if success:
 | 
| 34 | 34 | 
             self._artifacts.set_cache_size(result)
 | 
| 35 | 
-  | 
|
| 36 | 
-    def key(self):
 | 
|
| 37 | 
-        return (0, 'cleanup')
 | 
| ... | ... | @@ -22,13 +22,6 @@ from ..._message import Message, MessageType | 
| 22 | 22 | 
 | 
| 23 | 23 | 
 from .job import Job
 | 
| 24 | 24 | 
 | 
| 25 | 
-_ACTIONS = {
 | 
|
| 26 | 
-    "Build": 10,
 | 
|
| 27 | 
-    "Fetch": 20,
 | 
|
| 28 | 
-    "Pull": 30,
 | 
|
| 29 | 
-    "Push": 40,
 | 
|
| 30 | 
-    "Track": 50,
 | 
|
| 31 | 
-}
 | 
|
| 32 | 25 | 
 | 
| 33 | 26 | 
 # ElementJob()
 | 
| 34 | 27 | 
 #
 | 
| ... | ... | @@ -120,6 +113,3 @@ class ElementJob(Job): | 
| 120 | 113 | 
             data['workspace'] = workspace.to_dict()
 | 
| 121 | 114 | 
 | 
| 122 | 115 | 
         return data
 | 
| 123 | 
-  | 
|
| 124 | 
-    def key(self):
 | 
|
| 125 | 
-        return (_ACTIONS.get(self.action_name, 100), self._element.name)
 | 
| ... | ... | @@ -353,10 +353,6 @@ class Job(): | 
| 353 | 353 | 
     def child_process_data(self):
 | 
| 354 | 354 | 
         return {}
 | 
| 355 | 355 | 
 | 
| 356 | 
-    def key(self):
 | 
|
| 357 | 
-        raise ImplError("Job '{kind}' does not implement key()"
 | 
|
| 358 | 
-                        .format(kind=type(self).__name__))
 | 
|
| 359 | 
-  | 
|
| 360 | 356 | 
     #######################################################
 | 
| 361 | 357 | 
     #                  Local Private Methods              #
 | 
| 362 | 358 | 
     #######################################################
 | 
| ... | ... | @@ -33,6 +33,7 @@ class FetchQueue(Queue): | 
| 33 | 33 | 
     action_name = "Fetch"
 | 
| 34 | 34 | 
     complete_name = "Fetched"
 | 
| 35 | 35 | 
     resources = [ResourceType.DOWNLOAD]
 | 
| 36 | 
+    high_priority = True
 | 
|
| 36 | 37 | 
 | 
| 37 | 38 | 
     def __init__(self, scheduler, skip_cached=False):
 | 
| 38 | 39 | 
         super().__init__(scheduler)
 | 
| ... | ... | @@ -58,6 +58,7 @@ class Queue(): | 
| 58 | 58 | 
     action_name = None
 | 
| 59 | 59 | 
     complete_name = None
 | 
| 60 | 60 | 
     resources = []                     # Resources this queues' jobs want
 | 
| 61 | 
+    high_priority = False              # If jobs from this queue should be prioritised by the scheduler
 | 
|
| 61 | 62 | 
 | 
| 62 | 63 | 
     def __init__(self, scheduler):
 | 
| 63 | 64 | 
 | 
| ... | ... | @@ -25,7 +25,6 @@ from itertools import chain | 
| 25 | 25 | 
 import signal
 | 
| 26 | 26 | 
 import datetime
 | 
| 27 | 27 | 
 from contextlib import contextmanager
 | 
| 28 | 
-from sortedcontainers import SortedList
 | 
|
| 29 | 28 | 
 | 
| 30 | 29 | 
 # Local imports
 | 
| 31 | 30 | 
 from .resources import Resources, ResourceType
 | 
| ... | ... | @@ -72,12 +71,13 @@ class Scheduler(): | 
| 72 | 71 | 
         #
 | 
| 73 | 72 | 
         # Public members
 | 
| 74 | 73 | 
         #
 | 
| 75 | 
-        self.active_jobs = []       # Jobs currently being run in the scheduler
 | 
|
| 76 | 
-        self.waiting_jobs = SortedList([], key=lambda job: job.key())  # Jobs waiting for resources
 | 
|
| 77 | 
-        self.queues = None          # Exposed for the frontend to print summaries
 | 
|
| 78 | 
-        self.context = context      # The Context object shared with Queues
 | 
|
| 79 | 
-        self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
 | 
|
| 80 | 
-        self.suspended = False      # Whether the scheduler is currently suspended
 | 
|
| 74 | 
+        self.active_jobs = []           # Jobs currently being run in the scheduler
 | 
|
| 75 | 
+        self.waiting_jobs = []          # Jobs waiting for resources
 | 
|
| 76 | 
+        self.waiting_priority_jobs = [] # High priority jobs waiting for resources
 | 
|
| 77 | 
+        self.queues = None              # Exposed for the frontend to print summaries
 | 
|
| 78 | 
+        self.context = context          # The Context object shared with Queues
 | 
|
| 79 | 
+        self.terminated = False         # Whether the scheduler was asked to terminate or has terminated
 | 
|
| 80 | 
+        self.suspended = False          # Whether the scheduler is currently suspended
 | 
|
| 81 | 81 | 
 | 
| 82 | 82 | 
         # These are shared with the Job, but should probably be removed or made private in some way.
 | 
| 83 | 83 | 
         self.loop = None            # Shared for Job access to observe the message queue
 | 
| ... | ... | @@ -221,9 +221,11 @@ class Scheduler(): | 
| 221 | 221 | 
     # run as soon any other queueing jobs finish, provided sufficient
 | 
| 222 | 222 | 
     # resources are available for them to run
 | 
| 223 | 223 | 
     #
 | 
| 224 | 
-    def schedule_jobs(self, jobs):
 | 
|
| 224 | 
+    def schedule_jobs(self, jobs, priority_jobs):
 | 
|
| 225 | 
+        for job in priority_jobs:
 | 
|
| 226 | 
+            self.waiting_priority_jobs.append(job)
 | 
|
| 225 | 227 | 
         for job in jobs:
 | 
| 226 | 
-            self.waiting_jobs.add(job)
 | 
|
| 228 | 
+            self.waiting_jobs.append(job)
 | 
|
| 227 | 229 | 
 | 
| 228 | 230 | 
     # job_completed():
 | 
| 229 | 231 | 
     #
 | 
| ... | ... | @@ -258,7 +260,7 @@ class Scheduler(): | 
| 258 | 260 | 
                            resources=[ResourceType.CACHE,
 | 
| 259 | 261 | 
                                       ResourceType.PROCESS],
 | 
| 260 | 262 | 
                            complete_cb=self._run_cleanup)
 | 
| 261 | 
-        self.schedule_jobs([job])
 | 
|
| 263 | 
+        self.schedule_jobs([job], [])
 | 
|
| 262 | 264 | 
 | 
| 263 | 265 | 
     #######################################################
 | 
| 264 | 266 | 
     #                  Local Private Methods              #
 | 
| ... | ... | @@ -270,22 +272,27 @@ class Scheduler(): | 
| 270 | 272 | 
     # automatically when Scheduler.run() is called initially,
 | 
| 271 | 273 | 
     #
 | 
| 272 | 274 | 
     def _sched(self):
 | 
| 273 | 
-        for job in self.waiting_jobs:
 | 
|
| 274 | 
-            self._resources.reserve_exclusive_resources(job)
 | 
|
| 275 | 
+        def allocate_resources_and_spawn_jobs(job_list):
 | 
|
| 276 | 
+            for job in job_list:
 | 
|
| 277 | 
+                self._resources.reserve_exclusive_resources(job)
 | 
|
| 278 | 
+  | 
|
| 279 | 
+            for job in job_list:
 | 
|
| 280 | 
+                if not self._resources.reserve_job_resources(job):
 | 
|
| 281 | 
+                    continue
 | 
|
| 275 | 282 | 
 | 
| 276 | 
-        for job in self.waiting_jobs:
 | 
|
| 277 | 
-            if not self._resources.reserve_job_resources(job):
 | 
|
| 278 | 
-                continue
 | 
|
| 283 | 
+                job.spawn()
 | 
|
| 284 | 
+                job_list.remove(job)
 | 
|
| 285 | 
+                self.active_jobs.append(job)
 | 
|
| 279 | 286 | 
 | 
| 280 | 
-            job.spawn()
 | 
|
| 281 | 
-            self.waiting_jobs.remove(job)
 | 
|
| 282 | 
-            self.active_jobs.append(job)
 | 
|
| 287 | 
+                if self._job_start_callback:
 | 
|
| 288 | 
+                    self._job_start_callback(job)
 | 
|
| 283 | 289 | 
 | 
| 284 | 
-            if self._job_start_callback:
 | 
|
| 285 | 
-                self._job_start_callback(job)
 | 
|
| 290 | 
+        # Process jobs from the high priority list first
 | 
|
| 291 | 
+        allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
 | 
|
| 292 | 
+        allocate_resources_and_spawn_jobs(self.waiting_jobs)
 | 
|
| 286 | 293 | 
 | 
| 287 | 294 | 
         # If nothings ticking, time to bail out
 | 
| 288 | 
-        if not self.active_jobs and not self.waiting_jobs:
 | 
|
| 295 | 
+        if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
 | 
|
| 289 | 296 | 
             self.loop.stop()
 | 
| 290 | 297 | 
 | 
| 291 | 298 | 
     # _schedule_queue_jobs()
 | 
| ... | ... | @@ -299,6 +306,7 @@ class Scheduler(): | 
| 299 | 306 | 
     #
 | 
| 300 | 307 | 
     def _schedule_queue_jobs(self):
 | 
| 301 | 308 | 
         ready = []
 | 
| 309 | 
+        ready_priority = []
 | 
|
| 302 | 310 | 
         process_queues = True
 | 
| 303 | 311 | 
 | 
| 304 | 312 | 
         while self._queue_jobs and process_queues:
 | 
| ... | ... | @@ -323,16 +331,19 @@ class Scheduler(): | 
| 323 | 331 | 
             # to fetch tasks for elements which failed to pull, and
 | 
| 324 | 332 | 
             # thus need all the pulls to complete before ever starting
 | 
| 325 | 333 | 
             # a build
 | 
| 326 | 
-            ready.extend(chain.from_iterable(
 | 
|
| 327 | 
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
 | 
|
| 328 | 
-            ))
 | 
|
| 334 | 
+  | 
|
| 335 | 
+            for queue in reversed(self.queues):
 | 
|
| 336 | 
+                if queue.high_priority:
 | 
|
| 337 | 
+                    ready_priority.extend(queue.pop_ready_jobs())
 | 
|
| 338 | 
+                else:
 | 
|
| 339 | 
+                    ready.extend(queue.pop_ready_jobs())
 | 
|
| 329 | 340 | 
 | 
| 330 | 341 | 
             # pop_ready_jobs() may have skipped jobs, adding them to
 | 
| 331 | 342 | 
             # the done_queue.  Pull these skipped elements forward to
 | 
| 332 | 343 | 
             # the next queue and process them.
 | 
| 333 | 344 | 
             process_queues = any(q.dequeue_ready() for q in self.queues)
 | 
| 334 | 345 | 
 | 
| 335 | 
-        self.schedule_jobs(ready)
 | 
|
| 346 | 
+        self.schedule_jobs(ready, ready_priority)
 | 
|
| 336 | 347 | 
         self._sched()
 | 
| 337 | 348 | 
 | 
| 338 | 349 | 
     # _run_cleanup()
 | 
| ... | ... | @@ -343,7 +343,6 @@ setup(name='BuildStream', | 
| 343 | 343 | 
           'jinja2 >= 2.10',
 | 
| 344 | 344 | 
           'protobuf >= 3.5',
 | 
| 345 | 345 | 
           'grpcio >= 1.10',
 | 
| 346 | 
-          'sortedcontainers >= 1.5.7',
 | 
|
| 347 | 346 | 
       ],
 | 
| 348 | 347 | 
       entry_points=bst_install_entry_points,
 | 
| 349 | 348 | 
       tests_require=dev_requires,
 | 
