Tristan Van Berkom pushed to branch tristan/one-cache-size-job at BuildStream / buildstream
Commits:
-
570b4f1a
by Tristan Van Berkom at 2019-01-06T21:36:10Z
13 changed files:
- buildstream/_frontend/app.py
- buildstream/_scheduler/__init__.py
- buildstream/_scheduler/jobs/__init__.py
- buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/fetchqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/queues/trackqueue.py
- buildstream/_scheduler/scheduler.py
Changes:
| ... | ... | @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages |
| 38 | 38 |
from .._stream import Stream
|
| 39 | 39 |
from .._versions import BST_FORMAT_VERSION
|
| 40 | 40 |
from .. import _yaml
|
| 41 |
-from .._scheduler import ElementJob
|
|
| 41 |
+from .._scheduler import ElementJob, JobStatus
|
|
| 42 | 42 |
|
| 43 | 43 |
# Import frontend assets
|
| 44 | 44 |
from . import Profile, LogLine, Status
|
| ... | ... | @@ -515,13 +515,13 @@ class App(): |
| 515 | 515 |
self._status.add_job(job)
|
| 516 | 516 |
self._maybe_render_status()
|
| 517 | 517 |
|
| 518 |
- def _job_completed(self, job, success):
|
|
| 518 |
+ def _job_completed(self, job, status):
|
|
| 519 | 519 |
self._status.remove_job(job)
|
| 520 | 520 |
self._maybe_render_status()
|
| 521 | 521 |
|
| 522 | 522 |
# Don't attempt to handle a failure if the user has already opted to
|
| 523 | 523 |
# terminate
|
| 524 |
- if not success and not self.stream.terminated:
|
|
| 524 |
+ if status == JobStatus.FAIL and not self.stream.terminated:
|
|
| 525 | 525 |
|
| 526 | 526 |
if isinstance(job, ElementJob):
|
| 527 | 527 |
element = job.element
|
| ... | ... | @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue |
| 26 | 26 |
from .queues.pullqueue import PullQueue
|
| 27 | 27 |
|
| 28 | 28 |
from .scheduler import Scheduler, SchedStatus
|
| 29 |
-from .jobs import ElementJob
|
|
| 29 |
+from .jobs import ElementJob, JobStatus
|
| ... | ... | @@ -20,3 +20,4 @@ |
| 20 | 20 |
from .elementjob import ElementJob
|
| 21 | 21 |
from .cachesizejob import CacheSizeJob
|
| 22 | 22 |
from .cleanupjob import CleanupJob
|
| 23 |
+from .job import JobStatus
|
| ... | ... | @@ -16,7 +16,7 @@ |
| 16 | 16 |
# Author:
|
| 17 | 17 |
# Tristan Daniël Maat <tristan maat codethink co uk>
|
| 18 | 18 |
#
|
| 19 |
-from .job import Job
|
|
| 19 |
+from .job import Job, JobStatus
|
|
| 20 | 20 |
|
| 21 | 21 |
|
| 22 | 22 |
class CacheSizeJob(Job):
|
| ... | ... | @@ -30,8 +30,8 @@ class CacheSizeJob(Job): |
| 30 | 30 |
def child_process(self):
|
| 31 | 31 |
return self._artifacts.compute_cache_size()
|
| 32 | 32 |
|
| 33 |
- def parent_complete(self, success, result):
|
|
| 34 |
- if success:
|
|
| 33 |
+ def parent_complete(self, status, result):
|
|
| 34 |
+ if status == JobStatus.OK:
|
|
| 35 | 35 |
self._artifacts.set_cache_size(result)
|
| 36 | 36 |
|
| 37 | 37 |
if self._complete_cb:
|
| ... | ... | @@ -16,7 +16,7 @@ |
| 16 | 16 |
# Author:
|
| 17 | 17 |
# Tristan Daniël Maat <tristan maat codethink co uk>
|
| 18 | 18 |
#
|
| 19 |
-from .job import Job
|
|
| 19 |
+from .job import Job, JobStatus
|
|
| 20 | 20 |
|
| 21 | 21 |
|
| 22 | 22 |
class CleanupJob(Job):
|
| ... | ... | @@ -29,6 +29,6 @@ class CleanupJob(Job): |
| 29 | 29 |
def child_process(self):
|
| 30 | 30 |
return self._artifacts.clean()
|
| 31 | 31 |
|
| 32 |
- def parent_complete(self, success, result):
|
|
| 33 |
- if success:
|
|
| 32 |
+ def parent_complete(self, status, result):
|
|
| 33 |
+ if status == JobStatus.OK:
|
|
| 34 | 34 |
self._artifacts.set_cache_size(result)
|
| ... | ... | @@ -60,7 +60,7 @@ from .job import Job |
| 60 | 60 |
# Args:
|
| 61 | 61 |
# job (Job): The job object which completed
|
| 62 | 62 |
# element (Element): The element passed to the Job() constructor
|
| 63 |
-# success (bool): True if the action_cb did not raise an exception
|
|
| 63 |
+# status (JobStatus): The status of whether the workload raised an exception
|
|
| 64 | 64 |
# result (object): The deserialized object returned by the `action_cb`, or None
|
| 65 | 65 |
# if `status` indicates a failure
|
| 66 | 66 |
#
|
| ... | ... | @@ -93,8 +93,8 @@ class ElementJob(Job): |
| 93 | 93 |
# Run the action
|
| 94 | 94 |
return self._action_cb(self._element)
|
| 95 | 95 |
|
| 96 |
- def parent_complete(self, success, result):
|
|
| 97 |
- self._complete_cb(self, self._element, success, self._result)
|
|
| 96 |
+ def parent_complete(self, status, result):
|
|
| 97 |
+ self._complete_cb(self, self._element, status, self._result)
|
|
| 98 | 98 |
|
| 99 | 99 |
def message(self, message_type, message, **kwargs):
|
| 100 | 100 |
args = dict(kwargs)
|
| ... | ... | @@ -35,12 +35,21 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob |
| 35 | 35 |
from ..._message import Message, MessageType, unconditional_messages
|
| 36 | 36 |
from ... import _signals, utils
|
| 37 | 37 |
|
| 38 |
-# Return code values shutdown of job handling child processes
|
|
| 38 |
+ |
|
| 39 |
+# The job status passed back to the parent
|
|
| 39 | 40 |
#
|
| 40 |
-RC_OK = 0
|
|
| 41 |
-RC_FAIL = 1
|
|
| 42 |
-RC_PERM_FAIL = 2
|
|
| 43 |
-RC_SKIPPED = 3
|
|
| 41 |
+class JobStatus():
|
|
| 42 |
+ # Job succeeded
|
|
| 43 |
+ OK = 0
|
|
| 44 |
+ |
|
| 45 |
+ # A temporary BstError was raised
|
|
| 46 |
+ FAIL = 1
|
|
| 47 |
+ |
|
| 48 |
+ # A permanent BstError was raised
|
|
| 49 |
+ PERM_FAIL = 2
|
|
| 50 |
+ |
|
| 51 |
+ # A SkipJob was raised
|
|
| 52 |
+ SKIPPED = 3
|
|
| 44 | 53 |
|
| 45 | 54 |
|
| 46 | 55 |
# Used to distinguish between status messages and return values
|
| ... | ... | @@ -304,10 +313,10 @@ class Job(): |
| 304 | 313 |
# pass the result to the main thread.
|
| 305 | 314 |
#
|
| 306 | 315 |
# Args:
|
| 307 |
- # success (bool): Whether the job was successful.
|
|
| 316 |
+ # status (JobStatus): Whether the job was successful.
|
|
| 308 | 317 |
# result (any): The result returned by child_process().
|
| 309 | 318 |
#
|
| 310 |
- def parent_complete(self, success, result):
|
|
| 319 |
+ def parent_complete(self, status, result):
|
|
| 311 | 320 |
raise ImplError("Job '{kind}' does not implement parent_complete()"
|
| 312 | 321 |
.format(kind=type(self).__name__))
|
| 313 | 322 |
|
| ... | ... | @@ -421,7 +430,7 @@ class Job(): |
| 421 | 430 |
elapsed=elapsed, logfile=filename)
|
| 422 | 431 |
|
| 423 | 432 |
# Alert parent of skip by return code
|
| 424 |
- self._child_shutdown(RC_SKIPPED)
|
|
| 433 |
+ self._child_shutdown(JobStatus.SKIPPED)
|
|
| 425 | 434 |
except BstError as e:
|
| 426 | 435 |
elapsed = datetime.datetime.now() - starttime
|
| 427 | 436 |
self._retry_flag = e.temporary
|
| ... | ... | @@ -442,7 +451,7 @@ class Job(): |
| 442 | 451 |
|
| 443 | 452 |
# Set return code based on whether or not the error was temporary.
|
| 444 | 453 |
#
|
| 445 |
- self._child_shutdown(RC_FAIL if self._retry_flag else RC_PERM_FAIL)
|
|
| 454 |
+ self._child_shutdown(JobStatus.FAIL if self._retry_flag else JobStatus.PERM_FAIL)
|
|
| 446 | 455 |
|
| 447 | 456 |
except Exception as e: # pylint: disable=broad-except
|
| 448 | 457 |
|
| ... | ... | @@ -457,7 +466,7 @@ class Job(): |
| 457 | 466 |
elapsed=elapsed, detail=detail,
|
| 458 | 467 |
logfile=filename)
|
| 459 | 468 |
# Unhandled exceptions should permanently fail
|
| 460 |
- self._child_shutdown(RC_PERM_FAIL)
|
|
| 469 |
+ self._child_shutdown(JobStatus.PERM_FAIL)
|
|
| 461 | 470 |
|
| 462 | 471 |
else:
|
| 463 | 472 |
# No exception occurred in the action
|
| ... | ... | @@ -471,7 +480,7 @@ class Job(): |
| 471 | 480 |
# Shutdown needs to stay outside of the above context manager,
|
| 472 | 481 |
# make sure we don't try to handle SIGTERM while the process
|
| 473 | 482 |
# is already busy in sys.exit()
|
| 474 |
- self._child_shutdown(RC_OK)
|
|
| 483 |
+ self._child_shutdown(JobStatus.OK)
|
|
| 475 | 484 |
|
| 476 | 485 |
# _child_send_error()
|
| 477 | 486 |
#
|
| ... | ... | @@ -569,18 +578,17 @@ class Job(): |
| 569 | 578 |
# We don't want to retry if we got OK or a permanent fail.
|
| 570 | 579 |
# This is set in _child_action but must also be set for the parent.
|
| 571 | 580 |
#
|
| 572 |
- self._retry_flag = returncode == RC_FAIL
|
|
| 581 |
+ self._retry_flag = returncode == JobStatus.FAIL
|
|
| 573 | 582 |
|
| 574 | 583 |
# Set the flag to alert Queue that this job skipped.
|
| 575 |
- self._skipped_flag = returncode == RC_SKIPPED
|
|
| 584 |
+ self._skipped_flag = returncode == JobStatus.SKIPPED
|
|
| 576 | 585 |
|
| 577 | 586 |
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
|
| 578 | 587 |
self.spawn()
|
| 579 | 588 |
return
|
| 580 | 589 |
|
| 581 |
- success = returncode in (RC_OK, RC_SKIPPED)
|
|
| 582 |
- self.parent_complete(success, self._result)
|
|
| 583 |
- self._scheduler.job_completed(self, success)
|
|
| 590 |
+ self.parent_complete(returncode, self._result)
|
|
| 591 |
+ self._scheduler.job_completed(self, returncode)
|
|
| 584 | 592 |
|
| 585 | 593 |
# Force the deletion of the queue and process objects to try and clean up FDs
|
| 586 | 594 |
self._queue = self._process = None
|
| ... | ... | @@ -21,7 +21,7 @@ |
| 21 | 21 |
from datetime import timedelta
|
| 22 | 22 |
|
| 23 | 23 |
from . import Queue, QueueStatus
|
| 24 |
-from ..jobs import ElementJob
|
|
| 24 |
+from ..jobs import ElementJob, JobStatus
|
|
| 25 | 25 |
from ..resources import ResourceType
|
| 26 | 26 |
from ..._message import MessageType
|
| 27 | 27 |
|
| ... | ... | @@ -104,7 +104,7 @@ class BuildQueue(Queue): |
| 104 | 104 |
if artifacts.has_quota_exceeded():
|
| 105 | 105 |
self._scheduler.check_cache_size()
|
| 106 | 106 |
|
| 107 |
- def done(self, job, element, result, success):
|
|
| 107 |
+ def done(self, job, element, result, status):
|
|
| 108 | 108 |
|
| 109 | 109 |
# Inform element in main process that assembly is done
|
| 110 | 110 |
element._assemble_done()
|
| ... | ... | @@ -117,5 +117,5 @@ class BuildQueue(Queue): |
| 117 | 117 |
# artifact cache size for a successful build even though we know a
|
| 118 | 118 |
# failed build also grows the artifact cache size.
|
| 119 | 119 |
#
|
| 120 |
- if success:
|
|
| 120 |
+ if status == JobStatus.OK:
|
|
| 121 | 121 |
self._check_cache_size(job, element, result)
|
| ... | ... | @@ -24,6 +24,7 @@ from ... import Consistency |
| 24 | 24 |
# Local imports
|
| 25 | 25 |
from . import Queue, QueueStatus
|
| 26 | 26 |
from ..resources import ResourceType
|
| 27 |
+from ..jobs import JobStatus
|
|
| 27 | 28 |
|
| 28 | 29 |
|
| 29 | 30 |
# A queue which fetches element sources
|
| ... | ... | @@ -66,9 +67,9 @@ class FetchQueue(Queue): |
| 66 | 67 |
|
| 67 | 68 |
return QueueStatus.READY
|
| 68 | 69 |
|
| 69 |
- def done(self, _, element, result, success):
|
|
| 70 |
+ def done(self, _, element, result, status):
|
|
| 70 | 71 |
|
| 71 |
- if not success:
|
|
| 72 |
+ if status == JobStatus.FAIL:
|
|
| 72 | 73 |
return
|
| 73 | 74 |
|
| 74 | 75 |
element._update_state()
|
| ... | ... | @@ -21,6 +21,7 @@ |
| 21 | 21 |
# Local imports
|
| 22 | 22 |
from . import Queue, QueueStatus
|
| 23 | 23 |
from ..resources import ResourceType
|
| 24 |
+from ..jobs import JobStatus
|
|
| 24 | 25 |
from ..._exceptions import SkipJob
|
| 25 | 26 |
|
| 26 | 27 |
|
| ... | ... | @@ -54,9 +55,9 @@ class PullQueue(Queue): |
| 54 | 55 |
else:
|
| 55 | 56 |
return QueueStatus.SKIP
|
| 56 | 57 |
|
| 57 |
- def done(self, _, element, result, success):
|
|
| 58 |
+ def done(self, _, element, result, status):
|
|
| 58 | 59 |
|
| 59 |
- if not success:
|
|
| 60 |
+ if status == JobStatus.FAIL:
|
|
| 60 | 61 |
return
|
| 61 | 62 |
|
| 62 | 63 |
element._pull_done()
|
| ... | ... | @@ -64,4 +65,5 @@ class PullQueue(Queue): |
| 64 | 65 |
# Build jobs will check the "approximate" size first. Since we
|
| 65 | 66 |
# do not get an artifact size from pull jobs, we have to
|
| 66 | 67 |
# actually check the cache size.
|
| 67 |
- self._scheduler.check_cache_size()
|
|
| 68 |
+ if status == JobStatus.OK:
|
|
| 69 |
+ self._scheduler.check_cache_size()
|
| ... | ... | @@ -25,7 +25,7 @@ from enum import Enum |
| 25 | 25 |
import traceback
|
| 26 | 26 |
|
| 27 | 27 |
# Local imports
|
| 28 |
-from ..jobs import ElementJob
|
|
| 28 |
+from ..jobs import ElementJob, JobStatus
|
|
| 29 | 29 |
from ..resources import ResourceType
|
| 30 | 30 |
|
| 31 | 31 |
# BuildStream toplevel imports
|
| ... | ... | @@ -133,10 +133,9 @@ class Queue(): |
| 133 | 133 |
# job (Job): The job which completed processing
|
| 134 | 134 |
# element (Element): The element which completed processing
|
| 135 | 135 |
# result (any): The return value of the process() implementation
|
| 136 |
- # success (bool): True if the process() implementation did not
|
|
| 137 |
- # raise any exception
|
|
| 136 |
+ # status (JobStatus): The return status of the Job
|
|
| 138 | 137 |
#
|
| 139 |
- def done(self, job, element, result, success):
|
|
| 138 |
+ def done(self, job, element, result, status):
|
|
| 140 | 139 |
pass
|
| 141 | 140 |
|
| 142 | 141 |
#####################################################
|
| ... | ... | @@ -291,7 +290,7 @@ class Queue(): |
| 291 | 290 |
#
|
| 292 | 291 |
# See the Job object for an explanation of the call signature
|
| 293 | 292 |
#
|
| 294 |
- def _job_done(self, job, element, success, result):
|
|
| 293 |
+ def _job_done(self, job, element, status, result):
|
|
| 295 | 294 |
|
| 296 | 295 |
# Update values that need to be synchronized in the main task
|
| 297 | 296 |
# before calling any queue implementation
|
| ... | ... | @@ -301,7 +300,7 @@ class Queue(): |
| 301 | 300 |
# and determine if it should be considered as processed
|
| 302 | 301 |
# or skipped.
|
| 303 | 302 |
try:
|
| 304 |
- self.done(job, element, result, success)
|
|
| 303 |
+ self.done(job, element, result, status)
|
|
| 305 | 304 |
except BstError as e:
|
| 306 | 305 |
|
| 307 | 306 |
# Report error and mark as failed
|
| ... | ... | @@ -337,7 +336,7 @@ class Queue(): |
| 337 | 336 |
# if they are not skipped.
|
| 338 | 337 |
if job.skipped:
|
| 339 | 338 |
self.skipped_elements.append(element)
|
| 340 |
- elif success:
|
|
| 339 |
+ elif status == JobStatus.OK:
|
|
| 341 | 340 |
self.processed_elements.append(element)
|
| 342 | 341 |
else:
|
| 343 | 342 |
self.failed_elements.append(element)
|
| ... | ... | @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup |
| 24 | 24 |
# Local imports
|
| 25 | 25 |
from . import Queue, QueueStatus
|
| 26 | 26 |
from ..resources import ResourceType
|
| 27 |
+from ..jobs import JobStatus
|
|
| 27 | 28 |
|
| 28 | 29 |
|
| 29 | 30 |
# A queue which tracks sources
|
| ... | ... | @@ -47,9 +48,9 @@ class TrackQueue(Queue): |
| 47 | 48 |
|
| 48 | 49 |
return QueueStatus.READY
|
| 49 | 50 |
|
| 50 |
- def done(self, _, element, result, success):
|
|
| 51 |
+ def done(self, _, element, result, status):
|
|
| 51 | 52 |
|
| 52 |
- if not success:
|
|
| 53 |
+ if status == JobStatus.FAIL:
|
|
| 53 | 54 |
return
|
| 54 | 55 |
|
| 55 | 56 |
# Set the new refs in the main process one by one as they complete
|
| ... | ... | @@ -237,14 +237,14 @@ class Scheduler(): |
| 237 | 237 |
# Args:
|
| 238 | 238 |
# queue (Queue): The Queue holding a complete job
|
| 239 | 239 |
# job (Job): The completed Job
|
| 240 |
- # success (bool): Whether the Job completed with a success status
|
|
| 240 |
+ # status (JobStatus): The status of the completed job
|
|
| 241 | 241 |
#
|
| 242 |
- def job_completed(self, job, success):
|
|
| 242 |
+ def job_completed(self, job, status):
|
|
| 243 | 243 |
self._resources.clear_job_resources(job)
|
| 244 | 244 |
self.active_jobs.remove(job)
|
| 245 | 245 |
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
|
| 246 | 246 |
self._exclusive_active.remove(job.action_name)
|
| 247 |
- self._job_complete_callback(job, success)
|
|
| 247 |
+ self._job_complete_callback(job, status)
|
|
| 248 | 248 |
self._schedule_queue_jobs()
|
| 249 | 249 |
self._sched()
|
| 250 | 250 |
|
