[Notes] [Git][BuildStream/buildstream][master] 5 commits: _scheduler/scheduler.py: Make _schedule_jobs() private



Title: GitLab

Tristan Van Berkom pushed to branch master at BuildStream / buildstream

Commits:

13 changed files:

Changes:

  • buildstream/_frontend/app.py
    ... ... @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
    38 38
     from .._stream import Stream
    
    39 39
     from .._versions import BST_FORMAT_VERSION
    
    40 40
     from .. import _yaml
    
    41
    -from .._scheduler import ElementJob
    
    41
    +from .._scheduler import ElementJob, JobStatus
    
    42 42
     
    
    43 43
     # Import frontend assets
    
    44 44
     from . import Profile, LogLine, Status
    
    ... ... @@ -515,13 +515,13 @@ class App():
    515 515
             self._status.add_job(job)
    
    516 516
             self._maybe_render_status()
    
    517 517
     
    
    518
    -    def _job_completed(self, job, success):
    
    518
    +    def _job_completed(self, job, status):
    
    519 519
             self._status.remove_job(job)
    
    520 520
             self._maybe_render_status()
    
    521 521
     
    
    522 522
             # Dont attempt to handle a failure if the user has already opted to
    
    523 523
             # terminate
    
    524
    -        if not success and not self.stream.terminated:
    
    524
    +        if status == JobStatus.FAIL and not self.stream.terminated:
    
    525 525
     
    
    526 526
                 if isinstance(job, ElementJob):
    
    527 527
                     element = job.element
    

  • buildstream/_scheduler/__init__.py
    ... ... @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
    26 26
     from .queues.pullqueue import PullQueue
    
    27 27
     
    
    28 28
     from .scheduler import Scheduler, SchedStatus
    
    29
    -from .jobs import ElementJob
    29
    +from .jobs import ElementJob, JobStatus

  • buildstream/_scheduler/jobs/__init__.py
    ... ... @@ -20,3 +20,4 @@
    20 20
     from .elementjob import ElementJob
    
    21 21
     from .cachesizejob import CacheSizeJob
    
    22 22
     from .cleanupjob import CleanupJob
    
    23
    +from .job import JobStatus

  • buildstream/_scheduler/jobs/cachesizejob.py
    ... ... @@ -16,7 +16,7 @@
    16 16
     #  Author:
    
    17 17
     #        Tristan Daniël Maat <tristan maat codethink co uk>
    
    18 18
     #
    
    19
    -from .job import Job
    
    19
    +from .job import Job, JobStatus
    
    20 20
     
    
    21 21
     
    
    22 22
     class CacheSizeJob(Job):
    
    ... ... @@ -30,8 +30,8 @@ class CacheSizeJob(Job):
    30 30
         def child_process(self):
    
    31 31
             return self._artifacts.compute_cache_size()
    
    32 32
     
    
    33
    -    def parent_complete(self, success, result):
    
    34
    -        if success:
    
    33
    +    def parent_complete(self, status, result):
    
    34
    +        if status == JobStatus.OK:
    
    35 35
                 self._artifacts.set_cache_size(result)
    
    36 36
     
    
    37 37
                 if self._complete_cb:
    

  • buildstream/_scheduler/jobs/cleanupjob.py
    ... ... @@ -16,7 +16,7 @@
    16 16
     #  Author:
    
    17 17
     #        Tristan Daniël Maat <tristan maat codethink co uk>
    
    18 18
     #
    
    19
    -from .job import Job
    
    19
    +from .job import Job, JobStatus
    
    20 20
     
    
    21 21
     
    
    22 22
     class CleanupJob(Job):
    
    ... ... @@ -29,6 +29,6 @@ class CleanupJob(Job):
    29 29
         def child_process(self):
    
    30 30
             return self._artifacts.clean()
    
    31 31
     
    
    32
    -    def parent_complete(self, success, result):
    
    33
    -        if success:
    
    32
    +    def parent_complete(self, status, result):
    
    33
    +        if status == JobStatus.OK:
    
    34 34
                 self._artifacts.set_cache_size(result)

  • buildstream/_scheduler/jobs/elementjob.py
    ... ... @@ -60,7 +60,7 @@ from .job import Job
    60 60
     #     Args:
    
    61 61
     #        job (Job): The job object which completed
    
    62 62
     #        element (Element): The element passed to the Job() constructor
    
    63
    -#        success (bool): True if the action_cb did not raise an exception
    
    63
    +#        status (JobStatus): The status of whether the workload raised an exception
    
    64 64
     #        result (object): The deserialized object returned by the `action_cb`, or None
    
    65 65
     #                         if `success` is False
    
    66 66
     #
    
    ... ... @@ -93,8 +93,8 @@ class ElementJob(Job):
    93 93
             # Run the action
    
    94 94
             return self._action_cb(self._element)
    
    95 95
     
    
    96
    -    def parent_complete(self, success, result):
    
    97
    -        self._complete_cb(self, self._element, success, self._result)
    
    96
    +    def parent_complete(self, status, result):
    
    97
    +        self._complete_cb(self, self._element, status, self._result)
    
    98 98
     
    
    99 99
         def message(self, message_type, message, **kwargs):
    
    100 100
             args = dict(kwargs)
    

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -41,6 +41,22 @@ RC_PERM_FAIL = 2
    41 41
     RC_SKIPPED = 3
    
    42 42
     
    
    43 43
     
    
    44
    +# JobStatus:
    
    45
    +#
    
    46
    +# The job completion status, passed back through the
    
    47
    +# complete callbacks.
    
    48
    +#
    
    49
    +class JobStatus():
    
    50
    +    # Job succeeded
    
    51
    +    OK = 0
    
    52
    +
    
    53
    +    # A temporary BstError was raised
    
    54
    +    FAIL = 1
    
    55
    +
    
    56
    +    # A SkipJob was raised
    
    57
    +    SKIPPED = 3
    
    58
    +
    
    59
    +
    
    44 60
     # Used to distinguish between status messages and return values
    
    45 61
     class Envelope():
    
    46 62
         def __init__(self, message_type, message):
    
    ... ... @@ -116,7 +132,6 @@ class Job():
    116 132
             self._max_retries = max_retries        # Maximum number of automatic retries
    
    117 133
             self._result = None                    # Return value of child action in the parent
    
    118 134
             self._tries = 0                        # Try count, for retryable jobs
    
    119
    -        self._skipped_flag = False             # Indicate whether the job was skipped.
    
    120 135
             self._terminated = False               # Whether this job has been explicitly terminated
    
    121 136
     
    
    122 137
             # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
    
    ... ... @@ -273,18 +288,6 @@ class Job():
    273 288
         def set_task_id(self, task_id):
    
    274 289
             self._task_id = task_id
    
    275 290
     
    
    276
    -    # skipped
    
    277
    -    #
    
    278
    -    # This will evaluate to True if the job was skipped
    
    279
    -    # during processing, or if it was forcefully terminated.
    
    280
    -    #
    
    281
    -    # Returns:
    
    282
    -    #    (bool): Whether the job should appear as skipped
    
    283
    -    #
    
    284
    -    @property
    
    285
    -    def skipped(self):
    
    286
    -        return self._skipped_flag or self._terminated
    
    287
    -
    
    288 291
         #######################################################
    
    289 292
         #                  Abstract Methods                   #
    
    290 293
         #######################################################
    
    ... ... @@ -295,10 +298,10 @@ class Job():
    295 298
         # pass the result to the main thread.
    
    296 299
         #
    
    297 300
         # Args:
    
    298
    -    #    success (bool): Whether the job was successful.
    
    301
    +    #    status (JobStatus): The job exit status
    
    299 302
         #    result (any): The result returned by child_process().
    
    300 303
         #
    
    301
    -    def parent_complete(self, success, result):
    
    304
    +    def parent_complete(self, status, result):
    
    302 305
             raise ImplError("Job '{kind}' does not implement parent_complete()"
    
    303 306
                             .format(kind=type(self).__name__))
    
    304 307
     
    
    ... ... @@ -562,16 +565,23 @@ class Job():
    562 565
             #
    
    563 566
             self._retry_flag = returncode == RC_FAIL
    
    564 567
     
    
    565
    -        # Set the flag to alert Queue that this job skipped.
    
    566
    -        self._skipped_flag = returncode == RC_SKIPPED
    
    567
    -
    
    568 568
             if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
    
    569 569
                 self.spawn()
    
    570 570
                 return
    
    571 571
     
    
    572
    -        success = returncode in (RC_OK, RC_SKIPPED)
    
    573
    -        self.parent_complete(success, self._result)
    
    574
    -        self._scheduler.job_completed(self, success)
    
    572
    +        # Resolve the outward facing overall job completion status
    
    573
    +        #
    
    574
    +        if returncode == RC_OK:
    
    575
    +            status = JobStatus.OK
    
    576
    +        elif returncode == RC_SKIPPED:
    
    577
    +            status = JobStatus.SKIPPED
    
    578
    +        elif returncode in (RC_FAIL, RC_PERM_FAIL):
    
    579
    +            status = JobStatus.FAIL
    
    580
    +        else:
    
    581
    +            status = JobStatus.FAIL
    
    582
    +
    
    583
    +        self.parent_complete(status, self._result)
    
    584
    +        self._scheduler.job_completed(self, status)
    
    575 585
     
    
    576 586
             # Force the deletion of the queue and process objects to try and clean up FDs
    
    577 587
             self._queue = self._process = None
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -21,7 +21,7 @@
    21 21
     from datetime import timedelta
    
    22 22
     
    
    23 23
     from . import Queue, QueueStatus
    
    24
    -from ..jobs import ElementJob
    
    24
    +from ..jobs import ElementJob, JobStatus
    
    25 25
     from ..resources import ResourceType
    
    26 26
     from ..._message import MessageType
    
    27 27
     
    
    ... ... @@ -104,7 +104,7 @@ class BuildQueue(Queue):
    104 104
             if artifacts.has_quota_exceeded():
    
    105 105
                 self._scheduler.check_cache_size()
    
    106 106
     
    
    107
    -    def done(self, job, element, result, success):
    
    107
    +    def done(self, job, element, result, status):
    
    108 108
     
    
    109 109
             # Inform element in main process that assembly is done
    
    110 110
             element._assemble_done()
    
    ... ... @@ -117,5 +117,5 @@ class BuildQueue(Queue):
    117 117
             #        artifact cache size for a successful build even though we know a
    
    118 118
             #        failed build also grows the artifact cache size.
    
    119 119
             #
    
    120
    -        if success:
    
    120
    +        if status == JobStatus.OK:
    
    121 121
                 self._check_cache_size(job, element, result)

  • buildstream/_scheduler/queues/fetchqueue.py
    ... ... @@ -24,6 +24,7 @@ from ... import Consistency
    24 24
     # Local imports
    
    25 25
     from . import Queue, QueueStatus
    
    26 26
     from ..resources import ResourceType
    
    27
    +from ..jobs import JobStatus
    
    27 28
     
    
    28 29
     
    
    29 30
     # A queue which fetches element sources
    
    ... ... @@ -66,9 +67,9 @@ class FetchQueue(Queue):
    66 67
     
    
    67 68
             return QueueStatus.READY
    
    68 69
     
    
    69
    -    def done(self, _, element, result, success):
    
    70
    +    def done(self, _, element, result, status):
    
    70 71
     
    
    71
    -        if not success:
    
    72
    +        if status == JobStatus.FAIL:
    
    72 73
                 return
    
    73 74
     
    
    74 75
             element._update_state()
    

  • buildstream/_scheduler/queues/pullqueue.py
    ... ... @@ -21,6 +21,7 @@
    21 21
     # Local imports
    
    22 22
     from . import Queue, QueueStatus
    
    23 23
     from ..resources import ResourceType
    
    24
    +from ..jobs import JobStatus
    
    24 25
     from ..._exceptions import SkipJob
    
    25 26
     
    
    26 27
     
    
    ... ... @@ -54,9 +55,9 @@ class PullQueue(Queue):
    54 55
             else:
    
    55 56
                 return QueueStatus.SKIP
    
    56 57
     
    
    57
    -    def done(self, _, element, result, success):
    
    58
    +    def done(self, _, element, result, status):
    
    58 59
     
    
    59
    -        if not success:
    
    60
    +        if status == JobStatus.FAIL:
    
    60 61
                 return
    
    61 62
     
    
    62 63
             element._pull_done()
    
    ... ... @@ -64,4 +65,5 @@ class PullQueue(Queue):
    64 65
             # Build jobs will check the "approximate" size first. Since we
    
    65 66
             # do not get an artifact size from pull jobs, we have to
    
    66 67
             # actually check the cache size.
    
    67
    -        self._scheduler.check_cache_size()
    68
    +        if status == JobStatus.OK:
    
    69
    +            self._scheduler.check_cache_size()

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -25,7 +25,7 @@ from enum import Enum
    25 25
     import traceback
    
    26 26
     
    
    27 27
     # Local imports
    
    28
    -from ..jobs import ElementJob
    
    28
    +from ..jobs import ElementJob, JobStatus
    
    29 29
     from ..resources import ResourceType
    
    30 30
     
    
    31 31
     # BuildStream toplevel imports
    
    ... ... @@ -133,10 +133,9 @@ class Queue():
    133 133
         #    job (Job): The job which completed processing
    
    134 134
         #    element (Element): The element which completed processing
    
    135 135
         #    result (any): The return value of the process() implementation
    
    136
    -    #    success (bool): True if the process() implementation did not
    
    137
    -    #                    raise any exception
    
    136
    +    #    status (JobStatus): The return status of the Job
    
    138 137
         #
    
    139
    -    def done(self, job, element, result, success):
    
    138
    +    def done(self, job, element, result, status):
    
    140 139
             pass
    
    141 140
     
    
    142 141
         #####################################################
    
    ... ... @@ -291,7 +290,7 @@ class Queue():
    291 290
         #
    
    292 291
         # See the Job object for an explanation of the call signature
    
    293 292
         #
    
    294
    -    def _job_done(self, job, element, success, result):
    
    293
    +    def _job_done(self, job, element, status, result):
    
    295 294
     
    
    296 295
             # Update values that need to be synchronized in the main task
    
    297 296
             # before calling any queue implementation
    
    ... ... @@ -301,7 +300,7 @@ class Queue():
    301 300
             # and determine if it should be considered as processed
    
    302 301
             # or skipped.
    
    303 302
             try:
    
    304
    -            self.done(job, element, result, success)
    
    303
    +            self.done(job, element, result, status)
    
    305 304
             except BstError as e:
    
    306 305
     
    
    307 306
                 # Report error and mark as failed
    
    ... ... @@ -332,12 +331,10 @@ class Queue():
    332 331
                 # All jobs get placed on the done queue for later processing.
    
    333 332
                 self._done_queue.append(job)
    
    334 333
     
    
    335
    -            # A Job can be skipped whether or not it has failed,
    
    336
    -            # we want to only bookkeep them as processed or failed
    
    337
    -            # if they are not skipped.
    
    338
    -            if job.skipped:
    
    334
    +            # These lists are for bookkeeping purposes for the UI and logging.
    
    335
    +            if status == JobStatus.SKIPPED:
    
    339 336
                     self.skipped_elements.append(element)
    
    340
    -            elif success:
    
    337
    +            elif status == JobStatus.OK:
    
    341 338
                     self.processed_elements.append(element)
    
    342 339
                 else:
    
    343 340
                     self.failed_elements.append(element)
    

  • buildstream/_scheduler/queues/trackqueue.py
    ... ... @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
    24 24
     # Local imports
    
    25 25
     from . import Queue, QueueStatus
    
    26 26
     from ..resources import ResourceType
    
    27
    +from ..jobs import JobStatus
    
    27 28
     
    
    28 29
     
    
    29 30
     # A queue which tracks sources
    
    ... ... @@ -47,9 +48,9 @@ class TrackQueue(Queue):
    47 48
     
    
    48 49
             return QueueStatus.READY
    
    49 50
     
    
    50
    -    def done(self, _, element, result, success):
    
    51
    +    def done(self, _, element, result, status):
    
    51 52
     
    
    52
    -        if not success:
    
    53
    +        if status == JobStatus.FAIL:
    
    53 54
                 return
    
    54 55
     
    
    55 56
             # Set the new refs in the main process one by one as they complete
    

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -38,6 +38,16 @@ class SchedStatus():
    38 38
         TERMINATED = 1
    
    39 39
     
    
    40 40
     
    
    41
    +# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
    
    42
    +# which we launch dynamically, they have the property of being
    
    43
    +# meaningless to queue if one is already queued, and it also
    
    44
    +# doesnt make sense to run them in parallel
    
    45
    +#
    
    46
    +_ACTION_NAME_CLEANUP = 'cleanup'
    
    47
    +_ACTION_NAME_CACHE_SIZE = 'cache_size'
    
    48
    +_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
    
    49
    +
    
    50
    +
    
    41 51
     # Scheduler()
    
    42 52
     #
    
    43 53
     # The scheduler operates on a list queues, each of which is meant to accomplish
    
    ... ... @@ -94,6 +104,15 @@ class Scheduler():
    94 104
             self._suspendtime = None
    
    95 105
             self._queue_jobs = True      # Whether we should continue to queue jobs
    
    96 106
     
    
    107
    +        # Whether our exclusive jobs, like 'cleanup' are currently already
    
    108
    +        # waiting or active.
    
    109
    +        #
    
    110
    +        # This is just a bit quicker than scanning the wait queue and active
    
    111
    +        # queue and comparing job action names.
    
    112
    +        #
    
    113
    +        self._exclusive_waiting = set()
    
    114
    +        self._exclusive_active = set()
    
    115
    +
    
    97 116
             self._resources = Resources(context.sched_builders,
    
    98 117
                                         context.sched_fetchers,
    
    99 118
                                         context.sched_pushers)
    
    ... ... @@ -211,19 +230,6 @@ class Scheduler():
    211 230
                 starttime = timenow
    
    212 231
             return timenow - starttime
    
    213 232
     
    
    214
    -    # schedule_jobs()
    
    215
    -    #
    
    216
    -    # Args:
    
    217
    -    #     jobs ([Job]): A list of jobs to schedule
    
    218
    -    #
    
    219
    -    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
    
    220
    -    # run as soon any other queueing jobs finish, provided sufficient
    
    221
    -    # resources are available for them to run
    
    222
    -    #
    
    223
    -    def schedule_jobs(self, jobs):
    
    224
    -        for job in jobs:
    
    225
    -            self.waiting_jobs.append(job)
    
    226
    -
    
    227 233
         # job_completed():
    
    228 234
         #
    
    229 235
         # Called when a Job completes
    
    ... ... @@ -231,12 +237,14 @@ class Scheduler():
    231 237
         # Args:
    
    232 238
         #    queue (Queue): The Queue holding a complete job
    
    233 239
         #    job (Job): The completed Job
    
    234
    -    #    success (bool): Whether the Job completed with a success status
    
    240
    +    #    status (JobStatus): The status of the completed job
    
    235 241
         #
    
    236
    -    def job_completed(self, job, success):
    
    242
    +    def job_completed(self, job, status):
    
    237 243
             self._resources.clear_job_resources(job)
    
    238 244
             self.active_jobs.remove(job)
    
    239
    -        self._job_complete_callback(job, success)
    
    245
    +        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    246
    +            self._exclusive_active.remove(job.action_name)
    
    247
    +        self._job_complete_callback(job, status)
    
    240 248
             self._schedule_queue_jobs()
    
    241 249
             self._sched()
    
    242 250
     
    
    ... ... @@ -246,18 +254,13 @@ class Scheduler():
    246 254
         # size is calculated, a cleanup job will be run automatically
    
    247 255
         # if needed.
    
    248 256
         #
    
    249
    -    # FIXME: This should ensure that only one cache size job
    
    250
    -    #        is ever pending at a given time. If a cache size
    
    251
    -    #        job is already running, it is correct to queue
    
    252
    -    #        a new one, it is incorrect to have more than one
    
    253
    -    #        of these jobs pending at a given time, though.
    
    254
    -    #
    
    255 257
         def check_cache_size(self):
    
    256
    -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
    
    258
    +        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
    
    259
    +                           'cache_size/cache_size',
    
    257 260
                                resources=[ResourceType.CACHE,
    
    258 261
                                           ResourceType.PROCESS],
    
    259 262
                                complete_cb=self._run_cleanup)
    
    260
    -        self.schedule_jobs([job])
    
    263
    +        self._schedule_jobs([job])
    
    261 264
     
    
    262 265
         #######################################################
    
    263 266
         #                  Local Private Methods              #
    
    ... ... @@ -276,10 +279,19 @@ class Scheduler():
    276 279
                 if not self._resources.reserve_job_resources(job):
    
    277 280
                     continue
    
    278 281
     
    
    282
    +            # Postpone these jobs if one is already running
    
    283
    +            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
    
    284
    +               job.action_name in self._exclusive_active:
    
    285
    +                continue
    
    286
    +
    
    279 287
                 job.spawn()
    
    280 288
                 self.waiting_jobs.remove(job)
    
    281 289
                 self.active_jobs.append(job)
    
    282 290
     
    
    291
    +            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    292
    +                self._exclusive_waiting.remove(job.action_name)
    
    293
    +                self._exclusive_active.add(job.action_name)
    
    294
    +
    
    283 295
                 if self._job_start_callback:
    
    284 296
                     self._job_start_callback(job)
    
    285 297
     
    
    ... ... @@ -287,6 +299,33 @@ class Scheduler():
    287 299
             if not self.active_jobs and not self.waiting_jobs:
    
    288 300
                 self.loop.stop()
    
    289 301
     
    
    302
    +    # _schedule_jobs()
    
    303
    +    #
    
    304
    +    # The main entry point for jobs to be scheduled.
    
    305
    +    #
    
    306
    +    # This is called either as a result of scanning the queues
    
    307
    +    # in _schedule_queue_jobs(), or directly by the Scheduler
    
    308
    +    # to insert special jobs like cleanups.
    
    309
    +    #
    
    310
    +    # Args:
    
    311
    +    #     jobs ([Job]): A list of jobs to schedule
    
    312
    +    #
    
    313
    +    def _schedule_jobs(self, jobs):
    
    314
    +        for job in jobs:
    
    315
    +
    
    316
    +            # Special treatment of our redundant exclusive jobs
    
    317
    +            #
    
    318
    +            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
    
    319
    +
    
    320
    +                # Drop the job if one is already queued
    
    321
    +                if job.action_name in self._exclusive_waiting:
    
    322
    +                    continue
    
    323
    +
    
    324
    +                # Mark this action type as queued
    
    325
    +                self._exclusive_waiting.add(job.action_name)
    
    326
    +
    
    327
    +            self.waiting_jobs.append(job)
    
    328
    +
    
    290 329
         # _schedule_queue_jobs()
    
    291 330
         #
    
    292 331
         # Ask the queues what jobs they want to schedule and schedule
    
    ... ... @@ -331,7 +370,7 @@ class Scheduler():
    331 370
                 # the next queue and process them.
    
    332 371
                 process_queues = any(q.dequeue_ready() for q in self.queues)
    
    333 372
     
    
    334
    -        self.schedule_jobs(ready)
    
    373
    +        self._schedule_jobs(ready)
    
    335 374
             self._sched()
    
    336 375
     
    
    337 376
         # _run_cleanup()
    
    ... ... @@ -353,11 +392,11 @@ class Scheduler():
    353 392
             if not artifacts.has_quota_exceeded():
    
    354 393
                 return
    
    355 394
     
    
    356
    -        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
    
    395
    +        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
    
    357 396
                              resources=[ResourceType.CACHE,
    
    358 397
                                         ResourceType.PROCESS],
    
    359 398
                              exclusive_resources=[ResourceType.CACHE])
    
    360
    -        self.schedule_jobs([job])
    
    399
    +        self._schedule_jobs([job])
    
    361 400
     
    
    362 401
         # _suspend_jobs()
    
    363 402
         #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]