[Notes] [Git][BuildGrid/buildgrid][santigl/scheduler-mutex] Scheduler: add mutex to job queue



Title: GitLab

Santiago Gil pushed to branch santigl/scheduler-mutex at BuildGrid / buildgrid

Commits:

1 changed file:

Changes:

  • buildgrid/server/scheduler.py
    ... ... @@ -27,6 +27,7 @@ from buildgrid._enums import LeaseState, OperationStage
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28 28
     from buildgrid.server.job import Job
    
    29 29
     from buildgrid.utils import BrowserURL
    
    30
    +from threading import Lock
    
    30 31
     
    
    31 32
     
    
    32 33
     class Scheduler:
    
    ... ... @@ -53,6 +54,7 @@ class Scheduler:
    53 54
             self.__jobs_by_name = {}  # Name to Job 1:1 mapping
    
    54 55
     
    
    55 56
             self.__queue = []
    
    57
    +        self.__queue_lock = Lock()
    
    56 58
     
    
    57 59
             self._is_instrumented = monitor
    
    58 60
     
    
    ... ... @@ -297,27 +299,26 @@ class Scheduler:
    297 299
                     worker properties, configuration and state at the time of the
    
    298 300
                     request.
    
    299 301
             """
    
    300
    -        if not self.__queue:
    
    301
    -            return []
    
    302
    -
    
    303
    -        # Looking for the first job that could be assigned to the worker...
    
    304
    -        for job_index, job in enumerate(self.__queue):
    
    305
    -            if self._worker_is_capable(worker_capabilities, job):
    
    306
    -                self.__logger.info("Job scheduled to run: [%s]", job.name)
    
    302
    +        # TODO: Replace with a more efficient way of doing this.
    
    303
    +        with self.__queue_lock:
    
    304
    +            # Looking for the first job that could be assigned to the worker...
    
    305
    +            for job_index, job in enumerate(self.__queue):
    
    306
    +                if self._worker_is_capable(worker_capabilities, job):
    
    307
    +                    self.__logger.info("Job scheduled to run: [%s]", job.name)
    
    307 308
     
    
    308
    -                lease = job.lease
    
    309
    +                    lease = job.lease
    
    309 310
     
    
    310
    -                if not lease:
    
    311
    -                    # For now, one lease at a time:
    
    312
    -                    lease = job.create_lease()
    
    311
    +                    if not lease:
    
    312
    +                        # For now, one lease at a time:
    
    313
    +                        lease = job.create_lease()
    
    313 314
     
    
    314
    -                if lease:
    
    315
    -                    del self.__queue[job_index]
    
    316
    -                    return [lease]
    
    315
    +                    if lease:
    
    316
    +                        del self.__queue[job_index]
    
    317
    +                        return [lease]
    
    317 318
     
    
    318
    -                return None
    
    319
    +                    return []
    
    319 320
     
    
    320
    -        return None
    
    321
    +            return []
    
    321 322
     
    
    322 323
         def update_job_lease_state(self, job_name, lease):
    
    323 324
             """Requests a state transition for a job's current :class:Lease.
    
    ... ... @@ -551,11 +552,12 @@ class Scheduler:
    551 552
             """Schedules or reschedules a job."""
    
    552 553
             job = self.__jobs_by_name[job_name]
    
    553 554
     
    
    554
    -        if job.operation_stage == OperationStage.QUEUED:
    
    555
    -            self.__queue.sort()
    
    555
    +        with self.__queue_lock:
    
    556
    +            if job.operation_stage == OperationStage.QUEUED:
    
    557
    +                self.__queue.sort()
    
    556 558
     
    
    557
    -        else:
    
    558
    -            bisect.insort(self.__queue, job)
    
    559
    +            else:
    
    560
    +                bisect.insort(self.__queue, job)
    
    559 561
     
    
    560 562
             self.__logger.info("Job queued: [%s]", job.name)
    
    561 563
     
    
    ... ... @@ -564,7 +566,8 @@ class Scheduler:
    564 566
             job = self.__jobs_by_name[job_name]
    
    565 567
     
    
    566 568
             if job.operation_stage == OperationStage.QUEUED:
    
    567
    -            self.__queue.remove(job)
    
    569
    +            with self.__queue_lock:
    
    570
    +                self.__queue.remove(job)
    
    568 571
     
    
    569 572
             del self.__jobs_by_action[job.action_digest.hash]
    
    570 573
             del self.__jobs_by_name[job.name]
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]