Tristan Van Berkom pushed to branch tristan/cache-management at BuildStream / buildstream
Commits:
-
51803907
by Tristan Van Berkom at 2019-01-22T18:42:28Z
-
f85fcd8b
by Tristan Van Berkom at 2019-01-22T18:42:28Z
-
64103cf5
by Tristan Van Berkom at 2019-01-22T18:42:28Z
-
fff6a2b4
by Tristan Van Berkom at 2019-01-22T18:42:28Z
-
4197d20a
by Tristan Van Berkom at 2019-01-22T18:42:28Z
4 changed files:
- buildstream/_artifactcache.py
- buildstream/_frontend/status.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/jobs/job.py
Changes:
... | ... | @@ -250,11 +250,33 @@ class ArtifactCache(): |
250 | 250 |
#
|
251 | 251 |
# Clean the artifact cache as much as possible.
|
252 | 252 |
#
|
253 |
+ # Args:
|
|
254 |
+ # progress (callable): A callback to call when a ref is removed
|
|
255 |
+ #
|
|
253 | 256 |
# Returns:
|
254 | 257 |
# (int): The size of the cache after having cleaned up
|
255 | 258 |
#
|
256 |
- def clean(self):
|
|
259 |
+ def clean(self, progress=None):
|
|
257 | 260 |
artifacts = self.list_artifacts()
|
261 |
+ context = self.context
|
|
262 |
+ |
|
263 |
+ # Some accumulative statistics
|
|
264 |
+ removed_ref_count = 0
|
|
265 |
+ space_saved = 0
|
|
266 |
+ |
|
267 |
+ # Start off with an announcement with as much info as possible
|
|
268 |
+ volume_size, volume_avail = self._get_cache_volume_size()
|
|
269 |
+ self._message(MessageType.STATUS, "Starting cache cleanup",
|
|
270 |
+ detail=("Elements required by the current build plan: {}\n" +
|
|
271 |
+ "User specified quota: {} ({})\n" +
|
|
272 |
+ "Cache usage: {}\n" +
|
|
273 |
+ "Cache volume: {} total, {} available")
|
|
274 |
+ .format(len(self._required_elements),
|
|
275 |
+ context.config_cache_quota,
|
|
276 |
+ utils._pretty_size(self._cache_quota_original, dec_places=2),
|
|
277 |
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
|
|
278 |
+ utils._pretty_size(volume_size, dec_places=2),
|
|
279 |
+ utils._pretty_size(volume_avail, dec_places=2)))
|
|
258 | 280 |
|
259 | 281 |
# Build a set of the cache keys which are required
|
260 | 282 |
# based on the required elements at cleanup time
|
... | ... | @@ -279,11 +301,15 @@ class ArtifactCache(): |
279 | 301 |
# can't remove them, we have to abort the build.
|
280 | 302 |
#
|
281 | 303 |
# FIXME: Asking the user what to do may be neater
|
304 |
+ #
|
|
282 | 305 |
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
|
283 | 306 |
'buildstream.conf')
|
284 |
- detail = ("There is not enough space to complete the build.\n"
|
|
285 |
- "Please increase the cache-quota in {}."
|
|
286 |
- .format(self.context.config_origin or default_conf))
|
|
307 |
+ detail = ("Aborted after removing {} refs and saving {} disk space.\n\n"
|
|
308 |
+ "There is not enough space to complete the build.\n"
|
|
309 |
+ "Please increase the cache-quota in {} and/or make more disk space."
|
|
310 |
+ .format(removed_ref_count,
|
|
311 |
+ utils._pretty_size(space_saved, dec_places=2),
|
|
312 |
+ (context.config_origin or default_conf)))
|
|
287 | 313 |
|
288 | 314 |
if self.has_quota_exceeded():
|
289 | 315 |
raise ArtifactError("Cache too full. Aborting.",
|
... | ... | @@ -298,10 +324,24 @@ class ArtifactCache(): |
298 | 324 |
# Remove the actual artifact, if it's not required.
|
299 | 325 |
size = self.remove(to_remove)
|
300 | 326 |
|
327 |
+ removed_ref_count += 1
|
|
328 |
+ space_saved += size
|
|
329 |
+ |
|
301 | 330 |
# Remove the size from the removed size
|
302 | 331 |
self.set_cache_size(self._cache_size - size)
|
303 | 332 |
|
304 |
- # This should be O(1) if implemented correctly
|
|
333 |
+ # User callback
|
|
334 |
+ if progress:
|
|
335 |
+ progress()
|
|
336 |
+ |
|
337 |
+ # Informational message about the side effects of the cleanup
|
|
338 |
+ self._message(MessageType.INFO, "Cleanup completed",
|
|
339 |
+ detail=("Removed {} refs and saving {} disk space.\n" +
|
|
340 |
+ "Cache usage is now: {}")
|
|
341 |
+ .format(removed_ref_count,
|
|
342 |
+ utils._pretty_size(space_saved, dec_places=2),
|
|
343 |
+ utils._pretty_size(self.get_cache_size(), dec_places=2)))
|
|
344 |
+ |
|
305 | 345 |
return self.get_cache_size()
|
306 | 346 |
|
307 | 347 |
# compute_cache_size()
|
... | ... | @@ -313,7 +353,14 @@ class ArtifactCache(): |
313 | 353 |
# (int): The size of the artifact cache.
|
314 | 354 |
#
|
315 | 355 |
def compute_cache_size(self):
|
316 |
- self._cache_size = self.cas.calculate_cache_size()
|
|
356 |
+ old_cache_size = self._cache_size
|
|
357 |
+ new_cache_size = self.cas.calculate_cache_size()
|
|
358 |
+ |
|
359 |
+ if old_cache_size != new_cache_size:
|
|
360 |
+ self._cache_size = new_cache_size
|
|
361 |
+ |
|
362 |
+ usage = ArtifactCacheUsage(self)
|
|
363 |
+ self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage))
|
|
317 | 364 |
|
318 | 365 |
return self._cache_size
|
319 | 366 |
|
... | ... | @@ -353,13 +353,17 @@ class _StatusHeader(): |
353 | 353 |
def render(self, line_length, elapsed):
|
354 | 354 |
project = self._context.get_toplevel_project()
|
355 | 355 |
line_length = max(line_length, 80)
|
356 |
- size = 0
|
|
357 |
- text = ''
|
|
358 | 356 |
|
357 |
+ #
|
|
358 |
+ # Line 1: Session time, project name, session / total elements
|
|
359 |
+ #
|
|
360 |
+ # ========= 00:00:00 project-name (143/387) =========
|
|
361 |
+ #
|
|
359 | 362 |
session = str(len(self._stream.session_elements))
|
360 | 363 |
total = str(len(self._stream.total_elements))
|
361 | 364 |
|
362 |
- # Format and calculate size for target and overall time code
|
|
365 |
+ size = 0
|
|
366 |
+ text = ''
|
|
363 | 367 |
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
|
364 | 368 |
size += 8 # Size of time code
|
365 | 369 |
size += len(project.name) + 1
|
... | ... | @@ -372,6 +376,12 @@ class _StatusHeader(): |
372 | 376 |
self._format_profile.fmt(')')
|
373 | 377 |
|
374 | 378 |
line1 = self._centered(text, size, line_length, '=')
|
379 |
+ |
|
380 |
+ #
|
|
381 |
+ # Line 2: Dynamic list of queue status reports
|
|
382 |
+ #
|
|
383 |
+ # (Fetched:0 117 0)→ (Built:4 0 0)
|
|
384 |
+ #
|
|
375 | 385 |
size = 0
|
376 | 386 |
text = ''
|
377 | 387 |
|
... | ... | @@ -389,10 +399,28 @@ class _StatusHeader(): |
389 | 399 |
|
390 | 400 |
line2 = self._centered(text, size, line_length, ' ')
|
391 | 401 |
|
392 |
- size = 24
|
|
393 |
- text = self._format_profile.fmt("~~~~~ ") + \
|
|
394 |
- self._content_profile.fmt('Active Tasks') + \
|
|
395 |
- self._format_profile.fmt(" ~~~~~")
|
|
402 |
+ #
|
|
403 |
+ # Line 3: Cache usage percentage report
|
|
404 |
+ #
|
|
405 |
+ # ~~~~~~ cache: 69% ~~~~~~
|
|
406 |
+ #
|
|
407 |
+ usage = self._context.get_artifact_cache_usage()
|
|
408 |
+ usage_percent = '{}%'.format(usage.used_percent)
|
|
409 |
+ |
|
410 |
+ size = 21
|
|
411 |
+ size += len(usage_percent)
|
|
412 |
+ if usage.used_percent >= 95:
|
|
413 |
+ formatted_usage_percent = self._error_profile.fmt(usage_percent)
|
|
414 |
+ elif usage.used_percent >= 80:
|
|
415 |
+ formatted_usage_percent = self._content_profile.fmt(usage_percent)
|
|
416 |
+ else:
|
|
417 |
+ formatted_usage_percent = self._success_profile.fmt(usage_percent)
|
|
418 |
+ |
|
419 |
+ text = self._format_profile.fmt("~~~~~~ ") + \
|
|
420 |
+ self._content_profile.fmt('cache') + \
|
|
421 |
+ self._format_profile.fmt(': ') + \
|
|
422 |
+ formatted_usage_percent + \
|
|
423 |
+ self._format_profile.fmt(' ~~~~~~')
|
|
396 | 424 |
line3 = self._centered(text, size, line_length, ' ')
|
397 | 425 |
|
398 | 426 |
return line1 + '\n' + line2 + '\n' + line3
|
... | ... | @@ -28,7 +28,17 @@ class CleanupJob(Job): |
28 | 28 |
self._artifacts = context.artifactcache
|
29 | 29 |
|
30 | 30 |
def child_process(self):
|
31 |
- return self._artifacts.clean()
|
|
31 |
+ def progress():
|
|
32 |
+ self.send_message('update-cache-size',
|
|
33 |
+ self._artifacts.get_cache_size())
|
|
34 |
+ return self._artifacts.clean(progress)
|
|
35 |
+ |
|
36 |
+ def handle_message(self, message_type, message):
|
|
37 |
+ |
|
38 |
+ # Update the cache size in the main process as we go,
|
|
39 |
+ # this provides better feedback in the UI.
|
|
40 |
+ if message_type == 'update-cache-size':
|
|
41 |
+ self._artifacts.set_cache_size(message)
|
|
32 | 42 |
|
33 | 43 |
def parent_complete(self, status, result):
|
34 | 44 |
if status == JobStatus.OK:
|
... | ... | @@ -58,10 +58,10 @@ class JobStatus(): |
58 | 58 |
|
59 | 59 |
|
60 | 60 |
# Used to distinguish between status messages and return values
|
61 |
-class Envelope():
|
|
61 |
+class _Envelope():
|
|
62 | 62 |
def __init__(self, message_type, message):
|
63 |
- self._message_type = message_type
|
|
64 |
- self._message = message
|
|
63 |
+ self.message_type = message_type
|
|
64 |
+ self.message = message
|
|
65 | 65 |
|
66 | 66 |
|
67 | 67 |
# Process class that doesn't call waitpid on its own.
|
... | ... | @@ -275,10 +275,37 @@ class Job(): |
275 | 275 |
def set_task_id(self, task_id):
|
276 | 276 |
self._task_id = task_id
|
277 | 277 |
|
278 |
+ # send_message()
|
|
279 |
+ #
|
|
280 |
+ # To be called from inside Job.child_process() implementations
|
|
281 |
+ # to send messages to the main process during processing.
|
|
282 |
+ #
|
|
283 |
+ # These messages will be processed by the class's Job.handle_message()
|
|
284 |
+ # implementation.
|
|
285 |
+ #
|
|
286 |
+ def send_message(self, message_type, message):
|
|
287 |
+ self._queue.put(_Envelope(message_type, message))
|
|
288 |
+ |
|
278 | 289 |
#######################################################
|
279 | 290 |
# Abstract Methods #
|
280 | 291 |
#######################################################
|
281 | 292 |
|
293 |
+ # handle_message()
|
|
294 |
+ #
|
|
295 |
+ # Handle a custom message. This will be called in the main process in
|
|
296 |
+ # response to any messages sent to the main process using the
|
|
297 |
+ # Job.send_message() API from inside a Job.child_process() implementation
|
|
298 |
+ #
|
|
299 |
+ # Args:
|
|
300 |
+ # message_type (str): A string to identify the message type
|
|
301 |
+ # message (any): A simple serializable object
|
|
302 |
+ #
|
|
303 |
+ # Returns:
|
|
304 |
+ # (bool): Must return a truthy value if message_type is handled; a falsy return is reported as an unhandled message type.
|
|
305 |
+ #
|
|
306 |
+ def handle_message(self, message_type, message):
|
|
307 |
+ return False
|
|
308 |
+ |
|
282 | 309 |
# parent_complete()
|
283 | 310 |
#
|
284 | 311 |
# This will be executed after the job finishes, and is expected to
|
... | ... | @@ -416,7 +443,7 @@ class Job(): |
416 | 443 |
elapsed=elapsed, detail=e.detail,
|
417 | 444 |
logfile=filename, sandbox=e.sandbox)
|
418 | 445 |
|
419 |
- self._queue.put(Envelope('child_data', self.child_process_data()))
|
|
446 |
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
|
|
420 | 447 |
|
421 | 448 |
# Report the exception to the parent (for internal testing purposes)
|
422 | 449 |
self._child_send_error(e)
|
... | ... | @@ -442,7 +469,7 @@ class Job(): |
442 | 469 |
|
443 | 470 |
else:
|
444 | 471 |
# No exception occurred in the action
|
445 |
- self._queue.put(Envelope('child_data', self.child_process_data()))
|
|
472 |
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
|
|
446 | 473 |
self._child_send_result(result)
|
447 | 474 |
|
448 | 475 |
elapsed = datetime.datetime.now() - starttime
|
... | ... | @@ -469,7 +496,7 @@ class Job(): |
469 | 496 |
domain = e.domain
|
470 | 497 |
reason = e.reason
|
471 | 498 |
|
472 |
- envelope = Envelope('error', {
|
|
499 |
+ envelope = _Envelope('error', {
|
|
473 | 500 |
'domain': domain,
|
474 | 501 |
'reason': reason
|
475 | 502 |
})
|
... | ... | @@ -487,7 +514,7 @@ class Job(): |
487 | 514 |
#
|
488 | 515 |
def _child_send_result(self, result):
|
489 | 516 |
if result is not None:
|
490 |
- envelope = Envelope('result', result)
|
|
517 |
+ envelope = _Envelope('result', result)
|
|
491 | 518 |
self._queue.put(envelope)
|
492 | 519 |
|
493 | 520 |
# _child_shutdown()
|
... | ... | @@ -524,7 +551,7 @@ class Job(): |
524 | 551 |
if message.message_type == MessageType.LOG:
|
525 | 552 |
return
|
526 | 553 |
|
527 |
- self._queue.put(Envelope('message', message))
|
|
554 |
+ self._queue.put(_Envelope('message', message))
|
|
528 | 555 |
|
529 | 556 |
# _parent_shutdown()
|
530 | 557 |
#
|
... | ... | @@ -588,24 +615,28 @@ class Job(): |
588 | 615 |
if not self._listening:
|
589 | 616 |
return
|
590 | 617 |
|
591 |
- if envelope._message_type == 'message':
|
|
618 |
+ if envelope.message_type == 'message':
|
|
592 | 619 |
# Propagate received messages from children
|
593 | 620 |
# back through the context.
|
594 |
- self._scheduler.context.message(envelope._message)
|
|
595 |
- elif envelope._message_type == 'error':
|
|
621 |
+ self._scheduler.context.message(envelope.message)
|
|
622 |
+ elif envelope.message_type == 'error':
|
|
596 | 623 |
# For regression tests only, save the last error domain / reason
|
597 | 624 |
# reported from a child task in the main process, this global state
|
598 | 625 |
# is currently managed in _exceptions.py
|
599 |
- set_last_task_error(envelope._message['domain'],
|
|
600 |
- envelope._message['reason'])
|
|
601 |
- elif envelope._message_type == 'result':
|
|
626 |
+ set_last_task_error(envelope.message['domain'],
|
|
627 |
+ envelope.message['reason'])
|
|
628 |
+ elif envelope.message_type == 'result':
|
|
602 | 629 |
assert self._result is None
|
603 |
- self._result = envelope._message
|
|
604 |
- elif envelope._message_type == 'child_data':
|
|
630 |
+ self._result = envelope.message
|
|
631 |
+ elif envelope.message_type == 'child_data':
|
|
605 | 632 |
# If we retry a job, we assign a new value to this
|
606 |
- self.child_data = envelope._message
|
|
607 |
- else:
|
|
608 |
- raise Exception()
|
|
633 |
+ self.child_data = envelope.message
|
|
634 |
+ |
|
635 |
+ # Try Job subclass specific messages now
|
|
636 |
+ elif not self.handle_message(envelope.message_type,
|
|
637 |
+ envelope.message):
|
|
638 |
+ assert 0, "Unhandled message type '{}': {}" \
|
|
639 |
+ .format(envelope.message_type, envelope.message)
|
|
609 | 640 |
|
610 | 641 |
# _parent_process_queue()
|
611 | 642 |
#
|