Tristan Van Berkom pushed to branch master at BuildStream / buildstream
Commits:
- 059035b9 by Tristan Van Berkom at 2019-01-07T18:02:00Z
- b83d1b1f by Tristan Van Berkom at 2019-01-07T18:02:00Z
- 16a8816f by Tristan Van Berkom at 2019-01-07T18:02:00Z
- c2fc2a5e by Tristan Van Berkom at 2019-01-07T18:02:00Z
- 3e3984ad by Tristan Van Berkom at 2019-01-07T18:50:23Z
 
13 changed files:
- buildstream/_frontend/app.py
- buildstream/_scheduler/__init__.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/queues/trackqueue.py
- buildstream/_scheduler/scheduler.py
 
Changes:

buildstream/_frontend/app.py:

@@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
 from .._stream import Stream
 from .._versions import BST_FORMAT_VERSION
 from .. import _yaml
-from .._scheduler import ElementJob
+from .._scheduler import ElementJob, JobStatus
 
 # Import frontend assets
 from . import Profile, LogLine, Status
@@ -515,13 +515,13 @@ class App():
         self._status.add_job(job)
         self._maybe_render_status()
 
-    def _job_completed(self, job, success):
+    def _job_completed(self, job, status):
         self._status.remove_job(job)
         self._maybe_render_status()
 
         # Dont attempt to handle a failure if the user has already opted to
         # terminate
-        if not success and not self.stream.terminated:
+        if status == JobStatus.FAIL and not self.stream.terminated:
 
             if isinstance(job, ElementJob):
                 element = job.element
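
(Aside, not part of the diff: the new frontend check is behaviourally equivalent to the old one, since the old `success` flag was True for both OK and SKIPPED outcomes. A quick sketch of that equivalence, assuming only the `JobStatus` export this commit adds:)

    from buildstream._scheduler import JobStatus

    for status in (JobStatus.OK, JobStatus.SKIPPED, JobStatus.FAIL):
        # Old behaviour: success was True for OK and SKIPPED alike
        old_success = status in (JobStatus.OK, JobStatus.SKIPPED)
        # The old and new failure conditions agree for every status
        assert (not old_success) == (status == JobStatus.FAIL)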

buildstream/_scheduler/__init__.py:

@@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
 from .queues.pullqueue import PullQueue
 
 from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob
+from .jobs import ElementJob, JobStatus

buildstream/_scheduler/jobs/__init__.py:

@@ -20,3 +20,4 @@
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob
+from .job import JobStatus

buildstream/_scheduler/jobs/cachesizejob.py:

@@ -16,7 +16,7 @@
 #  Author:
 #        Tristan Daniël Maat <tristan maat codethink co uk>
 #
-from .job import Job
+from .job import Job, JobStatus
 
 
 class CacheSizeJob(Job):
@@ -30,8 +30,8 @@ class CacheSizeJob(Job):
     def child_process(self):
         return self._artifacts.compute_cache_size()
 
-    def parent_complete(self, success, result):
-        if success:
+    def parent_complete(self, status, result):
+        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)
 
             if self._complete_cb:

buildstream/_scheduler/jobs/cleanupjob.py:

@@ -16,7 +16,7 @@
 #  Author:
 #        Tristan Daniël Maat <tristan maat codethink co uk>
 #
-from .job import Job
+from .job import Job, JobStatus
 
 
 class CleanupJob(Job):
@@ -29,6 +29,6 @@ class CleanupJob(Job):
     def child_process(self):
         return self._artifacts.clean()
 
-    def parent_complete(self, success, result):
-        if success:
+    def parent_complete(self, status, result):
+        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)

buildstream/_scheduler/jobs/elementjob.py:

@@ -60,7 +60,7 @@ from .job import Job
 #     Args:
 #        job (Job): The job object which completed
 #        element (Element): The element passed to the Job() constructor
-#        success (bool): True if the action_cb did not raise an exception
+#        status (JobStatus): The status of whether the workload raised an exception
 #        result (object): The deserialized object returned by the `action_cb`, or None
 #                         if `success` is False
 #
@@ -93,8 +93,8 @@ class ElementJob(Job):
         # Run the action
         return self._action_cb(self._element)
 
-    def parent_complete(self, success, result):
-        self._complete_cb(self, self._element, success, self._result)
+    def parent_complete(self, status, result):
+        self._complete_cb(self, self._element, status, self._result)
 
     def message(self, message_type, message, **kwargs):
         args = dict(kwargs)
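
(Aside, not part of the diff: a sketch of a complete callback written against the documented signature above; on_element_done is a made-up name, and only the JobStatus values added by this commit are assumed:)

    from buildstream._scheduler.jobs import JobStatus

    def on_element_done(job, element, status, result):
        # Receives a JobStatus rather than the old success boolean
        if status == JobStatus.OK:
            print("element job finished:", result)
        elif status == JobStatus.SKIPPED:
            print("element job skipped")
        else:
            print("element job failed")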

buildstream/_scheduler/jobs/job.py:

@@ -41,6 +41,22 @@ RC_PERM_FAIL = 2
 RC_SKIPPED = 3
 
 
+# JobStatus:
+#
+# The job completion status, passed back through the
+# complete callbacks.
+#
+class JobStatus():
+    # Job succeeded
+    OK = 0
+
+    # A temporary BstError was raised
+    FAIL = 1
+
+    # A SkipJob was raised
+    SKIPPED = 3
+
+
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
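
(Aside, not part of the diff: because JobStatus.OK is 0, these values need explicit comparison rather than a truthiness test, which is why callers throughout this commit write `status == JobStatus.OK` / `status == JobStatus.FAIL`. A minimal sketch of the pitfall:)

    from buildstream._scheduler.jobs import JobStatus

    status = JobStatus.OK

    # Wrong: OK is 0, so a bare truthiness test misreads success
    if not status:
        print("runs even though the job succeeded")

    # Right: compare against the JobStatus values explicitly
    if status == JobStatus.OK:
        print("job succeeded")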
@@ -116,7 +132,6 @@ class Job():
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
-        self._skipped_flag = False             # Indicate whether the job was skipped.
         self._terminated = False               # Whether this job has been explicitly terminated
 
         # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
@@ -273,18 +288,6 @@ class Job():
     def set_task_id(self, task_id):
         self._task_id = task_id
 
-    # skipped
-    #
-    # This will evaluate to True if the job was skipped
-    # during processing, or if it was forcefully terminated.
-    #
-    # Returns:
-    #    (bool): Whether the job should appear as skipped
-    #
-    @property
-    def skipped(self):
-        return self._skipped_flag or self._terminated
-
     #######################################################
     #                  Abstract Methods                   #
     #######################################################
@@ -295,10 +298,10 @@ class Job():
     # pass the result to the main thread.
     #
     # Args:
-    #    success (bool): Whether the job was successful.
+    #    status (JobStatus): The job exit status
     #    result (any): The result returned by child_process().
     #
-    def parent_complete(self, success, result):
+    def parent_complete(self, status, result):
         raise ImplError("Job '{kind}' does not implement parent_complete()"
                         .format(kind=type(self).__name__))
 
@@ -562,16 +565,23 @@ class Job():
         #
         self._retry_flag = returncode == RC_FAIL
 
-        # Set the flag to alert Queue that this job skipped.
-        self._skipped_flag = returncode == RC_SKIPPED
-
         if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
             self.spawn()
             return
 
-        success = returncode in (RC_OK, RC_SKIPPED)
-        self.parent_complete(success, self._result)
-        self._scheduler.job_completed(self, success)
+        # Resolve the outward facing overall job completion status
+        #
+        if returncode == RC_OK:
+            status = JobStatus.OK
+        elif returncode == RC_SKIPPED:
+            status = JobStatus.SKIPPED
+        elif returncode in (RC_FAIL, RC_PERM_FAIL):
+            status = JobStatus.FAIL
+        else:
+            status = JobStatus.FAIL
+
+        self.parent_complete(status, self._result)
+        self._scheduler.job_completed(self, status)
 
         # Force the deletion of the queue and process objects to try and clean up FDs
         self._queue = self._process = None
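
(Aside, not part of the diff: a minimal sketch of a Job subclass written against the new parent_complete(status, result) contract; ExampleJob is hypothetical. SKIPPED is no longer folded into a success boolean by the base class, so implementations decide how to treat it:)

    from buildstream._scheduler.jobs.job import Job, JobStatus

    class ExampleJob(Job):

        def child_process(self):
            # Runs in the child process; the return value comes back to
            # the parent process as `result` below
            return 42

        def parent_complete(self, status, result):
            # Branch on the JobStatus instead of a success boolean
            if status == JobStatus.OK:
                print("completed with result", result)
            elif status == JobStatus.SKIPPED:
                print("skipped")
            else:
                print("failed")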

buildstream/_scheduler/queues/buildqueue.py:

@@ -21,7 +21,7 @@
 from datetime import timedelta
 
 from . import Queue, QueueStatus
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 from ..._message import MessageType
 
@@ -104,7 +104,7 @@ class BuildQueue(Queue):
         if artifacts.has_quota_exceeded():
             self._scheduler.check_cache_size()
 
-    def done(self, job, element, result, success):
+    def done(self, job, element, result, status):
 
         # Inform element in main process that assembly is done
         element._assemble_done()
@@ -117,5 +117,5 @@ class BuildQueue(Queue):
         #        artifact cache size for a successful build even though we know a
         #        failed build also grows the artifact cache size.
         #
-        if success:
+        if status == JobStatus.OK:
             self._check_cache_size(job, element, result)

buildstream/_scheduler/queues/fetchqueue.py:

@@ -24,6 +24,7 @@ from ... import Consistency
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 
 
 # A queue which fetches element sources
@@ -66,9 +67,9 @@ class FetchQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):
 
-        if not success:
+        if status == JobStatus.FAIL:
             return
 
         element._update_state()

buildstream/_scheduler/queues/pullqueue.py:

@@ -21,6 +21,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 from ..._exceptions import SkipJob
 
 
@@ -54,9 +55,9 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP
 
-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):
 
-        if not success:
+        if status == JobStatus.FAIL:
             return
 
         element._pull_done()
@@ -64,4 +65,5 @@ class PullQueue(Queue):
         # Build jobs will check the "approximate" size first. Since we
         # do not get an artifact size from pull jobs, we have to
         # actually check the cache size.
-        self._scheduler.check_cache_size()
+        if status == JobStatus.OK:
+            self._scheduler.check_cache_size()

buildstream/_scheduler/queues/queue.py:

@@ -25,7 +25,7 @@ from enum import Enum
 import traceback
 
 # Local imports
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
@@ -133,10 +133,9 @@ class Queue():
     #    job (Job): The job which completed processing
     #    element (Element): The element which completed processing
     #    result (any): The return value of the process() implementation
-    #    success (bool): True if the process() implementation did not
-    #                    raise any exception
+    #    status (JobStatus): The return status of the Job
     #
-    def done(self, job, element, result, success):
+    def done(self, job, element, result, status):
         pass
 
     #####################################################
@@ -291,7 +290,7 @@ class Queue():
     #
     # See the Job object for an explanation of the call signature
     #
-    def _job_done(self, job, element, success, result):
+    def _job_done(self, job, element, status, result):
 
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
@@ -301,7 +300,7 @@ class Queue():
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            self.done(job, element, result, success)
+            self.done(job, element, result, status)
         except BstError as e:
 
             # Report error and mark as failed
@@ -332,12 +331,10 @@ class Queue():
             # All jobs get placed on the done queue for later processing.
             self._done_queue.append(job)
 
-            # A Job can be skipped whether or not it has failed,
-            # we want to only bookkeep them as processed or failed
-            # if they are not skipped.
-            if job.skipped:
+            # These lists are for bookkeeping purposes for the UI and logging.
+            if status == JobStatus.SKIPPED:
                 self.skipped_elements.append(element)
-            elif success:
+            elif status == JobStatus.OK:
                 self.processed_elements.append(element)
             else:
                 self.failed_elements.append(element)

buildstream/_scheduler/queues/trackqueue.py:

@@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 
 
 # A queue which tracks sources
@@ -47,9 +48,9 @@ class TrackQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):
 
-        if not success:
+        if status == JobStatus.FAIL:
             return
 
         # Set the new refs in the main process one by one as they complete

buildstream/_scheduler/scheduler.py:

@@ -38,6 +38,16 @@ class SchedStatus():
     TERMINATED = 1
 
 
+# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
+# which we launch dynamically, they have the property of being
+# meaningless to queue if one is already queued, and it also
+# doesnt make sense to run them in parallel
+#
+_ACTION_NAME_CLEANUP = 'cleanup'
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
+_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
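
(Aside, not part of the diff: the two sets introduced below, _exclusive_waiting and _exclusive_active, implement this policy for the action names defined above: drop a newly scheduled job while one with the same name is already waiting, and postpone starting a waiting job while one with the same name is running. A self-contained sketch of that policy with made-up names:)

    class ExclusiveActions():
        def __init__(self):
            self.waiting = set()
            self.active = set()

        def schedule(self, action_name):
            # Drop the request if one is already queued
            if action_name in self.waiting:
                return False
            self.waiting.add(action_name)
            return True

        def start(self, action_name):
            # Postpone if one with the same name is already running
            if action_name in self.active:
                return False
            self.waiting.discard(action_name)
            self.active.add(action_name)
            return True

        def complete(self, action_name):
            self.active.discard(action_name)

    book = ExclusiveActions()
    assert book.schedule('cache_size')        # queued
    assert not book.schedule('cache_size')    # redundant request is dropped
    assert book.start('cache_size')           # starts running
    assert book.schedule('cache_size')        # re-queueing while active is fine...
    assert not book.start('cache_size')       # ...but it waits for the running one
    book.complete('cache_size')
    assert book.start('cache_size')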
@@ -94,6 +104,15 @@ class Scheduler():
         self._suspendtime = None
         self._queue_jobs = True      # Whether we should continue to queue jobs
 
+        # Whether our exclusive jobs, like 'cleanup' are currently already
+        # waiting or active.
+        #
+        # This is just a bit quicker than scanning the wait queue and active
+        # queue and comparing job action names.
+        #
+        self._exclusive_waiting = set()
+        self._exclusive_active = set()
+
         self._resources = Resources(context.sched_builders,
                                     context.sched_fetchers,
                                     context.sched_pushers)
@@ -211,19 +230,6 @@ class Scheduler():
             starttime = timenow
         return timenow - starttime
 
-    # schedule_jobs()
-    #
-    # Args:
-    #     jobs ([Job]): A list of jobs to schedule
-    #
-    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
-    # run as soon any other queueing jobs finish, provided sufficient
-    # resources are available for them to run
-    #
-    def schedule_jobs(self, jobs):
-        for job in jobs:
-            self.waiting_jobs.append(job)
-
     # job_completed():
     #
     # Called when a Job completes
@@ -231,12 +237,14 @@ class Scheduler():
     # Args:
    #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
+    #    status (JobStatus): The status of the completed job
     #
-    def job_completed(self, job, success):
+    def job_completed(self, job, status):
         self._resources.clear_job_resources(job)
         self.active_jobs.remove(job)
-        self._job_complete_callback(job, success)
+        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+            self._exclusive_active.remove(job.action_name)
+        self._job_complete_callback(job, status)
         self._schedule_queue_jobs()
         self._sched()
 
@@ -246,18 +254,13 @@ class Scheduler():
     # size is calculated, a cleanup job will be run automatically
     # if needed.
     #
-    # FIXME: This should ensure that only one cache size job
-    #        is ever pending at a given time. If a cache size
-    #        job is already running, it is correct to queue
-    #        a new one, it is incorrect to have more than one
-    #        of these jobs pending at a given time, though.
-    #
     def check_cache_size(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
+        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+                           'cache_size/cache_size',
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        self._schedule_jobs([job])
 
     #######################################################
     #                  Local Private Methods              #
@@ -276,10 +279,19 @@ class Scheduler():
             if not self._resources.reserve_job_resources(job):
                 continue
 
+            # Postpone these jobs if one is already running
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
+               job.action_name in self._exclusive_active:
+                continue
+
             job.spawn()
             self.waiting_jobs.remove(job)
             self.active_jobs.append(job)
 
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+                self._exclusive_waiting.remove(job.action_name)
+                self._exclusive_active.add(job.action_name)
+
             if self._job_start_callback:
                 self._job_start_callback(job)
 
@@ -287,6 +299,33 @@
         if not self.active_jobs and not self.waiting_jobs:
             self.loop.stop()
 
+    # _schedule_jobs()
+    #
+    # The main entry point for jobs to be scheduled.
+    #
+    # This is called either as a result of scanning the queues
+    # in _schedule_queue_jobs(), or directly by the Scheduler
+    # to insert special jobs like cleanups.
+    #
+    # Args:
+    #     jobs ([Job]): A list of jobs to schedule
+    #
+    def _schedule_jobs(self, jobs):
+        for job in jobs:
+
+            # Special treatment of our redundant exclusive jobs
+            #
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+
+                # Drop the job if one is already queued
+                if job.action_name in self._exclusive_waiting:
+                    continue
+
+                # Mark this action type as queued
+                self._exclusive_waiting.add(job.action_name)
+
+            self.waiting_jobs.append(job)
+
     # _schedule_queue_jobs()
     #
     # Ask the queues what jobs they want to schedule and schedule
@@ -331,7 +370,7 @@ class Scheduler():
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        self.schedule_jobs(ready)
+        self._schedule_jobs(ready)
         self._sched()
 
     # _run_cleanup()
@@ -353,11 +392,11 @@ class Scheduler():
         if not artifacts.has_quota_exceeded():
             return
 
-        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
+        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
                          exclusive_resources=[ResourceType.CACHE])
-        self.schedule_jobs([job])
+        self._schedule_jobs([job])
 
     # _suspend_jobs()
     #