Tristan Van Berkom pushed to branch master at BuildStream / buildstream
Commits:
- 3e36e363 by Tristan Van Berkom at 2019-01-24T16:55:24Z
- ce01f87e by Tristan Van Berkom at 2019-01-24T16:55:24Z
- 2479e8df by Tristan Van Berkom at 2019-01-24T16:55:24Z
- fdb8ff65 by Tristan Van Berkom at 2019-01-24T16:55:24Z
- acd0bf22 by Tristan Van Berkom at 2019-01-24T18:01:41Z
5 changed files:
- buildstream/_frontend/widget.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- tests/artifactcache/expiry.py
- tests/frontend/logging.py
Changes:
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -175,29 +175,22 @@ class TypeName(Widget):
 # A widget for displaying the Element name
 class ElementName(Widget):
 
-    def __init__(self, context, content_profile, format_profile):
-        super(ElementName, self).__init__(context, content_profile, format_profile)
-
-        # Pre initialization format string, before we know the length of
-        # element names in the pipeline
-        self._fmt_string = '{: <30}'
-
     def render(self, message):
+        action_name = message.action_name
         element_id = message.task_id or message.unique_id
-        if element_id is None:
-            return ""
-
-        plugin = _plugin_lookup(element_id)
-        name = plugin._get_full_name()
+        if element_id is not None:
+            plugin = _plugin_lookup(element_id)
+            name = plugin._get_full_name()
+            name = '{: <30}'.format(name)
+        else:
+            name = 'core activity'
+            name = '{: <30}'.format(name)
 
-        # Sneak the action name in with the element name
-        action_name = message.action_name
         if not action_name:
            action_name = "Main"
 
         return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \
-            self.format_profile.fmt(':') + \
-            self.content_profile.fmt(self._fmt_string.format(name))
+            self.format_profile.fmt(':') + self.content_profile.fmt(name)
 
 
 # A widget for displaying the primary message text
@@ -219,9 +212,12 @@ class CacheKey(Widget):
     def render(self, message):
 
         element_id = message.task_id or message.unique_id
-        if element_id is None or not self._key_length:
+        if not self._key_length:
             return ""
 
+        if element_id is None:
+            return ' ' * self._key_length
+
         missing = False
         key = ' ' * self._key_length
         plugin = _plugin_lookup(element_id)
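
Both branches of the new render() pad the name inline with the same '{: <30}' spec that previously lived in self._fmt_string. A minimal sketch of the padding behaviour (plain Python, independent of BuildStream):

    # '{: <30}' left-aligns its argument in a field of 30 characters, so the
    # new 'core activity' fallback lines up with real element names in the log.
    padded = '{: <30}'.format('core activity')
    assert padded == 'core activity' + ' ' * 17
    assert len(padded) == 30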
--- a/buildstream/_scheduler/resources.py
+++ b/buildstream/_scheduler/resources.py
@@ -163,4 +163,4 @@ class Resources():
     def unregister_exclusive_interest(self, resources, source):
 
         for resource in resources:
-            self._exclusive_resources[resource].remove(source)
+            self._exclusive_resources[resource].discard(source)
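
The switch from remove() to discard() makes unregistering idempotent: set.remove() raises KeyError when the element is absent, while set.discard() is a silent no-op. This matters because the scheduler change below unregisters the 'cache-size' interest whether or not an exclusive interest was ever registered. A quick illustration in plain Python:

    interests = {'cache-size'}
    interests.discard('cache-size')  # removes it
    interests.discard('cache-size')  # absent: silently does nothing
    try:
        interests.remove('cache-size')
    except KeyError:
        print("remove() raises when the element is absent")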
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -40,8 +40,8 @@ class SchedStatus():
 
 # Some action names for the internal jobs we launch
 #
-_ACTION_NAME_CLEANUP = 'cleanup'
-_ACTION_NAME_CACHE_SIZE = 'cache_size'
+_ACTION_NAME_CLEANUP = 'clean'
+_ACTION_NAME_CACHE_SIZE = 'size'
 
 
 # Scheduler()
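
The shorter action names plausibly fit the five-character action column that ElementName.render() formats with '{: >5}' (see the widget.py hunk above); a ten-character name like 'cache_size' would overflow it. A small demonstration of the column behaviour:

    for action in ['cache_size', 'size', 'clean', 'build']:
        print('[{: >5}]'.format(action))
    # [cache_size]   <- 10 characters, breaks the column alignment
    # [ size]
    # [clean]
    # [build]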
@@ -151,6 +151,9 @@ class Scheduler():
         # Handle unix signals while running
         self._connect_signals()
 
+        # Check if we need to start with some cache maintenance
+        self._check_cache_management()
+
         # Run the queues
         self._sched()
         self.loop.run_forever()
@@ -272,6 +275,31 @@ class Scheduler():
     # Local Private Methods #
     #######################################################
 
+    # _check_cache_management()
+    #
+    # Run an initial check if we need to lock the cache
+    # resource and check the size and possibly launch
+    # a cleanup.
+    #
+    # Sessions which do not add to the cache are not affected.
+    #
+    def _check_cache_management(self):
+
+        # Only trigger the check for a scheduler run which has
+        # queues which require the CACHE resource.
+        if not any(q for q in self.queues
+                   if ResourceType.CACHE in q.resources):
+            return
+
+        # If the estimated size outgrows the quota, queue a job to
+        # actually check the real cache size initially, this one
+        # should have exclusive access to the cache to ensure nothing
+        # starts while we are checking the cache.
+        #
+        artifacts = self.context.artifactcache
+        if artifacts.has_quota_exceeded():
+            self._sched_cache_size_job(exclusive=True)
+
     # _spawn_job()
     #
     # Spawns a job
@@ -292,6 +320,11 @@ class Scheduler():
         self._cache_size_running = None
         self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
 
+        # Unregister the exclusive interest if there was any
+        self.resources.unregister_exclusive_interest(
+            [ResourceType.CACHE], 'cache-size'
+        )
+
         # Schedule a cleanup job if we've hit the threshold
         if status != JobStatus.OK:
             return
@@ -344,11 +377,35 @@ class Scheduler():
     # Runs a cache size job if one is scheduled to run now and
     # sufficient resources are available.
     #
-    def _sched_cache_size_job(self):
+    # Args:
+    #    exclusive (bool): Run a cache size job immediately and
+    #                      hold the ResourceType.CACHE resource
+    #                      exclusively (used at startup).
+    #
+    def _sched_cache_size_job(self, *, exclusive=False):
+
+        # The exclusive argument is not intended (or safe) for arbitrary use.
+        if exclusive:
+            assert not self._cache_size_scheduled
+            assert not self._cache_size_running
+            assert not self._active_jobs
+            self._cache_size_scheduled = True
 
         if self._cache_size_scheduled and not self._cache_size_running:
 
-            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
+            # Handle the exclusive launch
+            exclusive_resources = set()
+            if exclusive:
+                exclusive_resources.add(ResourceType.CACHE)
+                self.resources.register_exclusive_interest(
+                    exclusive_resources, 'cache-size'
+                )
+
+            # Reserve the resources (with the possible exclusive cache resource)
+            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
+                                      exclusive_resources):
+
+                # Update state and launch
                 self._cache_size_scheduled = False
                 self._cache_size_running = \
                     CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
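
Taken together, these scheduler changes let the startup cache-size job hold ResourceType.CACHE exclusively until it finishes, blocking ordinary jobs that would write to the cache. The sketch below is a minimal, self-contained model of that exclusive-interest pattern; it is not BuildStream's actual Resources class (the method names follow the diff, the internals are assumed):

    CACHE, PROCESS = 'cache', 'process'

    class Resources:
        def __init__(self):
            # Which sources hold an exclusive interest in each resource
            self._exclusive_resources = {CACHE: set(), PROCESS: set()}

        def register_exclusive_interest(self, resources, source):
            for resource in resources:
                self._exclusive_resources[resource].add(source)

        def unregister_exclusive_interest(self, resources, source):
            for resource in resources:
                # discard() tolerates a source that never registered
                self._exclusive_resources[resource].discard(source)

        def reserve(self, resources, exclusive_resources=frozenset()):
            # Refuse the reservation if another source holds an exclusive
            # interest in a resource we are not reserving exclusively.
            for resource in resources:
                if resource not in exclusive_resources \
                        and self._exclusive_resources[resource]:
                    return False
            return True

    resources = Resources()
    resources.register_exclusive_interest([CACHE], 'cache-size')
    assert not resources.reserve([CACHE, PROCESS])        # ordinary job blocked
    assert resources.reserve([CACHE, PROCESS], {CACHE})   # the cache-size job itself
    resources.unregister_exclusive_interest([CACHE], 'cache-size')
    assert resources.reserve([CACHE, PROCESS])            # unblocked again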
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -18,6 +18,7 @@
 #
 
 import os
+import re
 from unittest import mock
 
 import pytest
@@ -425,3 +426,66 @@ def test_extract_expiry(cli, datafiles, tmpdir):
 
     assert os.path.isdir(refsdirtarget2)
     assert not os.path.exists(refsdirtarget)
+
+
+# Ensures that when launching BuildStream with a full artifact cache,
+# the cache size and cleanup jobs are run before any other jobs.
+#
+@pytest.mark.datafiles(DATA_DIR)
+def test_cleanup_first(cli, datafiles, tmpdir):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = 'elements'
+    cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
+    checkout = os.path.join(project, 'checkout')
+
+    cli.configure({
+        'cache': {
+            'quota': 10000000,
+        }
+    })
+
+    # Create an element that uses almost the entire cache (an empty
+    # ostree cache starts at about ~10KiB, so we need a bit of a
+    # buffer)
+    create_element_size('target.bst', project, element_path, [], 8000000)
+    res = cli.run(project=project, args=['build', 'target.bst'])
+    res.assert_success()
+
+    assert cli.get_element_state(project, 'target.bst') == 'cached'
+
+    # Now configure with a smaller quota, creating a situation
+    # where the cache must be cleaned up before building anything else.
+    #
+    # Fix the fetchers and builders just to ensure a predictable
+    # sequence of events (although it does not affect this test)
+    cli.configure({
+        'cache': {
+            'quota': 5000000,
+        },
+        'scheduler': {
+            'fetchers': 1,
+            'builders': 1
+        }
+    })
+
+    # Our cache is now more than full, BuildStream must clean up first
+    create_element_size('target2.bst', project, element_path, [], 4000000)
+    res = cli.run(project=project, args=['build', 'target2.bst'])
+    res.assert_success()
+
+    # Find all of the activity (like push, pull, fetch) lines
+    results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr)
+
+    # Don't bother checking the order of 'fetch', it is allowed to start
+    # before or after the initial cache size job, runs in parallel, and does
+    # not require ResourceType.CACHE.
+    results.remove('fetch')
+    print(results)
+
+    # Assert the expected sequence of events
+    assert results == ['size', 'clean', 'build']
+
+    # Check that the correct element remains in the cache
+    states = cli.get_element_states(project, ['target.bst', 'target2.bst'])
+    assert states['target.bst'] != 'cached'
+    assert states['target2.bst'] == 'cached'
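
The regex in test_cleanup_first() pulls the action-name column out of each START line in the log. A standalone check against invented but plausibly shaped lines (the pattern is the one from the test; the sample lines are hypothetical):

    import re

    log = '\n'.join([
        '[00:00:01][          ][ size:core activity                 ] START   cache_size.log',
        '[00:00:02][          ][clean:core activity                 ] START   cleanup.log',
        '[00:00:05][0f1a2b3c  ][build:target2.bst                   ] START   target2/build.log',
    ])

    results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', log)
    assert results == ['size', 'clean', 'build']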
--- a/tests/frontend/logging.py
+++ b/tests/frontend/logging.py
@@ -41,7 +41,7 @@ def test_default_logging(cli, tmpdir, datafiles):
     result = cli.run(project=project, args=['source', 'fetch', element_name])
     result.assert_success()
 
-    m = re.search(r"\[\d\d:\d\d:\d\d\]\[\]\[\] SUCCESS Checking sources", result.stderr)
+    m = re.search(r"\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources", result.stderr)
     assert(m is not None)
 
 
@@ -77,7 +77,7 @@ def test_custom_logging(cli, tmpdir, datafiles):
     result = cli.run(project=project, args=['source', 'fetch', element_name])
     result.assert_success()
 
-    m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr)
+    m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,\s*,.*,SUCCESS,Checking sources", result.stderr)
     assert(m is not None)
 
 
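
Both regex updates account for the widget changes above: the cache key column may now be blank padding rather than empty, and the element name column always carries content (a padded name or the 'core activity' label). A standalone check (the patterns are the before/after from the diff; the sample line is hypothetical):

    import re

    line = '[00:00:00][          ][ main:core activity                 ] SUCCESS Checking sources'

    old = r'\[\d\d:\d\d:\d\d\]\[\]\[\] SUCCESS Checking sources'
    new = r'\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources'

    assert re.search(old, line) is None       # empty brackets no longer occur
    assert re.search(new, line) is not None   # the relaxed pattern matches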
