Tristan Van Berkom pushed to branch master at BuildStream / buildstream
Commits:
-
3e36e363
by Tristan Van Berkom at 2019-01-24T16:55:24Z
-
ce01f87e
by Tristan Van Berkom at 2019-01-24T16:55:24Z
-
2479e8df
by Tristan Van Berkom at 2019-01-24T16:55:24Z
-
fdb8ff65
by Tristan Van Berkom at 2019-01-24T16:55:24Z
-
acd0bf22
by Tristan Van Berkom at 2019-01-24T18:01:41Z
5 changed files:
- buildstream/_frontend/widget.py
- buildstream/_scheduler/resources.py
- buildstream/_scheduler/scheduler.py
- tests/artifactcache/expiry.py
- tests/frontend/logging.py
Changes:
... | ... | @@ -175,29 +175,22 @@ class TypeName(Widget): |
175 | 175 |
# A widget for displaying the Element name
|
176 | 176 |
class ElementName(Widget):
|
177 | 177 |
|
178 |
- def __init__(self, context, content_profile, format_profile):
|
|
179 |
- super(ElementName, self).__init__(context, content_profile, format_profile)
|
|
180 |
- |
|
181 |
- # Pre initialization format string, before we know the length of
|
|
182 |
- # element names in the pipeline
|
|
183 |
- self._fmt_string = '{: <30}'
|
|
184 |
- |
|
185 | 178 |
def render(self, message):
|
179 |
+ action_name = message.action_name
|
|
186 | 180 |
element_id = message.task_id or message.unique_id
|
187 |
- if element_id is None:
|
|
188 |
- return ""
|
|
189 |
- |
|
190 |
- plugin = _plugin_lookup(element_id)
|
|
191 |
- name = plugin._get_full_name()
|
|
181 |
+ if element_id is not None:
|
|
182 |
+ plugin = _plugin_lookup(element_id)
|
|
183 |
+ name = plugin._get_full_name()
|
|
184 |
+ name = '{: <30}'.format(name)
|
|
185 |
+ else:
|
|
186 |
+ name = 'core activity'
|
|
187 |
+ name = '{: <30}'.format(name)
|
|
192 | 188 |
|
193 |
- # Sneak the action name in with the element name
|
|
194 |
- action_name = message.action_name
|
|
195 | 189 |
if not action_name:
|
196 | 190 |
action_name = "Main"
|
197 | 191 |
|
198 | 192 |
return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \
|
199 |
- self.format_profile.fmt(':') + \
|
|
200 |
- self.content_profile.fmt(self._fmt_string.format(name))
|
|
193 |
+ self.format_profile.fmt(':') + self.content_profile.fmt(name)
|
|
201 | 194 |
|
202 | 195 |
|
203 | 196 |
# A widget for displaying the primary message text
|
... | ... | @@ -219,9 +212,12 @@ class CacheKey(Widget): |
219 | 212 |
def render(self, message):
|
220 | 213 |
|
221 | 214 |
element_id = message.task_id or message.unique_id
|
222 |
- if element_id is None or not self._key_length:
|
|
215 |
+ if not self._key_length:
|
|
223 | 216 |
return ""
|
224 | 217 |
|
218 |
+ if element_id is None:
|
|
219 |
+ return ' ' * self._key_length
|
|
220 |
+ |
|
225 | 221 |
missing = False
|
226 | 222 |
key = ' ' * self._key_length
|
227 | 223 |
plugin = _plugin_lookup(element_id)
|
... | ... | @@ -163,4 +163,4 @@ class Resources(): |
163 | 163 |
def unregister_exclusive_interest(self, resources, source):
|
164 | 164 |
|
165 | 165 |
for resource in resources:
|
166 |
- self._exclusive_resources[resource].remove(source)
|
|
166 |
+ self._exclusive_resources[resource].discard(source)
|
... | ... | @@ -40,8 +40,8 @@ class SchedStatus(): |
40 | 40 |
|
41 | 41 |
# Some action names for the internal jobs we launch
|
42 | 42 |
#
|
43 |
-_ACTION_NAME_CLEANUP = 'cleanup'
|
|
44 |
-_ACTION_NAME_CACHE_SIZE = 'cache_size'
|
|
43 |
+_ACTION_NAME_CLEANUP = 'clean'
|
|
44 |
+_ACTION_NAME_CACHE_SIZE = 'size'
|
|
45 | 45 |
|
46 | 46 |
|
47 | 47 |
# Scheduler()
|
... | ... | @@ -151,6 +151,9 @@ class Scheduler(): |
151 | 151 |
# Handle unix signals while running
|
152 | 152 |
self._connect_signals()
|
153 | 153 |
|
154 |
+ # Check if we need to start with some cache maintenance
|
|
155 |
+ self._check_cache_management()
|
|
156 |
+ |
|
154 | 157 |
# Run the queues
|
155 | 158 |
self._sched()
|
156 | 159 |
self.loop.run_forever()
|
... | ... | @@ -272,6 +275,31 @@ class Scheduler(): |
272 | 275 |
# Local Private Methods #
|
273 | 276 |
#######################################################
|
274 | 277 |
|
278 |
+ # _check_cache_management()
|
|
279 |
+ #
|
|
280 |
+ # Run an initial check if we need to lock the cache
|
|
281 |
+ # resource and check the size and possibly launch
|
|
282 |
+ # a cleanup.
|
|
283 |
+ #
|
|
284 |
+ # Sessions which do not add to the cache are not affected.
|
|
285 |
+ #
|
|
286 |
+ def _check_cache_management(self):
|
|
287 |
+ |
|
288 |
+ # Only trigger the check for a scheduler run which has
|
|
289 |
+ # queues which require the CACHE resource.
|
|
290 |
+ if not any(q for q in self.queues
|
|
291 |
+ if ResourceType.CACHE in q.resources):
|
|
292 |
+ return
|
|
293 |
+ |
|
294 |
+ # If the estimated size outgrows the quota, queue a job to
|
|
295 |
+ # actually check the real cache size initially, this one
|
|
296 |
+ # should have exclusive access to the cache to ensure nothing
|
|
297 |
+ # starts while we are checking the cache.
|
|
298 |
+ #
|
|
299 |
+ artifacts = self.context.artifactcache
|
|
300 |
+ if artifacts.has_quota_exceeded():
|
|
301 |
+ self._sched_cache_size_job(exclusive=True)
|
|
302 |
+ |
|
275 | 303 |
# _spawn_job()
|
276 | 304 |
#
|
277 | 305 |
# Spawns a job
|
... | ... | @@ -292,6 +320,11 @@ class Scheduler(): |
292 | 320 |
self._cache_size_running = None
|
293 | 321 |
self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
|
294 | 322 |
|
323 |
+ # Unregister the exclusive interest if there was any
|
|
324 |
+ self.resources.unregister_exclusive_interest(
|
|
325 |
+ [ResourceType.CACHE], 'cache-size'
|
|
326 |
+ )
|
|
327 |
+ |
|
295 | 328 |
# Schedule a cleanup job if we've hit the threshold
|
296 | 329 |
if status != JobStatus.OK:
|
297 | 330 |
return
|
... | ... | @@ -344,11 +377,35 @@ class Scheduler(): |
344 | 377 |
# Runs a cache size job if one is scheduled to run now and
|
345 | 378 |
# sufficient resources are available.
|
346 | 379 |
#
|
347 |
- def _sched_cache_size_job(self):
|
|
380 |
+ # Args:
|
|
381 |
+ # exclusive (bool): Run a cache size job immediately and
|
|
382 |
+ # hold the ResourceType.CACHE resource
|
|
383 |
+ # exclusively (used at startup).
|
|
384 |
+ #
|
|
385 |
+ def _sched_cache_size_job(self, *, exclusive=False):
|
|
386 |
+ |
|
387 |
+ # The exclusive argument is not intended (or safe) for arbitrary use.
|
|
388 |
+ if exclusive:
|
|
389 |
+ assert not self._cache_size_scheduled
|
|
390 |
+ assert not self._cache_size_running
|
|
391 |
+ assert not self._active_jobs
|
|
392 |
+ self._cache_size_scheduled = True
|
|
348 | 393 |
|
349 | 394 |
if self._cache_size_scheduled and not self._cache_size_running:
|
350 | 395 |
|
351 |
- if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
|
|
396 |
+ # Handle the exclusive launch
|
|
397 |
+ exclusive_resources = set()
|
|
398 |
+ if exclusive:
|
|
399 |
+ exclusive_resources.add(ResourceType.CACHE)
|
|
400 |
+ self.resources.register_exclusive_interest(
|
|
401 |
+ exclusive_resources, 'cache-size'
|
|
402 |
+ )
|
|
403 |
+ |
|
404 |
+ # Reserve the resources (with the possible exclusive cache resource)
|
|
405 |
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
|
|
406 |
+ exclusive_resources):
|
|
407 |
+ |
|
408 |
+ # Update state and launch
|
|
352 | 409 |
self._cache_size_scheduled = False
|
353 | 410 |
self._cache_size_running = \
|
354 | 411 |
CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
|
... | ... | @@ -18,6 +18,7 @@ |
18 | 18 |
#
|
19 | 19 |
|
20 | 20 |
import os
|
21 |
+import re
|
|
21 | 22 |
from unittest import mock
|
22 | 23 |
|
23 | 24 |
import pytest
|
... | ... | @@ -425,3 +426,66 @@ def test_extract_expiry(cli, datafiles, tmpdir): |
425 | 426 |
|
426 | 427 |
assert os.path.isdir(refsdirtarget2)
|
427 | 428 |
assert not os.path.exists(refsdirtarget)
|
429 |
+ |
|
430 |
+ |
|
431 |
+# Ensures that when launching BuildStream with a full artifact cache,
|
|
432 |
+# the cache size and cleanup jobs are run before any other jobs.
|
|
433 |
+#
|
|
434 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
435 |
+def test_cleanup_first(cli, datafiles, tmpdir):
|
|
436 |
+ project = os.path.join(datafiles.dirname, datafiles.basename)
|
|
437 |
+ element_path = 'elements'
|
|
438 |
+ cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
|
|
439 |
+ checkout = os.path.join(project, 'checkout')
|
|
440 |
+ |
|
441 |
+ cli.configure({
|
|
442 |
+ 'cache': {
|
|
443 |
+ 'quota': 10000000,
|
|
444 |
+ }
|
|
445 |
+ })
|
|
446 |
+ |
|
447 |
+ # Create an element that uses almost the entire cache (an empty
|
|
448 |
+ # ostree cache starts at about ~10KiB, so we need a bit of a
|
|
449 |
+ # buffer)
|
|
450 |
+ create_element_size('target.bst', project, element_path, [], 8000000)
|
|
451 |
+ res = cli.run(project=project, args=['build', 'target.bst'])
|
|
452 |
+ res.assert_success()
|
|
453 |
+ |
|
454 |
+ assert cli.get_element_state(project, 'target.bst') == 'cached'
|
|
455 |
+ |
|
456 |
+ # Now configure with a smaller quota, create a situation
|
|
457 |
+ # where the cache must be cleaned up before building anything else.
|
|
458 |
+ #
|
|
459 |
+ # Fix the fetchers and builders just to ensure a predictable
|
|
460 |
+ # sequence of events (although it does not affect this test)
|
|
461 |
+ cli.configure({
|
|
462 |
+ 'cache': {
|
|
463 |
+ 'quota': 5000000,
|
|
464 |
+ },
|
|
465 |
+ 'scheduler': {
|
|
466 |
+ 'fetchers': 1,
|
|
467 |
+ 'builders': 1
|
|
468 |
+ }
|
|
469 |
+ })
|
|
470 |
+ |
|
471 |
+ # Our cache is now more than full, BuildStream must clean it up before building
|
|
472 |
+ create_element_size('target2.bst', project, element_path, [], 4000000)
|
|
473 |
+ res = cli.run(project=project, args=['build', 'target2.bst'])
|
|
474 |
+ res.assert_success()
|
|
475 |
+ |
|
476 |
+ # Find all of the activity (like push, pull, fetch) lines
|
|
477 |
+ results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr)
|
|
478 |
+ |
|
479 |
+ # Don't bother checking the order of 'fetch', it is allowed to start
|
|
480 |
+ # before or after the initial cache size job, runs in parallel, and does
|
|
481 |
+ # not require ResourceType.CACHE.
|
|
482 |
+ results.remove('fetch')
|
|
483 |
+ print(results)
|
|
484 |
+ |
|
485 |
+ # Assert the expected sequence of events
|
|
486 |
+ assert results == ['size', 'clean', 'build']
|
|
487 |
+ |
|
488 |
+ # Check that the correct element remains in the cache
|
|
489 |
+ states = cli.get_element_states(project, ['target.bst', 'target2.bst'])
|
|
490 |
+ assert states['target.bst'] != 'cached'
|
|
491 |
+ assert states['target2.bst'] == 'cached'
|
... | ... | @@ -41,7 +41,7 @@ def test_default_logging(cli, tmpdir, datafiles): |
41 | 41 |
result = cli.run(project=project, args=['source', 'fetch', element_name])
|
42 | 42 |
result.assert_success()
|
43 | 43 |
|
44 |
- m = re.search(r"\[\d\d:\d\d:\d\d\]\[\]\[\] SUCCESS Checking sources", result.stderr)
|
|
44 |
+ m = re.search(r"\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources", result.stderr)
|
|
45 | 45 |
assert(m is not None)
|
46 | 46 |
|
47 | 47 |
|
... | ... | @@ -77,7 +77,7 @@ def test_custom_logging(cli, tmpdir, datafiles): |
77 | 77 |
result = cli.run(project=project, args=['source', 'fetch', element_name])
|
78 | 78 |
result.assert_success()
|
79 | 79 |
|
80 |
- m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr)
|
|
80 |
+ m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,\s*,.*,SUCCESS,Checking sources", result.stderr)
|
|
81 | 81 |
assert(m is not None)
|
82 | 82 |
|
83 | 83 |
|