[Notes] [Git][BuildStream/buildstream][phil/712] 4 commits: Revert "WIP: use priority queue in schedular"



Title: GitLab

Phil Dawson pushed to branch phil/712 at BuildStream / buildstream

Commits:

8 changed files:

Changes:

  • buildstream/_scheduler/jobs/cachesizejob.py
    ... ... @@ -39,6 +39,3 @@ class CacheSizeJob(Job):
    39 39
     
    
    40 40
         def child_process_data(self):
    
    41 41
             return {}
    42
    -
    
    43
    -    def key(self):
    
    44
    -        return (100, 'cache-size')

  • buildstream/_scheduler/jobs/cleanupjob.py
    ... ... @@ -32,6 +32,3 @@ class CleanupJob(Job):
    32 32
         def parent_complete(self, success, result):
    
    33 33
             if success:
    
    34 34
                 self._artifacts.set_cache_size(result)
    35
    -
    
    36
    -    def key(self):
    
    37
    -        return (0, 'cleanup')

  • buildstream/_scheduler/jobs/elementjob.py
    ... ... @@ -22,13 +22,6 @@ from ..._message import Message, MessageType
    22 22
     
    
    23 23
     from .job import Job
    
    24 24
     
    
    25
    -_ACTIONS = {
    
    26
    -    "Build": 10,
    
    27
    -    "Fetch": 20,
    
    28
    -    "Pull": 30,
    
    29
    -    "Push": 40,
    
    30
    -    "Track": 50,
    
    31
    -}
    
    32 25
     
    
    33 26
     # ElementJob()
    
    34 27
     #
    
    ... ... @@ -120,6 +113,3 @@ class ElementJob(Job):
    120 113
                 data['workspace'] = workspace.to_dict()
    
    121 114
     
    
    122 115
             return data
    123
    -
    
    124
    -    def key(self):
    
    125
    -        return (_ACTIONS.get(self.action_name, 100), self._element.name)

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -353,10 +353,6 @@ class Job():
    353 353
         def child_process_data(self):
    
    354 354
             return {}
    
    355 355
     
    
    356
    -    def key(self):
    
    357
    -        raise ImplError("Job '{kind}' does not implement key()"
    
    358
    -                        .format(kind=type(self).__name__))
    
    359
    -
    
    360 356
         #######################################################
    
    361 357
         #                  Local Private Methods              #
    
    362 358
         #######################################################
    

  • buildstream/_scheduler/queues/fetchqueue.py
    ... ... @@ -33,6 +33,7 @@ class FetchQueue(Queue):
    33 33
         action_name = "Fetch"
    
    34 34
         complete_name = "Fetched"
    
    35 35
         resources = [ResourceType.DOWNLOAD]
    
    36
    +    high_priority = True
    
    36 37
     
    
    37 38
         def __init__(self, scheduler, skip_cached=False):
    
    38 39
             super().__init__(scheduler)
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -58,6 +58,7 @@ class Queue():
    58 58
         action_name = None
    
    59 59
         complete_name = None
    
    60 60
         resources = []                     # Resources this queues' jobs want
    
    61
    +    high_priority = False              # If jobs from this queue should be prioritised by the scheduler
    
    61 62
     
    
    62 63
         def __init__(self, scheduler):
    
    63 64
     
    

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -25,7 +25,6 @@ from itertools import chain
    25 25
     import signal
    
    26 26
     import datetime
    
    27 27
     from contextlib import contextmanager
    
    28
    -from sortedcontainers import SortedList
    
    29 28
     
    
    30 29
     # Local imports
    
    31 30
     from .resources import Resources, ResourceType
    
    ... ... @@ -72,12 +71,13 @@ class Scheduler():
    72 71
             #
    
    73 72
             # Public members
    
    74 73
             #
    
    75
    -        self.active_jobs = []       # Jobs currently being run in the scheduler
    
    76
    -        self.waiting_jobs = SortedList([], key=lambda job: job.key())  # Jobs waiting for resources
    
    77
    -        self.queues = None          # Exposed for the frontend to print summaries
    
    78
    -        self.context = context      # The Context object shared with Queues
    
    79
    -        self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
    
    80
    -        self.suspended = False      # Whether the scheduler is currently suspended
    
    74
    +        self.active_jobs = []           # Jobs currently being run in the scheduler
    
    75
    +        self.waiting_jobs = []          # Jobs waiting for resources
    
    76
    +        self.waiting_priority_jobs = [] # High priority jobs waiting for resources
    
    77
    +        self.queues = None              # Exposed for the frontend to print summaries
    
    78
    +        self.context = context          # The Context object shared with Queues
    
    79
    +        self.terminated = False         # Whether the scheduler was asked to terminate or has terminated
    
    80
    +        self.suspended = False          # Whether the scheduler is currently suspended
    
    81 81
     
    
    82 82
             # These are shared with the Job, but should probably be removed or made private in some way.
    
    83 83
             self.loop = None            # Shared for Job access to observe the message queue
    
    ... ... @@ -221,9 +221,11 @@ class Scheduler():
    221 221
         # run as soon any other queueing jobs finish, provided sufficient
    
    222 222
         # resources are available for them to run
    
    223 223
         #
    
    224
    -    def schedule_jobs(self, jobs):
    
    224
    +    def schedule_jobs(self, jobs, priority_jobs):
    
    225
    +        for job in priority_jobs:
    
    226
    +            self.waiting_priority_jobs.append(job)
    
    225 227
             for job in jobs:
    
    226
    -            self.waiting_jobs.add(job)
    
    228
    +            self.waiting_jobs.append(job)
    
    227 229
     
    
    228 230
         # job_completed():
    
    229 231
         #
    
    ... ... @@ -258,7 +260,7 @@ class Scheduler():
    258 260
                                resources=[ResourceType.CACHE,
    
    259 261
                                           ResourceType.PROCESS],
    
    260 262
                                complete_cb=self._run_cleanup)
    
    261
    -        self.schedule_jobs([job])
    
    263
    +        self.schedule_jobs([job], [])
    
    262 264
     
    
    263 265
         #######################################################
    
    264 266
         #                  Local Private Methods              #
    
    ... ... @@ -270,22 +272,27 @@ class Scheduler():
    270 272
         # automatically when Scheduler.run() is called initially,
    
    271 273
         #
    
    272 274
         def _sched(self):
    
    273
    -        for job in self.waiting_jobs:
    
    274
    -            self._resources.reserve_exclusive_resources(job)
    
    275
    +        def allocate_resources_and_spawn_jobs(job_list):
    
    276
    +            for job in job_list:
    
    277
    +                self._resources.reserve_exclusive_resources(job)
    
    278
    +
    
    279
    +            for job in job_list:
    
    280
    +                if not self._resources.reserve_job_resources(job):
    
    281
    +                    continue
    
    275 282
     
    
    276
    -        for job in self.waiting_jobs:
    
    277
    -            if not self._resources.reserve_job_resources(job):
    
    278
    -                continue
    
    283
    +                job.spawn()
    
    284
    +                job_list.remove(job)
    
    285
    +                self.active_jobs.append(job)
    
    279 286
     
    
    280
    -            job.spawn()
    
    281
    -            self.waiting_jobs.remove(job)
    
    282
    -            self.active_jobs.append(job)
    
    287
    +                if self._job_start_callback:
    
    288
    +                    self._job_start_callback(job)
    
    283 289
     
    
    284
    -            if self._job_start_callback:
    
    285
    -                self._job_start_callback(job)
    
    290
    +        # Process jobs from the high priority list first
    
    291
    +        allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
    
    292
    +        allocate_resources_and_spawn_jobs(self.waiting_jobs)
    
    286 293
     
    
    287 294
             # If nothings ticking, time to bail out
    
    288
    -        if not self.active_jobs and not self.waiting_jobs:
    
    295
    +        if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
    
    289 296
                 self.loop.stop()
    
    290 297
     
    
    291 298
         # _schedule_queue_jobs()
    
    ... ... @@ -299,6 +306,7 @@ class Scheduler():
    299 306
         #
    
    300 307
         def _schedule_queue_jobs(self):
    
    301 308
             ready = []
    
    309
    +        ready_priority = []
    
    302 310
             process_queues = True
    
    303 311
     
    
    304 312
             while self._queue_jobs and process_queues:
    
    ... ... @@ -323,16 +331,19 @@ class Scheduler():
    323 331
                 # to fetch tasks for elements which failed to pull, and
    
    324 332
                 # thus need all the pulls to complete before ever starting
    
    325 333
                 # a build
    
    326
    -            ready.extend(chain.from_iterable(
    
    327
    -                queue.pop_ready_jobs() for queue in reversed(self.queues)
    
    328
    -            ))
    
    334
    +
    
    335
    +            for queue in reversed(self.queues):
    
    336
    +                if queue.high_priority:
    
    337
    +                    ready_priority.extend(queue.pop_ready_jobs())
    
    338
    +                else:
    
    339
    +                    ready.extend(queue.pop_ready_jobs())
    
    329 340
     
    
    330 341
                 # pop_ready_jobs() may have skipped jobs, adding them to
    
    331 342
                 # the done_queue.  Pull these skipped elements forward to
    
    332 343
                 # the next queue and process them.
    
    333 344
                 process_queues = any(q.dequeue_ready() for q in self.queues)
    
    334 345
     
    
    335
    -        self.schedule_jobs(ready)
    
    346
    +        self.schedule_jobs(ready, ready_priority)
    
    336 347
             self._sched()
    
    337 348
     
    
    338 349
         # _run_cleanup()
    

  • setup.py
    ... ... @@ -343,7 +343,6 @@ setup(name='BuildStream',
    343 343
               'jinja2 >= 2.10',
    
    344 344
               'protobuf >= 3.5',
    
    345 345
               'grpcio >= 1.10',
    
    346
    -          'sortedcontainers >= 1.5.7',
    
    347 346
           ],
    
    348 347
           entry_points=bst_install_entry_points,
    
    349 348
           tests_require=dev_requires,
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]