[Notes] [Git][BuildStream/buildstream][tristan/one-cache-size-job] Scheduler: Introduced JobStatus instead of simple success boolean



Tristan Van Berkom pushed to branch tristan/one-cache-size-job at BuildStream / buildstream

Commits:

13 changed files:

Changes:

  • buildstream/_frontend/app.py
    @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
     from .._stream import Stream
     from .._versions import BST_FORMAT_VERSION
     from .. import _yaml
    -from .._scheduler import ElementJob
    +from .._scheduler import ElementJob, JobStatus
     
     # Import frontend assets
     from . import Profile, LogLine, Status
    @@ -515,13 +515,13 @@ class App():
             self._status.add_job(job)
             self._maybe_render_status()
     
    -    def _job_completed(self, job, success):
    +    def _job_completed(self, job, status):
             self._status.remove_job(job)
             self._maybe_render_status()
     
             # Dont attempt to handle a failure if the user has already opted to
             # terminate
    -        if not success and not self.stream.terminated:
    +        if status == JobStatus.FAIL and not self.stream.terminated:
     
                 if isinstance(job, ElementJob):
                     element = job.element

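Note: the frontend callback now receives one of the JobStatus constants instead of a boolean, and only a temporary failure triggers the interactive failure handling. A minimal, self-contained sketch of that branch; the JobStatus stand-in mirrors the constants added in job.py below, and the print() is only a placeholder for the real handling:

    class JobStatus():
        OK = 0          # Job succeeded
        FAIL = 1        # A temporary BstError was raised
        PERM_FAIL = 2   # A permanent BstError was raised
        SKIPPED = 3     # A SkipJob was raised


    def job_completed(job_name, status, terminated=False):
        # Mirrors App._job_completed(): only JobStatus.FAIL on a
        # still-running stream is handled interactively; OK, SKIPPED
        # and PERM_FAIL fall through.
        if status == JobStatus.FAIL and not terminated:
            print("job '{}' failed, prompting the user...".format(job_name))


    job_completed("build", JobStatus.FAIL)     # prints the prompt line
    job_completed("build", JobStatus.SKIPPED)  # does nothing
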
  • buildstream/_scheduler/__init__.py
    @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
     from .queues.pullqueue import PullQueue
     
     from .scheduler import Scheduler, SchedStatus
    -from .jobs import ElementJob
    +from .jobs import ElementJob, JobStatus

  • buildstream/_scheduler/jobs/__init__.py
    @@ -20,3 +20,4 @@
     from .elementjob import ElementJob
     from .cachesizejob import CacheSizeJob
     from .cleanupjob import CleanupJob
    +from .job import JobStatus

  • buildstream/_scheduler/jobs/cachesizejob.py
    @@ -16,7 +16,7 @@
     #  Author:
     #        Tristan Daniël Maat <tristan maat codethink co uk>
     #
    -from .job import Job
    +from .job import Job, JobStatus
     
     
     class CacheSizeJob(Job):
    @@ -30,8 +30,8 @@ class CacheSizeJob(Job):
         def child_process(self):
             return self._artifacts.compute_cache_size()
     
    -    def parent_complete(self, success, result):
    -        if success:
    +    def parent_complete(self, status, result):
    +        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)
     
                 if self._complete_cb:

  • buildstream/_scheduler/jobs/cleanupjob.py
    @@ -16,7 +16,7 @@
     #  Author:
     #        Tristan Daniël Maat <tristan maat codethink co uk>
     #
    -from .job import Job
    +from .job import Job, JobStatus
     
     
     class CleanupJob(Job):
    @@ -29,6 +29,6 @@ class CleanupJob(Job):
         def child_process(self):
             return self._artifacts.clean()
     
    -    def parent_complete(self, success, result):
    -        if success:
    +    def parent_complete(self, status, result):
    +        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)

  • buildstream/_scheduler/jobs/elementjob.py
    @@ -60,7 +60,7 @@ from .job import Job
     #     Args:
     #        job (Job): The job object which completed
     #        element (Element): The element passed to the Job() constructor
    -#        success (bool): True if the action_cb did not raise an exception
    +#        status (JobStatus): The status of whether the workload raised an exception
     #        result (object): The deserialized object returned by the `action_cb`, or None
     #                         if `success` is False
     #
    @@ -93,8 +93,8 @@ class ElementJob(Job):
             # Run the action
             return self._action_cb(self._element)
     
    -    def parent_complete(self, success, result):
    -        self._complete_cb(self, self._element, success, self._result)
    +    def parent_complete(self, status, result):
    +        self._complete_cb(self, self._element, status, self._result)
     
         def message(self, message_type, message, **kwargs):
             args = dict(kwargs)

  • buildstream/_scheduler/jobs/job.py
    @@ -35,12 +35,21 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
     from ..._message import Message, MessageType, unconditional_messages
     from ... import _signals, utils
     
    -# Return code values shutdown of job handling child processes
    +
    +# The job status passed back to the parent
     #
    -RC_OK = 0
    -RC_FAIL = 1
    -RC_PERM_FAIL = 2
    -RC_SKIPPED = 3
    +class JobStatus():
    +    # Job succeeded
    +    OK = 0
    +
    +    # A temporary BstError was raised
    +    FAIL = 1
    +
    +    # A permanent BstError was raised
    +    PERM_FAIL = 2
    +
    +    # A SkipJob was raised
    +    SKIPPED = 3
     
     
     # Used to distinguish between status messages and return values
    @@ -304,10 +313,10 @@ class Job():
         # pass the result to the main thread.
         #
         # Args:
    -    #    success (bool): Whether the job was successful.
    +    #    status (JobStatus): Whether the job was successful.
         #    result (any): The result returned by child_process().
         #
    -    def parent_complete(self, success, result):
    +    def parent_complete(self, status, result):
             raise ImplError("Job '{kind}' does not implement parent_complete()"
                             .format(kind=type(self).__name__))
     
    @@ -421,7 +430,7 @@ class Job():
                                  elapsed=elapsed, logfile=filename)
     
                     # Alert parent of skip by return code
    -                self._child_shutdown(RC_SKIPPED)
    +                self._child_shutdown(JobStatus.SKIPPED)
                 except BstError as e:
                     elapsed = datetime.datetime.now() - starttime
                     self._retry_flag = e.temporary
    @@ -442,7 +451,7 @@ class Job():
     
                     # Set return code based on whether or not the error was temporary.
                     #
    -                self._child_shutdown(RC_FAIL if self._retry_flag else RC_PERM_FAIL)
    +                self._child_shutdown(JobStatus.FAIL if self._retry_flag else JobStatus.PERM_FAIL)
     
                 except Exception as e:                        # pylint: disable=broad-except
     
    @@ -457,7 +466,7 @@ class Job():
                                  elapsed=elapsed, detail=detail,
                                  logfile=filename)
                     # Unhandled exceptions should permenantly fail
    -                self._child_shutdown(RC_PERM_FAIL)
    +                self._child_shutdown(JobStatus.PERM_FAIL)
     
                 else:
                     # No exception occurred in the action
    @@ -471,7 +480,7 @@ class Job():
                     # Shutdown needs to stay outside of the above context manager,
                     # make sure we dont try to handle SIGTERM while the process
                     # is already busy in sys.exit()
    -                self._child_shutdown(RC_OK)
    +                self._child_shutdown(JobStatus.OK)
     
         # _child_send_error()
         #
    @@ -569,18 +578,17 @@ class Job():
             # We don't want to retry if we got OK or a permanent fail.
             # This is set in _child_action but must also be set for the parent.
             #
    -        self._retry_flag = returncode == RC_FAIL
    +        self._retry_flag = returncode == JobStatus.FAIL
     
             # Set the flag to alert Queue that this job skipped.
    -        self._skipped_flag = returncode == RC_SKIPPED
    +        self._skipped_flag = returncode == JobStatus.SKIPPED
     
             if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
                 self.spawn()
                 return
     
    -        success = returncode in (RC_OK, RC_SKIPPED)
    -        self.parent_complete(success, self._result)
    -        self._scheduler.job_completed(self, success)
    +        self.parent_complete(returncode, self._result)
    +        self._scheduler.job_completed(self, returncode)
     
             # Force the deletion of the queue and process objects to try and clean up FDs
             self._queue = self._process = None

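Note: the four RC_* integers become attributes of a JobStatus class, so the value the child process exits with is the same value the parent later hands to parent_complete() and Scheduler.job_completed(), instead of being collapsed into a boolean. A self-contained sketch of how the parent-side completion handler in the last hunk above interprets that return code; the real fork/exit plumbing and the scheduler.terminated check are omitted:

    class JobStatus():
        OK = 0          # Job succeeded
        FAIL = 1        # A temporary BstError was raised
        PERM_FAIL = 2   # A permanent BstError was raised
        SKIPPED = 3     # A SkipJob was raised


    def interpret_returncode(returncode, tries=1, max_retries=2):
        # Only a temporary failure is retried, a skip is flagged, and
        # the raw status is forwarded rather than a derived boolean.
        retry_flag = returncode == JobStatus.FAIL
        skipped_flag = returncode == JobStatus.SKIPPED
        if retry_flag and tries <= max_retries:
            return "retry"
        return (skipped_flag, returncode)


    print(interpret_returncode(JobStatus.FAIL))       # retry
    print(interpret_returncode(JobStatus.SKIPPED))    # (True, 3)
    print(interpret_returncode(JobStatus.PERM_FAIL))  # (False, 2)
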
  • buildstream/_scheduler/queues/buildqueue.py
    @@ -21,7 +21,7 @@
     from datetime import timedelta
     
     from . import Queue, QueueStatus
    -from ..jobs import ElementJob
    +from ..jobs import ElementJob, JobStatus
     from ..resources import ResourceType
     from ..._message import MessageType
     
    @@ -104,7 +104,7 @@ class BuildQueue(Queue):
             if artifacts.has_quota_exceeded():
                 self._scheduler.check_cache_size()
     
    -    def done(self, job, element, result, success):
    +    def done(self, job, element, result, status):
     
             # Inform element in main process that assembly is done
             element._assemble_done()
    @@ -117,5 +117,5 @@ class BuildQueue(Queue):
             #        artifact cache size for a successful build even though we know a
             #        failed build also grows the artifact cache size.
             #
    -        if success:
    +        if status == JobStatus.OK:
                 self._check_cache_size(job, element, result)

  • buildstream/_scheduler/queues/fetchqueue.py
    @@ -24,6 +24,7 @@ from ... import Consistency
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..jobs import JobStatus
     
     
     # A queue which fetches element sources
    @@ -66,9 +67,9 @@ class FetchQueue(Queue):
     
             return QueueStatus.READY
     
    -    def done(self, _, element, result, success):
    +    def done(self, _, element, result, status):
     
    -        if not success:
    +        if status == JobStatus.FAIL:
                 return
     
             element._update_state()

  • buildstream/_scheduler/queues/pullqueue.py
    @@ -21,6 +21,7 @@
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..jobs import JobStatus
     from ..._exceptions import SkipJob
     
     
    @@ -54,9 +55,9 @@ class PullQueue(Queue):
             else:
                 return QueueStatus.SKIP
     
    -    def done(self, _, element, result, success):
    +    def done(self, _, element, result, status):
     
    -        if not success:
    +        if status == JobStatus.FAIL:
                 return
     
             element._pull_done()
    @@ -64,4 +65,5 @@ class PullQueue(Queue):
             # Build jobs will check the "approximate" size first. Since we
             # do not get an artifact size from pull jobs, we have to
             # actually check the cache size.
    -        self._scheduler.check_cache_size()
    +        if status == JobStatus.OK:
    +            self._scheduler.check_cache_size()

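Note: besides the rename, PullQueue.done() now gates the cache-size check on an OK status rather than running it unconditionally. A small, self-contained sketch of the new control flow, with stand-in callables in place of the real element and scheduler:

    class JobStatus():
        OK = 0
        FAIL = 1
        PERM_FAIL = 2
        SKIPPED = 3


    def pull_done(status, pull_done_cb, check_cache_size_cb):
        # Sketch of the reworked PullQueue.done(): a temporary failure
        # returns early, and only an OK result triggers the cache size
        # check.
        if status == JobStatus.FAIL:
            return
        pull_done_cb()
        if status == JobStatus.OK:
            check_cache_size_cb()


    pull_done(JobStatus.OK,
              pull_done_cb=lambda: print("element._pull_done()"),
              check_cache_size_cb=lambda: print("scheduler.check_cache_size()"))
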
  • buildstream/_scheduler/queues/queue.py
    @@ -25,7 +25,7 @@ from enum import Enum
     import traceback
     
     # Local imports
    -from ..jobs import ElementJob
    +from ..jobs import ElementJob, JobStatus
     from ..resources import ResourceType
     
     # BuildStream toplevel imports
    @@ -133,10 +133,9 @@ class Queue():
         #    job (Job): The job which completed processing
         #    element (Element): The element which completed processing
         #    result (any): The return value of the process() implementation
    -    #    success (bool): True if the process() implementation did not
    -    #                    raise any exception
    +    #    status (JobStatus): The return status of the Job
         #
    -    def done(self, job, element, result, success):
    +    def done(self, job, element, result, status):
             pass
     
         #####################################################
    @@ -291,7 +290,7 @@ class Queue():
         #
         # See the Job object for an explanation of the call signature
         #
    -    def _job_done(self, job, element, success, result):
    +    def _job_done(self, job, element, status, result):
     
             # Update values that need to be synchronized in the main task
             # before calling any queue implementation
    @@ -301,7 +300,7 @@ class Queue():
             # and determine if it should be considered as processed
             # or skipped.
             try:
    -            self.done(job, element, result, success)
    +            self.done(job, element, result, status)
             except BstError as e:
     
                 # Report error and mark as failed
    @@ -337,7 +336,7 @@ class Queue():
                 # if they are not skipped.
                 if job.skipped:
                     self.skipped_elements.append(element)
    -            elif success:
    +            elif status == JobStatus.OK:
                     self.processed_elements.append(element)
                 else:
                     self.failed_elements.append(element)

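Note: Queue._job_done() now sorts elements by comparing the status directly: a skipped job is detected via job.skipped, only JobStatus.OK counts as processed, and everything else (FAIL after retries, PERM_FAIL) is treated as failed. A self-contained sketch of that bucketing:

    class JobStatus():
        OK = 0
        FAIL = 1
        PERM_FAIL = 2
        SKIPPED = 3


    def bucket_element(element, status, skipped,
                       skipped_elements, processed_elements, failed_elements):
        # Mirrors the tail of Queue._job_done(): skip wins, then OK,
        # anything else counts as a failure.
        if skipped:
            skipped_elements.append(element)
        elif status == JobStatus.OK:
            processed_elements.append(element)
        else:
            failed_elements.append(element)


    skipped, processed, failed = [], [], []
    bucket_element("foo.bst", JobStatus.PERM_FAIL, False, skipped, processed, failed)
    bucket_element("bar.bst", JobStatus.OK, False, skipped, processed, failed)
    print(processed, failed)   # ['bar.bst'] ['foo.bst']
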
  • buildstream/_scheduler/queues/trackqueue.py
    @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
     # Local imports
     from . import Queue, QueueStatus
     from ..resources import ResourceType
    +from ..jobs import JobStatus
     
     
     # A queue which tracks sources
    @@ -47,9 +48,9 @@ class TrackQueue(Queue):
     
             return QueueStatus.READY
     
    -    def done(self, _, element, result, success):
    +    def done(self, _, element, result, status):
     
    -        if not success:
    +        if status == JobStatus.FAIL:
                 return
     
             # Set the new refs in the main process one by one as they complete
    

  • buildstream/_scheduler/scheduler.py
    @@ -237,14 +237,14 @@ class Scheduler():
         # Args:
         #    queue (Queue): The Queue holding a complete job
         #    job (Job): The completed Job
    -    #    success (bool): Whether the Job completed with a success status
    +    #    status (JobStatus): The status of the completed job
         #
    -    def job_completed(self, job, success):
    +    def job_completed(self, job, status):
             self._resources.clear_job_resources(job)
             self.active_jobs.remove(job)
             if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
                 self._exclusive_active.remove(job.action_name)
    -        self._job_complete_callback(job, success)
    +        self._job_complete_callback(job, status)
             self._schedule_queue_jobs()
             self._sched()
     
    

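Note: the scheduler now forwards the raw status to whatever _job_complete_callback the frontend registered, whereas the old boolean folded SKIPPED into success and PERM_FAIL into failure. A sketch of a callback that takes advantage of that, tallying outcomes per status; the Counter-based reporting here is illustrative, not part of the commit:

    from collections import Counter


    class JobStatus():
        OK = 0
        FAIL = 1
        PERM_FAIL = 2
        SKIPPED = 3


    NAMES = {JobStatus.OK: "ok", JobStatus.FAIL: "failed",
             JobStatus.PERM_FAIL: "failed permanently", JobStatus.SKIPPED: "skipped"}

    tally = Counter()


    def job_complete_callback(job, status):
        # With the richer status the frontend can report skips and
        # permanent failures separately instead of a bare pass/fail.
        tally[NAMES[status]] += 1


    job_complete_callback("fetch", JobStatus.SKIPPED)
    job_complete_callback("build", JobStatus.OK)
    print(dict(tally))   # {'skipped': 1, 'ok': 1}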

