... |
... |
@@ -22,6 +22,7 @@ Schedules jobs. |
22
|
22
|
import bisect
|
23
|
23
|
from datetime import timedelta
|
24
|
24
|
import logging
|
|
25
|
+from threading import Lock
|
25
|
26
|
|
26
|
27
|
from buildgrid._enums import LeaseState, OperationStage
|
27
|
28
|
from buildgrid._exceptions import NotFoundError
|
... |
... |
@@ -53,6 +54,7 @@ class Scheduler: |
53
|
54
|
self.__jobs_by_name = {} # Name to Job 1:1 mapping
|
54
|
55
|
|
55
|
56
|
self.__queue = []
|
|
57
|
+ self.__queue_lock = Lock()
|
56
|
58
|
|
57
|
59
|
self._is_instrumented = monitor
|
58
|
60
|
|
... |
... |
@@ -297,27 +299,26 @@ class Scheduler: |
297
|
299
|
worker properties, configuration and state at the time of the
|
298
|
300
|
request.
|
299
|
301
|
"""
|
300
|
|
- if not self.__queue:
|
301
|
|
- return []
|
302
|
|
-
|
303
|
|
- # Looking for the first job that could be assigned to the worker...
|
304
|
|
- for job_index, job in enumerate(self.__queue):
|
305
|
|
- if self._worker_is_capable(worker_capabilities, job):
|
306
|
|
- self.__logger.info("Job scheduled to run: [%s]", job.name)
|
|
302
|
+ # TODO: Replace with a more efficient way of doing this.
|
|
303
|
+ with self.__queue_lock:
|
|
304
|
+ # Looking for the first job that could be assigned to the worker...
|
|
305
|
+ for job_index, job in enumerate(self.__queue):
|
|
306
|
+ if self._worker_is_capable(worker_capabilities, job):
|
|
307
|
+ self.__logger.info("Job scheduled to run: [%s]", job.name)
|
307
|
308
|
|
308
|
|
- lease = job.lease
|
|
309
|
+ lease = job.lease
|
309
|
310
|
|
310
|
|
- if not lease:
|
311
|
|
- # For now, one lease at a time:
|
312
|
|
- lease = job.create_lease()
|
|
311
|
+ if not lease:
|
|
312
|
+ # For now, one lease at a time:
|
|
313
|
+ lease = job.create_lease()
|
313
|
314
|
|
314
|
|
- if lease:
|
315
|
|
- del self.__queue[job_index]
|
316
|
|
- return [lease]
|
|
315
|
+ if lease:
|
|
316
|
+ del self.__queue[job_index]
|
|
317
|
+ return [lease]
|
317
|
318
|
|
318
|
|
- return None
|
|
319
|
+ return []
|
319
|
320
|
|
320
|
|
- return None
|
|
321
|
+ return []
|
321
|
322
|
|
322
|
323
|
def update_job_lease_state(self, job_name, lease):
|
323
|
324
|
"""Requests a state transition for a job's current :class:Lease.
|
... |
... |
@@ -551,11 +552,12 @@ class Scheduler: |
551
|
552
|
"""Schedules or reschedules a job."""
|
552
|
553
|
job = self.__jobs_by_name[job_name]
|
553
|
554
|
|
554
|
|
- if job.operation_stage == OperationStage.QUEUED:
|
555
|
|
- self.__queue.sort()
|
|
555
|
+ with self.__queue_lock:
|
|
556
|
+ if job.operation_stage == OperationStage.QUEUED:
|
|
557
|
+ self.__queue.sort()
|
556
|
558
|
|
557
|
|
- else:
|
558
|
|
- bisect.insort(self.__queue, job)
|
|
559
|
+ else:
|
|
560
|
+ bisect.insort(self.__queue, job)
|
559
|
561
|
|
560
|
562
|
self.__logger.info("Job queued: [%s]", job.name)
|
561
|
563
|
|
... |
... |
@@ -564,7 +566,8 @@ class Scheduler: |
564
|
566
|
job = self.__jobs_by_name[job_name]
|
565
|
567
|
|
566
|
568
|
if job.operation_stage == OperationStage.QUEUED:
|
567
|
|
- self.__queue.remove(job)
|
|
569
|
+ with self.__queue_lock:
|
|
570
|
+ self.__queue.remove(job)
|
568
|
571
|
|
569
|
572
|
del self.__jobs_by_action[job.action_digest.hash]
|
570
|
573
|
del self.__jobs_by_name[job.name]
|