Qinusty pushed to branch Qinusty/message-helpers at BuildStream / buildstream
Commits:
-
1f88a1e9
by Jonathan Maw at 2018-08-20T12:36:02Z
-
d9507a9e
by Jonathan Maw at 2018-08-20T12:36:02Z
-
a9b81dc4
by Jonathan Maw at 2018-08-20T12:36:02Z
-
a67906e6
by Jonathan Maw at 2018-08-20T13:06:51Z
-
92e34ccd
by Jonathan Maw at 2018-08-20T16:27:18Z
-
4ef8fc69
by Tristan Maat at 2018-08-20T16:49:04Z
-
fa48eee8
by Tristan Maat at 2018-08-20T16:49:04Z
-
565d3aab
by Tristan Maat at 2018-08-20T16:49:04Z
-
57c75207
by Josh Smith at 2018-08-20T16:49:05Z
-
9dcbb179
by Josh Smith at 2018-08-20T16:49:05Z
18 changed files:
- buildstream/_artifactcache/__init__.py
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_context.py
- buildstream/_frontend/app.py
- buildstream/_pipeline.py
- buildstream/_platform/linux.py
- buildstream/_project.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- buildstream/_stream.py
- buildstream/plugin.py
- buildstream/utils.py
- + tests/artifactcache/cache_size.py
- tests/testutils/artifactshare.py
Changes:
... | ... | @@ -17,4 +17,4 @@ |
17 | 17 |
# Authors:
|
18 | 18 |
# Tristan Van Berkom <tristan vanberkom codethink co uk>
|
19 | 19 |
|
20 |
-from .artifactcache import ArtifactCache, ArtifactCacheSpec
|
|
20 |
+from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE
|
... | ... | @@ -23,11 +23,13 @@ from collections import Mapping, namedtuple |
23 | 23 |
|
24 | 24 |
from ..element_enums import _KeyStrength
|
25 | 25 |
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
|
26 |
-from .._message import Message, MessageType
|
|
27 | 26 |
from .. import utils
|
28 | 27 |
from .. import _yaml
|
29 | 28 |
|
30 | 29 |
|
30 |
+CACHE_SIZE_FILE = "cache_size"
|
|
31 |
+ |
|
32 |
+ |
|
31 | 33 |
# An ArtifactCacheSpec holds the user configuration for a single remote
|
32 | 34 |
# artifact cache.
|
33 | 35 |
#
|
... | ... | @@ -82,7 +84,6 @@ class ArtifactCache(): |
82 | 84 |
self.extractdir = os.path.join(context.artifactdir, 'extract')
|
83 | 85 |
self.tmpdir = os.path.join(context.artifactdir, 'tmp')
|
84 | 86 |
|
85 |
- self.max_size = context.cache_quota
|
|
86 | 87 |
self.estimated_size = None
|
87 | 88 |
|
88 | 89 |
self.global_remote_specs = []
|
... | ... | @@ -90,6 +91,8 @@ class ArtifactCache(): |
90 | 91 |
|
91 | 92 |
self._local = False
|
92 | 93 |
self.cache_size = None
|
94 |
+ self.cache_quota = None
|
|
95 |
+ self.cache_lower_threshold = None
|
|
93 | 96 |
|
94 | 97 |
os.makedirs(self.extractdir, exist_ok=True)
|
95 | 98 |
os.makedirs(self.tmpdir, exist_ok=True)
|
... | ... | @@ -227,7 +230,7 @@ class ArtifactCache(): |
227 | 230 |
def clean(self):
|
228 | 231 |
artifacts = self.list_artifacts()
|
229 | 232 |
|
230 |
- while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
|
|
233 |
+ while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
|
|
231 | 234 |
try:
|
232 | 235 |
to_remove = artifacts.pop(0)
|
233 | 236 |
except IndexError:
|
... | ... | @@ -241,7 +244,7 @@ class ArtifactCache(): |
241 | 244 |
"Please increase the cache-quota in {}."
|
242 | 245 |
.format(self.context.config_origin or default_conf))
|
243 | 246 |
|
244 |
- if self.calculate_cache_size() > self.context.cache_quota:
|
|
247 |
+ if self.calculate_cache_size() > self.cache_quota:
|
|
245 | 248 |
raise ArtifactError("Cache too full. Aborting.",
|
246 | 249 |
detail=detail,
|
247 | 250 |
reason="cache-too-full")
|
... | ... | @@ -282,7 +285,11 @@ class ArtifactCache(): |
282 | 285 |
# If we don't currently have an estimate, figure out the real
|
283 | 286 |
# cache size.
|
284 | 287 |
if self.estimated_size is None:
|
285 |
- self.estimated_size = self.calculate_cache_size()
|
|
288 |
+ stored_size = self._read_cache_size()
|
|
289 |
+ if stored_size is not None:
|
|
290 |
+ self.estimated_size = stored_size
|
|
291 |
+ else:
|
|
292 |
+ self.estimated_size = self.calculate_cache_size()
|
|
286 | 293 |
|
287 | 294 |
return self.estimated_size
|
288 | 295 |
|
... | ... | @@ -490,15 +497,6 @@ class ArtifactCache(): |
490 | 497 |
# Local Private Methods #
|
491 | 498 |
################################################
|
492 | 499 |
|
493 |
- # _message()
|
|
494 |
- #
|
|
495 |
- # Local message propagator
|
|
496 |
- #
|
|
497 |
- def _message(self, message_type, message, **kwargs):
|
|
498 |
- args = dict(kwargs)
|
|
499 |
- self.context.message(
|
|
500 |
- Message(None, message_type, message, **args))
|
|
501 |
- |
|
502 | 500 |
# _set_remotes():
|
503 | 501 |
#
|
504 | 502 |
# Set the list of remote caches. If project is None, the global list of
|
... | ... | @@ -522,7 +520,7 @@ class ArtifactCache(): |
522 | 520 |
#
|
523 | 521 |
def _initialize_remotes(self):
|
524 | 522 |
def remote_failed(url, error):
|
525 |
- self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
|
|
523 |
+ self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
|
|
526 | 524 |
|
527 | 525 |
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
|
528 | 526 |
self.initialize_remotes(on_failure=remote_failed)
|
... | ... | @@ -541,6 +539,7 @@ class ArtifactCache(): |
541 | 539 |
self.estimated_size = self.calculate_cache_size()
|
542 | 540 |
|
543 | 541 |
self.estimated_size += artifact_size
|
542 |
+ self._write_cache_size(self.estimated_size)
|
|
544 | 543 |
|
545 | 544 |
# _set_cache_size()
|
546 | 545 |
#
|
... | ... | @@ -551,6 +550,109 @@ class ArtifactCache(): |
551 | 550 |
def _set_cache_size(self, cache_size):
|
552 | 551 |
self.estimated_size = cache_size
|
553 | 552 |
|
553 |
+ # set_cache_size is called in cleanup, where it may set the cache to None
|
|
554 |
+ if self.estimated_size is not None:
|
|
555 |
+ self._write_cache_size(self.estimated_size)
|
|
556 |
+ |
|
557 |
+ # _write_cache_size()
|
|
558 |
+ #
|
|
559 |
+ # Writes the given size of the artifact to the cache's size file
|
|
560 |
+ #
|
|
561 |
+ def _write_cache_size(self, size):
|
|
562 |
+ assert isinstance(size, int)
|
|
563 |
+ size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
|
|
564 |
+ with open(size_file_path, "w") as f:
|
|
565 |
+ f.write(str(size))
|
|
566 |
+ |
|
567 |
+ # _read_cache_size()
|
|
568 |
+ #
|
|
569 |
+ # Reads and returns the size of the artifact cache that's stored in the
|
|
570 |
+ # cache's size file
|
|
571 |
+ #
|
|
572 |
+ def _read_cache_size(self):
|
|
573 |
+ size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
|
|
574 |
+ |
|
575 |
+ if not os.path.exists(size_file_path):
|
|
576 |
+ return None
|
|
577 |
+ |
|
578 |
+ with open(size_file_path, "r") as f:
|
|
579 |
+ size = f.read()
|
|
580 |
+ |
|
581 |
+ try:
|
|
582 |
+ num_size = int(size)
|
|
583 |
+ except ValueError as e:
|
|
584 |
+ raise ArtifactError("Size '{}' parsed from '{}' was not an integer".format(
|
|
585 |
+ size, size_file_path)) from e
|
|
586 |
+ |
|
587 |
+ return num_size
|
|
588 |
+ |
|
589 |
+ # _calculate_cache_quota()
|
|
590 |
+ #
|
|
591 |
+ # Calculates and sets the cache quota and lower threshold based on the
|
|
592 |
+ # quota set in Context.
|
|
593 |
+ # It checks that the quota is both a valid expression, and that there is
|
|
594 |
+ # enough disk space to satisfy that quota
|
|
595 |
+ #
|
|
596 |
+ def _calculate_cache_quota(self):
|
|
597 |
+ # Headroom intended to give BuildStream a bit of leeway.
|
|
598 |
+ # This acts as the minimum size of cache_quota and also
|
|
599 |
+ # is taken from the user requested cache_quota.
|
|
600 |
+ #
|
|
601 |
+ if 'BST_TEST_SUITE' in os.environ:
|
|
602 |
+ headroom = 0
|
|
603 |
+ else:
|
|
604 |
+ headroom = 2e9
|
|
605 |
+ |
|
606 |
+ artifactdir_volume = self.context.artifactdir
|
|
607 |
+ while not os.path.exists(artifactdir_volume):
|
|
608 |
+ artifactdir_volume = os.path.dirname(artifactdir_volume)
|
|
609 |
+ |
|
610 |
+ try:
|
|
611 |
+ cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
|
|
612 |
+ except utils.UtilError as e:
|
|
613 |
+ raise LoadError(LoadErrorReason.INVALID_DATA,
|
|
614 |
+ "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
|
|
615 |
+ "\nValid values are, for example: 800M 10G 1T 50%\n"
|
|
616 |
+ .format(str(e))) from e
|
|
617 |
+ |
|
618 |
+ stat = os.statvfs(artifactdir_volume)
|
|
619 |
+ available_space = (stat.f_bsize * stat.f_bavail)
|
|
620 |
+ |
|
621 |
+ cache_size = self.get_approximate_cache_size()
|
|
622 |
+ |
|
623 |
+ # Ensure system has enough storage for the cache_quota
|
|
624 |
+ #
|
|
625 |
+ # If cache_quota is none, set it to the maximum it could possibly be.
|
|
626 |
+ #
|
|
627 |
+ # Also check that cache_quota is atleast as large as our headroom.
|
|
628 |
+ #
|
|
629 |
+ if cache_quota is None: # Infinity, set to max system storage
|
|
630 |
+ cache_quota = cache_size + available_space
|
|
631 |
+ if cache_quota < headroom: # Check minimum
|
|
632 |
+ raise LoadError(LoadErrorReason.INVALID_DATA,
|
|
633 |
+ "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
|
|
634 |
+ "BuildStream requires a minimum cache quota of 2G.")
|
|
635 |
+ elif cache_quota > cache_size + available_space: # Check maximum
|
|
636 |
+ raise LoadError(LoadErrorReason.INVALID_DATA,
|
|
637 |
+ ("Your system does not have enough available " +
|
|
638 |
+ "space to support the cache quota specified.\n" +
|
|
639 |
+ "You currently have:\n" +
|
|
640 |
+ "- {used} of cache in use at {local_cache_path}\n" +
|
|
641 |
+ "- {available} of available system storage").format(
|
|
642 |
+ used=utils._pretty_size(cache_size),
|
|
643 |
+ local_cache_path=self.context.artifactdir,
|
|
644 |
+ available=utils._pretty_size(available_space)))
|
|
645 |
+ |
|
646 |
+ # Place a slight headroom (2e9 (2GB) on the cache_quota) into
|
|
647 |
+ # cache_quota to try and avoid exceptions.
|
|
648 |
+ #
|
|
649 |
+ # Of course, we might still end up running out during a build
|
|
650 |
+ # if we end up writing more than 2G, but hey, this stuff is
|
|
651 |
+ # already really fuzzy.
|
|
652 |
+ #
|
|
653 |
+ self.cache_quota = cache_quota - headroom
|
|
654 |
+ self.cache_lower_threshold = self.cache_quota / 2
|
|
655 |
+ |
|
554 | 656 |
|
555 | 657 |
# _configured_remote_artifact_cache_specs():
|
556 | 658 |
#
|
... | ... | @@ -34,7 +34,6 @@ from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc |
34 | 34 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
35 | 35 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
36 | 36 |
|
37 |
-from .._message import MessageType, Message
|
|
38 | 37 |
from .. import _signals, utils
|
39 | 38 |
from .._exceptions import ArtifactError
|
40 | 39 |
|
... | ... | @@ -61,6 +60,8 @@ class CASCache(ArtifactCache): |
61 | 60 |
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
|
62 | 61 |
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
|
63 | 62 |
|
63 |
+ self._calculate_cache_quota()
|
|
64 |
+ |
|
64 | 65 |
self._enable_push = enable_push
|
65 | 66 |
|
66 | 67 |
# Per-project list of _CASRemote instances.
|
... | ... | @@ -330,7 +331,7 @@ class CASCache(ArtifactCache): |
330 | 331 |
request.write_offset = offset
|
331 | 332 |
# max. 64 kB chunks
|
332 | 333 |
request.data = f.read(chunk_size)
|
333 |
- request.resource_name = resource_name
|
|
334 |
+ request.resource_name = resource_name # pylint: disable=cell-var-from-loop
|
|
334 | 335 |
request.finish_write = remaining <= 0
|
335 | 336 |
yield request
|
336 | 337 |
offset += chunk_size
|
... | ... | @@ -350,12 +351,10 @@ class CASCache(ArtifactCache): |
350 | 351 |
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
351 | 352 |
|
352 | 353 |
if skipped_remote:
|
353 |
- self.context.message(Message(
|
|
354 |
- None,
|
|
355 |
- MessageType.SKIPPED,
|
|
354 |
+ self.context.skipped(
|
|
356 | 355 |
"Remote ({}) already has {} cached".format(
|
357 | 356 |
remote.spec.url, element._get_brief_display_key())
|
358 |
- ))
|
|
357 |
+ )
|
|
359 | 358 |
return pushed
|
360 | 359 |
|
361 | 360 |
################################################
|
... | ... | @@ -19,6 +19,7 @@ |
19 | 19 |
|
20 | 20 |
import os
|
21 | 21 |
import datetime
|
22 |
+import traceback
|
|
22 | 23 |
from collections import deque, Mapping
|
23 | 24 |
from contextlib import contextmanager
|
24 | 25 |
from . import utils
|
... | ... | @@ -26,6 +27,7 @@ from . import _cachekey |
26 | 27 |
from . import _signals
|
27 | 28 |
from . import _site
|
28 | 29 |
from . import _yaml
|
30 |
+from .plugin import Plugin
|
|
29 | 31 |
from ._exceptions import LoadError, LoadErrorReason, BstError
|
30 | 32 |
from ._message import Message, MessageType
|
31 | 33 |
from ._profile import Topics, profile_start, profile_end
|
... | ... | @@ -64,12 +66,6 @@ class Context(): |
64 | 66 |
# The locations from which to push and pull prebuilt artifacts
|
65 | 67 |
self.artifact_cache_specs = []
|
66 | 68 |
|
67 |
- # The artifact cache quota
|
|
68 |
- self.cache_quota = None
|
|
69 |
- |
|
70 |
- # The lower threshold to which we aim to reduce the cache size
|
|
71 |
- self.cache_lower_threshold = None
|
|
72 |
- |
|
73 | 69 |
# The directory to store build logs
|
74 | 70 |
self.logdir = None
|
75 | 71 |
|
... | ... | @@ -124,6 +120,8 @@ class Context(): |
124 | 120 |
self._workspaces = None
|
125 | 121 |
self._log_handle = None
|
126 | 122 |
self._log_filename = None
|
123 |
+ self.config_cache_quota = 'infinity'
|
|
124 |
+ self.artifactdir_volume = None
|
|
127 | 125 |
|
128 | 126 |
# load()
|
129 | 127 |
#
|
... | ... | @@ -183,71 +181,7 @@ class Context(): |
183 | 181 |
cache = _yaml.node_get(defaults, Mapping, 'cache')
|
184 | 182 |
_yaml.node_validate(cache, ['quota'])
|
185 | 183 |
|
186 |
- artifactdir_volume = self.artifactdir
|
|
187 |
- while not os.path.exists(artifactdir_volume):
|
|
188 |
- artifactdir_volume = os.path.dirname(artifactdir_volume)
|
|
189 |
- |
|
190 |
- # We read and parse the cache quota as specified by the user
|
|
191 |
- cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
|
|
192 |
- try:
|
|
193 |
- cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
|
|
194 |
- except utils.UtilError as e:
|
|
195 |
- raise LoadError(LoadErrorReason.INVALID_DATA,
|
|
196 |
- "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
|
|
197 |
- "\nValid values are, for example: 800M 10G 1T 50%\n"
|
|
198 |
- .format(str(e))) from e
|
|
199 |
- |
|
200 |
- # Headroom intended to give BuildStream a bit of leeway.
|
|
201 |
- # This acts as the minimum size of cache_quota and also
|
|
202 |
- # is taken from the user requested cache_quota.
|
|
203 |
- #
|
|
204 |
- if 'BST_TEST_SUITE' in os.environ:
|
|
205 |
- headroom = 0
|
|
206 |
- else:
|
|
207 |
- headroom = 2e9
|
|
208 |
- |
|
209 |
- stat = os.statvfs(artifactdir_volume)
|
|
210 |
- available_space = (stat.f_bsize * stat.f_bavail)
|
|
211 |
- |
|
212 |
- # Again, the artifact directory may not yet have been created yet
|
|
213 |
- #
|
|
214 |
- if not os.path.exists(self.artifactdir):
|
|
215 |
- cache_size = 0
|
|
216 |
- else:
|
|
217 |
- cache_size = utils._get_dir_size(self.artifactdir)
|
|
218 |
- |
|
219 |
- # Ensure system has enough storage for the cache_quota
|
|
220 |
- #
|
|
221 |
- # If cache_quota is none, set it to the maximum it could possibly be.
|
|
222 |
- #
|
|
223 |
- # Also check that cache_quota is atleast as large as our headroom.
|
|
224 |
- #
|
|
225 |
- if cache_quota is None: # Infinity, set to max system storage
|
|
226 |
- cache_quota = cache_size + available_space
|
|
227 |
- if cache_quota < headroom: # Check minimum
|
|
228 |
- raise LoadError(LoadErrorReason.INVALID_DATA,
|
|
229 |
- "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
|
|
230 |
- "BuildStream requires a minimum cache quota of 2G.")
|
|
231 |
- elif cache_quota > cache_size + available_space: # Check maximum
|
|
232 |
- raise LoadError(LoadErrorReason.INVALID_DATA,
|
|
233 |
- ("Your system does not have enough available " +
|
|
234 |
- "space to support the cache quota specified.\n" +
|
|
235 |
- "You currently have:\n" +
|
|
236 |
- "- {used} of cache in use at {local_cache_path}\n" +
|
|
237 |
- "- {available} of available system storage").format(
|
|
238 |
- used=utils._pretty_size(cache_size),
|
|
239 |
- local_cache_path=self.artifactdir,
|
|
240 |
- available=utils._pretty_size(available_space)))
|
|
241 |
- |
|
242 |
- # Place a slight headroom (2e9 (2GB) on the cache_quota) into
|
|
243 |
- # cache_quota to try and avoid exceptions.
|
|
244 |
- #
|
|
245 |
- # Of course, we might still end up running out during a build
|
|
246 |
- # if we end up writing more than 2G, but hey, this stuff is
|
|
247 |
- # already really fuzzy.
|
|
248 |
- #
|
|
249 |
- self.cache_quota = cache_quota - headroom
|
|
250 |
- self.cache_lower_threshold = self.cache_quota / 2
|
|
184 |
+ self.config_cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
|
|
251 | 185 |
|
252 | 186 |
# Load artifact share configuration
|
253 | 187 |
self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
|
... | ... | @@ -385,7 +319,7 @@ class Context(): |
385 | 319 |
# the context.
|
386 | 320 |
#
|
387 | 321 |
# The message handler should have the same signature as
|
388 |
- # the message() method
|
|
322 |
+ # the _send_message() method
|
|
389 | 323 |
def set_message_handler(self, handler):
|
390 | 324 |
self._message_handler = handler
|
391 | 325 |
|
... | ... | @@ -400,16 +334,15 @@ class Context(): |
400 | 334 |
return True
|
401 | 335 |
return False
|
402 | 336 |
|
403 |
- # message():
|
|
337 |
+ # _send_message():
|
|
404 | 338 |
#
|
405 |
- # Proxies a message back to the caller, this is the central
|
|
339 |
+ # Proxies a message back through the message handler, this is the central
|
|
406 | 340 |
# point through which all messages pass.
|
407 | 341 |
#
|
408 | 342 |
# Args:
|
409 | 343 |
# message: A Message object
|
410 | 344 |
#
|
411 |
- def message(self, message):
|
|
412 |
- |
|
345 |
+ def _send_message(self, message):
|
|
413 | 346 |
# Tag message only once
|
414 | 347 |
if message.depth is None:
|
415 | 348 |
message.depth = len(list(self._message_depth))
|
... | ... | @@ -423,7 +356,72 @@ class Context(): |
423 | 356 |
assert self._message_handler
|
424 | 357 |
|
425 | 358 |
self._message_handler(message, context=self)
|
426 |
- return
|
|
359 |
+ |
|
360 |
+ # _message():
|
|
361 |
+ #
|
|
362 |
+ # The global message API. Any message-sending functions should go
|
|
363 |
+ # through here. This will call `_send_message` to deliver the
|
|
364 |
+ # final message.
|
|
365 |
+ #
|
|
366 |
+ def _message(self, text, *, plugin=None, msg_type=None, **kwargs):
|
|
367 |
+ assert msg_type is not None
|
|
368 |
+ |
|
369 |
+ if isinstance(plugin, Plugin):
|
|
370 |
+ plugin_id = plugin._get_unique_id()
|
|
371 |
+ else:
|
|
372 |
+ plugin_id = plugin
|
|
373 |
+ |
|
374 |
+ self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
|
|
375 |
+ |
|
376 |
+ # skipped():
|
|
377 |
+ #
|
|
378 |
+ # Produce and send a skipped message through the context.
|
|
379 |
+ #
|
|
380 |
+ def skipped(self, text, **kwargs):
|
|
381 |
+ self._message(text, msg_type=MessageType.SKIPPED, **kwargs)
|
|
382 |
+ |
|
383 |
+ # debug():
|
|
384 |
+ #
|
|
385 |
+ # Produce and send a debug message through the context.
|
|
386 |
+ #
|
|
387 |
+ def debug(self, text, **kwargs):
|
|
388 |
+ if self.log_debug:
|
|
389 |
+ self._message(text, msg_type=MessageType.DEBUG, **kwargs)
|
|
390 |
+ |
|
391 |
+ # status():
|
|
392 |
+ #
|
|
393 |
+ # Produce and send a status message through the context.
|
|
394 |
+ #
|
|
395 |
+ def status(self, text, **kwargs):
|
|
396 |
+ self._message(text, msg_type=MessageType.STATUS, **kwargs)
|
|
397 |
+ |
|
398 |
+ # info():
|
|
399 |
+ #
|
|
400 |
+ # Produce and send a info message through the context.
|
|
401 |
+ #
|
|
402 |
+ def info(self, text, **kwargs):
|
|
403 |
+ self._message(text, msg_type=MessageType.INFO, **kwargs)
|
|
404 |
+ |
|
405 |
+ # warn():
|
|
406 |
+ #
|
|
407 |
+ # Produce and send a warning message through the context.
|
|
408 |
+ #
|
|
409 |
+ def warn(self, text, **kwargs):
|
|
410 |
+ self._message(text, msg_type=MessageType.WARN, **kwargs)
|
|
411 |
+ |
|
412 |
+ # error():
|
|
413 |
+ #
|
|
414 |
+ # Produce and send a error message through the context.
|
|
415 |
+ #
|
|
416 |
+ def error(self, text, **kwargs):
|
|
417 |
+ self._message(text, msg_type=MessageType.ERROR, **kwargs)
|
|
418 |
+ |
|
419 |
+ # log():
|
|
420 |
+ #
|
|
421 |
+ # Produce and send a log message through the context.
|
|
422 |
+ #
|
|
423 |
+ def log(self, text, **kwargs):
|
|
424 |
+ self._message(text, msg_type=MessageType.LOG, **kwargs)
|
|
427 | 425 |
|
428 | 426 |
# silence()
|
429 | 427 |
#
|
... | ... | @@ -440,6 +438,14 @@ class Context(): |
440 | 438 |
finally:
|
441 | 439 |
self._pop_message_depth()
|
442 | 440 |
|
441 |
+ @contextmanager
|
|
442 |
+ def report_unhandled_exceptions(self, brief="An unhandled exception occured", *, unique_id=None, **kwargs):
|
|
443 |
+ try:
|
|
444 |
+ yield
|
|
445 |
+ except Exception: # pylint: disable=broad-except
|
|
446 |
+ self._message(brief, plugin=unique_id, detail=traceback.format_exc(),
|
|
447 |
+ msg_type=MessageType.BUG, **kwargs)
|
|
448 |
+ |
|
443 | 449 |
# timed_activity()
|
444 | 450 |
#
|
445 | 451 |
# Context manager for performing timed activities and logging those
|
... | ... | @@ -469,8 +475,8 @@ class Context(): |
469 | 475 |
with _signals.suspendable(stop_time, resume_time):
|
470 | 476 |
try:
|
471 | 477 |
# Push activity depth for status messages
|
472 |
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
|
|
473 |
- self.message(message)
|
|
478 |
+ self._message(activity_name, detail=detail, plugin=unique_id,
|
|
479 |
+ msg_type=MessageType.START)
|
|
474 | 480 |
self._push_message_depth(silent_nested)
|
475 | 481 |
yield
|
476 | 482 |
|
... | ... | @@ -478,15 +484,16 @@ class Context(): |
478 | 484 |
# Note the failure in status messages and reraise, the scheduler
|
479 | 485 |
# expects an error when there is an error.
|
480 | 486 |
elapsed = datetime.datetime.now() - starttime
|
481 |
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
|
|
482 | 487 |
self._pop_message_depth()
|
483 |
- self.message(message)
|
|
488 |
+ self._message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
|
|
489 |
+ msg_type=MessageType.FAIL)
|
|
484 | 490 |
raise
|
485 | 491 |
|
486 | 492 |
elapsed = datetime.datetime.now() - starttime
|
487 |
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
|
|
488 | 493 |
self._pop_message_depth()
|
489 |
- self.message(message)
|
|
494 |
+ self._message(activity_name, detail=detail,
|
|
495 |
+ elapsed=elapsed, plugin=unique_id,
|
|
496 |
+ msg_type=MessageType.SUCCESS)
|
|
490 | 497 |
|
491 | 498 |
# recorded_messages()
|
492 | 499 |
#
|
... | ... | @@ -37,6 +37,7 @@ from .._platform import Platform |
37 | 37 |
from .._project import Project
|
38 | 38 |
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
|
39 | 39 |
from .._message import Message, MessageType, unconditional_messages
|
40 |
+from .. import utils
|
|
40 | 41 |
from .._stream import Stream
|
41 | 42 |
from .._versions import BST_FORMAT_VERSION
|
42 | 43 |
from .. import _yaml
|
... | ... | @@ -198,8 +199,10 @@ class App(): |
198 | 199 |
option_value = self._main_options.get(cli_option)
|
199 | 200 |
if option_value is not None:
|
200 | 201 |
setattr(self.context, context_attr, option_value)
|
201 |
- |
|
202 |
- Platform.create_instance(self.context)
|
|
202 |
+ try:
|
|
203 |
+ Platform.create_instance(self.context)
|
|
204 |
+ except BstError as e:
|
|
205 |
+ self._error_exit(e, "Error instantiating platform")
|
|
203 | 206 |
|
204 | 207 |
# Create the logger right before setting the message handler
|
205 | 208 |
self.logger = LogLine(self.context,
|
... | ... | @@ -250,48 +253,38 @@ class App(): |
250 | 253 |
self._content_profile, self._format_profile,
|
251 | 254 |
self._success_profile, self._error_profile,
|
252 | 255 |
self.stream, colors=self.colors)
|
253 |
- |
|
254 |
- # Mark the beginning of the session
|
|
255 |
- if session_name:
|
|
256 |
- self._message(MessageType.START, session_name)
|
|
257 |
- |
|
258 |
- # Run the body of the session here, once everything is loaded
|
|
256 |
+ last_err = None
|
|
259 | 257 |
try:
|
260 |
- yield
|
|
261 |
- except BstError as e:
|
|
262 |
- |
|
263 |
- # Print a nice summary if this is a session
|
|
264 |
- if session_name:
|
|
265 |
- elapsed = self.stream.elapsed_time
|
|
266 |
- |
|
267 |
- if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
|
|
268 |
- self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
|
|
269 |
- else:
|
|
270 |
- self._message(MessageType.FAIL, session_name, elapsed=elapsed)
|
|
271 |
- |
|
272 |
- # Notify session failure
|
|
273 |
- self._notify("{} failed".format(session_name), "{}".format(e))
|
|
274 |
- |
|
275 |
- if self._started:
|
|
276 |
- self._print_summary()
|
|
277 |
- |
|
278 |
- # Exit with the error
|
|
279 |
- self._error_exit(e)
|
|
280 |
- except RecursionError:
|
|
281 |
- click.echo("RecursionError: Depency depth is too large. Maximum recursion depth exceeded.",
|
|
282 |
- err=True)
|
|
283 |
- sys.exit(-1)
|
|
284 |
- |
|
285 |
- else:
|
|
286 |
- # No exceptions occurred, print session time and summary
|
|
287 |
- if session_name:
|
|
288 |
- self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
|
|
289 |
- if self._started:
|
|
290 |
- self._print_summary()
|
|
291 |
- |
|
258 |
+ with (self.context.timed_activity(session_name) if session_name else utils._none_context()):
|
|
259 |
+ # Run the body of the session here, once everything is loaded
|
|
260 |
+ try:
|
|
261 |
+ yield
|
|
262 |
+ except BstError as e:
|
|
263 |
+ last_err = e
|
|
264 |
+ # Check for Stream error on termination
|
|
265 |
+ if session_name and isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
|
|
266 |
+ elapsed = self.stream.elapsed_time
|
|
267 |
+ self.context.warn(session_name + ' Terminated', elapsed=elapsed)
|
|
268 |
+ else:
|
|
269 |
+ raise # Raise to timed_activity for failure.
|
|
270 |
+ except RecursionError:
|
|
271 |
+ click.echo("RecursionError: Depency depth is too large. Maximum recursion depth exceeded.",
|
|
272 |
+ err=True)
|
|
273 |
+ sys.exit(-1)
|
|
274 |
+ except BstError as e: # Catch from timed_activity()
|
|
275 |
+ # Notify session failure
|
|
276 |
+ self._notify("{} failed".format(session_name), "{}".format(e))
|
|
277 |
+ |
|
278 |
+ # No exceptions occurred, print session time and summary
|
|
279 |
+ if session_name and self._started:
|
|
280 |
+ self._print_summary()
|
|
281 |
+ if not last_err:
|
|
292 | 282 |
# Notify session success
|
293 | 283 |
self._notify("{} succeeded".format(session_name), "")
|
294 | 284 |
|
285 |
+ if last_err:
|
|
286 |
+ self._error_exit(last_err)
|
|
287 |
+ |
|
295 | 288 |
# init_project()
|
296 | 289 |
#
|
297 | 290 |
# Initialize a new BuildStream project, either with the explicitly passed options,
|
... | ... | @@ -431,21 +424,13 @@ class App(): |
431 | 424 |
if self.interactive:
|
432 | 425 |
self.notify(title, text)
|
433 | 426 |
|
434 |
- # Local message propagator
|
|
435 |
- #
|
|
436 |
- def _message(self, message_type, message, **kwargs):
|
|
437 |
- args = dict(kwargs)
|
|
438 |
- self.context.message(
|
|
439 |
- Message(None, message_type, message, **args))
|
|
440 |
- |
|
441 | 427 |
# Exception handler
|
442 | 428 |
#
|
443 | 429 |
def _global_exception_handler(self, etype, value, tb):
|
444 | 430 |
|
445 | 431 |
# Print the regular BUG message
|
446 | 432 |
formatted = "".join(traceback.format_exception(etype, value, tb))
|
447 |
- self._message(MessageType.BUG, str(value),
|
|
448 |
- detail=formatted)
|
|
433 |
+ self.context._message(str(value), detail=formatted, msg_type=MessageType.BUG)
|
|
449 | 434 |
|
450 | 435 |
# If the scheduler has started, try to terminate all jobs gracefully,
|
451 | 436 |
# otherwise exit immediately.
|
... | ... | @@ -24,7 +24,6 @@ import itertools |
24 | 24 |
from operator import itemgetter
|
25 | 25 |
|
26 | 26 |
from ._exceptions import PipelineError
|
27 |
-from ._message import Message, MessageType
|
|
28 | 27 |
from ._profile import Topics, profile_start, profile_end
|
29 | 28 |
from . import Scope, Consistency
|
30 | 29 |
from ._project import ProjectRefStorage
|
... | ... | @@ -201,8 +200,8 @@ class Pipeline(): |
201 | 200 |
for t in targets:
|
202 | 201 |
new_elm = t._get_source_element()
|
203 | 202 |
if new_elm != t and not silent:
|
204 |
- self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
|
|
205 |
- .format(t.name, new_elm.name))
|
|
203 |
+ self._context.info("Element '{}' redirected to '{}'"
|
|
204 |
+ .format(t.name, new_elm.name))
|
|
206 | 205 |
if new_elm not in elements:
|
207 | 206 |
elements.append(new_elm)
|
208 | 207 |
elif mode == PipelineSelection.PLAN:
|
... | ... | @@ -419,15 +418,6 @@ class Pipeline(): |
419 | 418 |
|
420 | 419 |
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
|
421 | 420 |
|
422 |
- # _message()
|
|
423 |
- #
|
|
424 |
- # Local message propagator
|
|
425 |
- #
|
|
426 |
- def _message(self, message_type, message, **kwargs):
|
|
427 |
- args = dict(kwargs)
|
|
428 |
- self._context.message(
|
|
429 |
- Message(None, message_type, message, **args))
|
|
430 |
- |
|
431 | 421 |
|
432 | 422 |
# _Planner()
|
433 | 423 |
#
|
... | ... | @@ -22,7 +22,6 @@ import subprocess |
22 | 22 |
from .. import _site
|
23 | 23 |
from .. import utils
|
24 | 24 |
from .._artifactcache.cascache import CASCache
|
25 |
-from .._message import Message, MessageType
|
|
26 | 25 |
from ..sandbox import SandboxBwrap
|
27 | 26 |
|
28 | 27 |
from . import Platform
|
... | ... | @@ -75,9 +74,9 @@ class Linux(Platform): |
75 | 74 |
return True
|
76 | 75 |
|
77 | 76 |
else:
|
78 |
- context.message(
|
|
79 |
- Message(None, MessageType.WARN,
|
|
80 |
- "Unable to create user namespaces with bubblewrap, resorting to fallback",
|
|
81 |
- detail="Some builds may not function due to lack of uid / gid 0, " +
|
|
82 |
- "artifacts created will not be trusted for push purposes."))
|
|
77 |
+ context.warn(
|
|
78 |
+ "Unable to create user namespaces with bubblewrap, resorting to fallback",
|
|
79 |
+ detail="Some builds may not function due to lack of uid / gid 0, " +
|
|
80 |
+ "artifacts created will not be trusted for push purposes."
|
|
81 |
+ )
|
|
83 | 82 |
return False
|
... | ... | @@ -36,7 +36,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage |
36 | 36 |
from ._versions import BST_FORMAT_VERSION
|
37 | 37 |
from ._loader import Loader
|
38 | 38 |
from .element import Element
|
39 |
-from ._message import Message, MessageType
|
|
40 | 39 |
from ._includes import Includes
|
41 | 40 |
|
42 | 41 |
|
... | ... | @@ -334,8 +333,7 @@ class Project(): |
334 | 333 |
for source, ref in redundant_refs
|
335 | 334 |
]
|
336 | 335 |
detail += "\n".join(lines)
|
337 |
- self._context.message(
|
|
338 |
- Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
|
|
336 |
+ self._context.warn("Ignoring redundant source references", detail=detail)
|
|
339 | 337 |
|
340 | 338 |
return elements
|
341 | 339 |
|
... | ... | @@ -492,13 +490,9 @@ class Project(): |
492 | 490 |
|
493 | 491 |
# Deprecation check
|
494 | 492 |
if fail_on_overlap is not None:
|
495 |
- self._context.message(
|
|
496 |
- Message(
|
|
497 |
- None,
|
|
498 |
- MessageType.WARN,
|
|
499 |
- "Use of fail-on-overlap within project.conf " +
|
|
500 |
- "is deprecated. Consider using fatal-warnings instead."
|
|
501 |
- )
|
|
493 |
+ self._context.warn(
|
|
494 |
+ "Use of fail-on-overlap within project.conf " +
|
|
495 |
+ "is deprecated. Consider using fatal-warnings instead."
|
|
502 | 496 |
)
|
503 | 497 |
|
504 | 498 |
# Load project.refs if it exists, this may be ignored.
|
... | ... | @@ -18,8 +18,6 @@ |
18 | 18 |
#
|
19 | 19 |
from ruamel import yaml
|
20 | 20 |
|
21 |
-from ..._message import Message, MessageType
|
|
22 |
- |
|
23 | 21 |
from .job import Job
|
24 | 22 |
|
25 | 23 |
|
... | ... | @@ -86,9 +84,8 @@ class ElementJob(Job): |
86 | 84 |
# This should probably be omitted for non-build tasks but it's harmless here
|
87 | 85 |
elt_env = self._element.get_environment()
|
88 | 86 |
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
|
89 |
- self.message(MessageType.LOG,
|
|
90 |
- "Build environment for element {}".format(self._element.name),
|
|
91 |
- detail=env_dump)
|
|
87 |
+ self._log("Build environment for element {}".format(self._element.name),
|
|
88 |
+ detail=env_dump, plugin=self.element, scheduler=True)
|
|
92 | 89 |
|
93 | 90 |
# Run the action
|
94 | 91 |
return self._action_cb(self._element)
|
... | ... | @@ -96,15 +93,6 @@ class ElementJob(Job): |
96 | 93 |
def parent_complete(self, success, result):
|
97 | 94 |
self._complete_cb(self, self._element, success, self._result)
|
98 | 95 |
|
99 |
- def message(self, message_type, message, **kwargs):
|
|
100 |
- args = dict(kwargs)
|
|
101 |
- args['scheduler'] = True
|
|
102 |
- self._scheduler.context.message(
|
|
103 |
- Message(self._element._get_unique_id(),
|
|
104 |
- message_type,
|
|
105 |
- message,
|
|
106 |
- **args))
|
|
107 |
- |
|
108 | 96 |
def child_process_data(self):
|
109 | 97 |
data = {}
|
110 | 98 |
|
... | ... | @@ -119,3 +107,10 @@ class ElementJob(Job): |
119 | 107 |
data['cache_size'] = cache_size
|
120 | 108 |
|
121 | 109 |
return data
|
110 |
+ |
|
111 |
+ # _fail()
|
|
112 |
+ #
|
|
113 |
+ # Override _fail to set scheduler kwarg to true.
|
|
114 |
+ #
|
|
115 |
+ def _fail(self, text, **kwargs):
|
|
116 |
+ super()._fail(text, scheduler=True, **kwargs)
|
... | ... | @@ -32,7 +32,7 @@ import psutil |
32 | 32 |
|
33 | 33 |
# BuildStream toplevel imports
|
34 | 34 |
from ..._exceptions import ImplError, BstError, set_last_task_error
|
35 |
-from ..._message import Message, MessageType, unconditional_messages
|
|
35 |
+from ..._message import MessageType, unconditional_messages
|
|
36 | 36 |
from ... import _signals, utils
|
37 | 37 |
|
38 | 38 |
# Return code values shutdown of job handling child processes
|
... | ... | @@ -109,6 +109,7 @@ class Job(): |
109 | 109 |
# Private members
|
110 | 110 |
#
|
111 | 111 |
self._scheduler = scheduler # The scheduler
|
112 |
+ self._context = scheduler.context # The context, used primarily for UI messaging.
|
|
112 | 113 |
self._queue = multiprocessing.Queue() # A message passing queue
|
113 | 114 |
self._process = None # The Process object
|
114 | 115 |
self._watcher = None # Child process watcher
|
... | ... | @@ -179,7 +180,7 @@ class Job(): |
179 | 180 |
# First resume the job if it's suspended
|
180 | 181 |
self.resume(silent=True)
|
181 | 182 |
|
182 |
- self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
|
|
183 |
+ self._status("{} terminating".format(self.action_name))
|
|
183 | 184 |
|
184 | 185 |
# Make sure there is no garbage on the queue
|
185 | 186 |
self._parent_stop_listening()
|
... | ... | @@ -210,8 +211,8 @@ class Job(): |
210 | 211 |
def kill(self):
|
211 | 212 |
|
212 | 213 |
# Force kill
|
213 |
- self.message(MessageType.WARN,
|
|
214 |
- "{} did not terminate gracefully, killing".format(self.action_name))
|
|
214 |
+ self._warn("{} did not terminate gracefully, killing"
|
|
215 |
+ .format(self.action_name))
|
|
215 | 216 |
|
216 | 217 |
try:
|
217 | 218 |
utils._kill_process_tree(self._process.pid)
|
... | ... | @@ -226,8 +227,7 @@ class Job(): |
226 | 227 |
#
|
227 | 228 |
def suspend(self):
|
228 | 229 |
if not self._suspended:
|
229 |
- self.message(MessageType.STATUS,
|
|
230 |
- "{} suspending".format(self.action_name))
|
|
230 |
+ self._status("{} suspending".format(self.action_name))
|
|
231 | 231 |
|
232 | 232 |
try:
|
233 | 233 |
# Use SIGTSTP so that child processes may handle and propagate
|
... | ... | @@ -251,8 +251,7 @@ class Job(): |
251 | 251 |
def resume(self, silent=False):
|
252 | 252 |
if self._suspended:
|
253 | 253 |
if not silent and not self._scheduler.terminated:
|
254 |
- self.message(MessageType.STATUS,
|
|
255 |
- "{} resuming".format(self.action_name))
|
|
254 |
+ self._status("{} resuming".format(self.action_name))
|
|
256 | 255 |
|
257 | 256 |
os.kill(self._process.pid, signal.SIGCONT)
|
258 | 257 |
self._suspended = False
|
... | ... | @@ -305,21 +304,6 @@ class Job(): |
305 | 304 |
raise ImplError("Job '{kind}' does not implement child_process()"
|
306 | 305 |
.format(kind=type(self).__name__))
|
307 | 306 |
|
308 |
- # message():
|
|
309 |
- #
|
|
310 |
- # Logs a message, this will be logged in the task's logfile and
|
|
311 |
- # conditionally also be sent to the frontend.
|
|
312 |
- #
|
|
313 |
- # Args:
|
|
314 |
- # message_type (MessageType): The type of message to send
|
|
315 |
- # message (str): The message
|
|
316 |
- # kwargs: Remaining Message() constructor arguments
|
|
317 |
- #
|
|
318 |
- def message(self, message_type, message, **kwargs):
|
|
319 |
- args = dict(kwargs)
|
|
320 |
- args['scheduler'] = True
|
|
321 |
- self._scheduler.context.message(Message(None, message_type, message, **args))
|
|
322 |
- |
|
323 | 307 |
# child_process_data()
|
324 | 308 |
#
|
325 | 309 |
# Abstract method to retrieve additional data that should be
|
... | ... | @@ -346,6 +330,32 @@ class Job(): |
346 | 330 |
#
|
347 | 331 |
#######################################################
|
348 | 332 |
|
333 |
+ def _debug(self, text, **kwargs):
|
|
334 |
+ self._context.debug(text, task_id=self._task_id, **kwargs)
|
|
335 |
+ |
|
336 |
+ def _status(self, text, **kwargs):
|
|
337 |
+ self._context.status(text, task_id=self._task_id, **kwargs)
|
|
338 |
+ |
|
339 |
+ def _info(self, text, **kwargs):
|
|
340 |
+ self._context.info(text, task_id=self._task_id, **kwargs)
|
|
341 |
+ |
|
342 |
+ def _warn(self, text, **kwargs):
|
|
343 |
+ self._context.warn(text, task_id=self._task_id, **kwargs)
|
|
344 |
+ |
|
345 |
+ def _error(self, text, **kwargs):
|
|
346 |
+ self._context.error(text, task_id=self._task_id, **kwargs)
|
|
347 |
+ |
|
348 |
+ def _log(self, text, **kwargs):
|
|
349 |
+ self._context.log(text, task_id=self._task_id, **kwargs)
|
|
350 |
+ |
|
351 |
+ # _fail()
|
|
352 |
+ #
|
|
353 |
+ # Only exists for sub classes to override and add kwargs to.
|
|
354 |
+ #
|
|
355 |
+ def _fail(self, text, **kwargs):
|
|
356 |
+ self._context._message(text, task_id=self._task_id,
|
|
357 |
+ msg_type=MessageType.FAIL, **kwargs)
|
|
358 |
+ |
|
349 | 359 |
# _child_action()
|
350 | 360 |
#
|
351 | 361 |
# Perform the action in the child process, this calls the action_cb.
|
... | ... | @@ -372,7 +382,7 @@ class Job(): |
372 | 382 |
# Set the global message handler in this child
|
373 | 383 |
# process to forward messages to the parent process
|
374 | 384 |
self._queue = queue
|
375 |
- self._scheduler.context.set_message_handler(self._child_message_handler)
|
|
385 |
+ self._context.set_message_handler(self._child_message_handler)
|
|
376 | 386 |
|
377 | 387 |
starttime = datetime.datetime.now()
|
378 | 388 |
stopped_time = None
|
... | ... | @@ -389,9 +399,10 @@ class Job(): |
389 | 399 |
# Time, log and run the action function
|
390 | 400 |
#
|
391 | 401 |
with _signals.suspendable(stop_time, resume_time), \
|
392 |
- self._scheduler.context.recorded_messages(self._logfile) as filename:
|
|
402 |
+ self._context.recorded_messages(self._logfile) as filename:
|
|
393 | 403 |
|
394 |
- self.message(MessageType.START, self.action_name, logfile=filename)
|
|
404 |
+ self._context._message(self.action_name, logfile=filename,
|
|
405 |
+ msg_type=MessageType.START, task_id=self._task_id)
|
|
395 | 406 |
|
396 | 407 |
try:
|
397 | 408 |
# Try the task action
|
... | ... | @@ -401,13 +412,12 @@ class Job(): |
401 | 412 |
self._retry_flag = e.temporary
|
402 | 413 |
|
403 | 414 |
if self._retry_flag and (self._tries <= self._max_retries):
|
404 |
- self.message(MessageType.FAIL,
|
|
405 |
- "Try #{} failed, retrying".format(self._tries),
|
|
406 |
- elapsed=elapsed)
|
|
415 |
+ self._fail("Try #{} failed, retrying".format(self._tries),
|
|
416 |
+ elapsed=elapsed)
|
|
407 | 417 |
else:
|
408 |
- self.message(MessageType.FAIL, str(e),
|
|
409 |
- elapsed=elapsed, detail=e.detail,
|
|
410 |
- logfile=filename, sandbox=e.sandbox)
|
|
418 |
+ self._fail(str(e), elapsed=elapsed,
|
|
419 |
+ detail=e.detail, logfile=filename,
|
|
420 |
+ sandbox=e.sandbox)
|
|
411 | 421 |
|
412 | 422 |
self._queue.put(Envelope('child_data', self.child_process_data()))
|
413 | 423 |
|
... | ... | @@ -427,9 +437,9 @@ class Job(): |
427 | 437 |
elapsed = datetime.datetime.now() - starttime
|
428 | 438 |
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
|
429 | 439 |
|
430 |
- self.message(MessageType.BUG, self.action_name,
|
|
431 |
- elapsed=elapsed, detail=detail,
|
|
432 |
- logfile=filename)
|
|
440 |
+ self._context._message(self.action_name, elapsed=elapsed,
|
|
441 |
+ detail=detail, logfile=filename,
|
|
442 |
+ task_id=self._task_id, msg_type=MessageType.BUG)
|
|
433 | 443 |
self._child_shutdown(RC_FAIL)
|
434 | 444 |
|
435 | 445 |
else:
|
... | ... | @@ -438,8 +448,10 @@ class Job(): |
438 | 448 |
self._child_send_result(result)
|
439 | 449 |
|
440 | 450 |
elapsed = datetime.datetime.now() - starttime
|
441 |
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
|
|
442 |
- logfile=filename)
|
|
451 |
+ self._context._message(self.action_name,
|
|
452 |
+ elapsed=elapsed, logfile=filename,
|
|
453 |
+ msg_type=MessageType.SUCCESS,
|
|
454 |
+ task_id=self._task_id)
|
|
443 | 455 |
|
444 | 456 |
# Shutdown needs to stay outside of the above context manager,
|
445 | 457 |
# make sure we don't try to handle SIGTERM while the process
|
... | ... | @@ -574,7 +586,7 @@ class Job(): |
574 | 586 |
if envelope._message_type == 'message':
|
575 | 587 |
# Propagate received messages from children
|
576 | 588 |
# back through the context.
|
577 |
- self._scheduler.context.message(envelope._message)
|
|
589 |
+ self._context._send_message(envelope._message)
|
|
578 | 590 |
elif envelope._message_type == 'error':
|
579 | 591 |
# For regression tests only, save the last error domain / reason
|
580 | 592 |
# reported from a child task in the main process, this global state
|
... | ... | @@ -50,10 +50,10 @@ class BuildQueue(Queue): |
50 | 50 |
self._tried.add(element)
|
51 | 51 |
_, description, detail = element._get_build_result()
|
52 | 52 |
logfile = element._get_build_log()
|
53 |
- self._message(element, MessageType.FAIL, description,
|
|
54 |
- detail=detail, action_name=self.action_name,
|
|
55 |
- elapsed=timedelta(seconds=0),
|
|
56 |
- logfile=logfile)
|
|
53 |
+ self._context._message(description, msg_type=MessageType.FAIL, plugin=element,
|
|
54 |
+ detail=detail, action_name=self.action_name,
|
|
55 |
+ elapsed=timedelta(seconds=0),
|
|
56 |
+ logfile=logfile)
|
|
57 | 57 |
job = ElementJob(self._scheduler, self.action_name,
|
58 | 58 |
logfile, element=element, queue=self,
|
59 | 59 |
resources=self.resources,
|
... | ... | @@ -97,7 +97,7 @@ class BuildQueue(Queue): |
97 | 97 |
cache = element._get_artifact_cache()
|
98 | 98 |
cache._add_artifact_size(artifact_size)
|
99 | 99 |
|
100 |
- if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
|
|
100 |
+ if cache.get_approximate_cache_size() > cache.cache_quota:
|
|
101 | 101 |
self._scheduler._check_cache_size_real()
|
102 | 102 |
|
103 | 103 |
def done(self, job, element, result, success):
|
... | ... | @@ -30,7 +30,6 @@ from ..resources import ResourceType |
30 | 30 |
|
31 | 31 |
# BuildStream toplevel imports
|
32 | 32 |
from ..._exceptions import BstError, set_last_task_error
|
33 |
-from ..._message import Message, MessageType
|
|
34 | 33 |
|
35 | 34 |
|
36 | 35 |
# Queue status for a given element
|
... | ... | @@ -72,6 +71,7 @@ class Queue(): |
72 | 71 |
# Private members
|
73 | 72 |
#
|
74 | 73 |
self._scheduler = scheduler
|
74 |
+ self._context = scheduler.context
|
|
75 | 75 |
self._wait_queue = deque()
|
76 | 76 |
self._done_queue = deque()
|
77 | 77 |
self._max_retries = 0
|
... | ... | @@ -274,17 +274,17 @@ class Queue(): |
274 | 274 |
# Handle any workspace modifications now
|
275 | 275 |
#
|
276 | 276 |
if workspace_dict:
|
277 |
- context = element._get_context()
|
|
278 |
- workspaces = context.get_workspaces()
|
|
277 |
+ workspaces = self._context.get_workspaces()
|
|
279 | 278 |
if workspaces.update_workspace(element._get_full_name(), workspace_dict):
|
280 |
- try:
|
|
281 |
- workspaces.save_config()
|
|
282 |
- except BstError as e:
|
|
283 |
- self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
|
|
284 |
- except Exception as e: # pylint: disable=broad-except
|
|
285 |
- self._message(element, MessageType.BUG,
|
|
286 |
- "Unhandled exception while saving workspaces",
|
|
287 |
- detail=traceback.format_exc())
|
|
279 |
+ unique_id = element._get_unique_id()
|
|
280 |
+ with self._context.report_unhandled_exceptions("Unhandled exception while saving workspaces",
|
|
281 |
+ unique_id=unique_id):
|
|
282 |
+ try:
|
|
283 |
+ workspaces.save_config()
|
|
284 |
+ except BstError as e:
|
|
285 |
+ self._context.error("Error saving workspaces",
|
|
286 |
+ detail=str(e),
|
|
287 |
+ plugin=unique_id)
|
|
288 | 288 |
|
289 | 289 |
# _job_done()
|
290 | 290 |
#
|
... | ... | @@ -304,54 +304,43 @@ class Queue(): |
304 | 304 |
if job.child_data:
|
305 | 305 |
element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
|
306 | 306 |
|
307 |
- # Give the result of the job to the Queue implementor,
|
|
308 |
- # and determine if it should be considered as processed
|
|
309 |
- # or skipped.
|
|
310 |
- try:
|
|
311 |
- processed = self.done(job, element, result, success)
|
|
312 |
- |
|
313 |
- except BstError as e:
|
|
314 |
- |
|
315 |
- # Report error and mark as failed
|
|
316 |
- #
|
|
317 |
- self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
|
|
318 |
- self.failed_elements.append(element)
|
|
319 |
- |
|
320 |
- # Treat this as a task error as it's related to a task
|
|
321 |
- # even though it did not occur in the task context
|
|
322 |
- #
|
|
323 |
- # This just allows us stronger testing capability
|
|
324 |
- #
|
|
325 |
- set_last_task_error(e.domain, e.reason)
|
|
326 |
- |
|
327 |
- except Exception as e: # pylint: disable=broad-except
|
|
328 |
- |
|
329 |
- # Report unhandled exceptions and mark as failed
|
|
330 |
- #
|
|
331 |
- self._message(element, MessageType.BUG,
|
|
332 |
- "Unhandled exception in post processing",
|
|
333 |
- detail=traceback.format_exc())
|
|
334 |
- self.failed_elements.append(element)
|
|
335 |
- else:
|
|
336 |
- |
|
337 |
- # No exception occured, handle the success/failure state in the normal way
|
|
338 |
- #
|
|
339 |
- self._done_queue.append(job)
|
|
340 |
- |
|
341 |
- if success:
|
|
342 |
- if processed:
|
|
343 |
- self.processed_elements.append(element)
|
|
344 |
- else:
|
|
345 |
- self.skipped_elements.append(element)
|
|
346 |
- else:
|
|
307 |
+ with self._context.report_unhandled_exceptions("Unhandled exception in post processing",
|
|
308 |
+ unique_id=element._get_unique_id()):
|
|
309 |
+ # Give the result of the job to the Queue implementor,
|
|
310 |
+ # and determine if it should be considered as processed
|
|
311 |
+ # or skipped.
|
|
312 |
+ try:
|
|
313 |
+ processed = self.done(job, element, result, success)
|
|
314 |
+ except BstError as e:
|
|
315 |
+ # Report error and mark as failed
|
|
316 |
+ #
|
|
317 |
+ self._context.error("Post processing error",
|
|
318 |
+ plugin=element,
|
|
319 |
+ detail=str(e))
|
|
347 | 320 |
self.failed_elements.append(element)
|
348 | 321 |
|
349 |
- # Convenience wrapper for Queue implementations to send
|
|
350 |
- # a message for the element they are processing
|
|
351 |
- def _message(self, element, message_type, brief, **kwargs):
|
|
352 |
- context = element._get_context()
|
|
353 |
- message = Message(element._get_unique_id(), message_type, brief, **kwargs)
|
|
354 |
- context.message(message)
|
|
322 |
+ # Treat this as a task error as it's related to a task
|
|
323 |
+ # even though it did not occur in the task context
|
|
324 |
+ #
|
|
325 |
+ # This just allows us stronger testing capability
|
|
326 |
+ #
|
|
327 |
+ set_last_task_error(e.domain, e.reason)
|
|
328 |
+ except Exception: # pylint: disable=broad-except
|
|
329 |
+ self.failed_elements.append(element)
|
|
330 |
+ # Intentional reraise for report_unhandled_exceptions() to log.
|
|
331 |
+ raise
|
|
332 |
+ else:
|
|
333 |
+ # No exception occurred, handle the success/failure state in the normal way
|
|
334 |
+ #
|
|
335 |
+ self._done_queue.append(job)
|
|
336 |
+ |
|
337 |
+ if success:
|
|
338 |
+ if processed:
|
|
339 |
+ self.processed_elements.append(element)
|
|
340 |
+ else:
|
|
341 |
+ self.skipped_elements.append(element)
|
|
342 |
+ else:
|
|
343 |
+ self.failed_elements.append(element)
|
|
355 | 344 |
|
356 | 345 |
def _element_log_path(self, element):
|
357 | 346 |
project = element._get_project()
|
... | ... | @@ -29,6 +29,7 @@ from contextlib import contextmanager |
29 | 29 |
# Local imports
|
30 | 30 |
from .resources import Resources, ResourceType
|
31 | 31 |
from .jobs import CacheSizeJob, CleanupJob
|
32 |
+from .._platform import Platform
|
|
32 | 33 |
|
33 | 34 |
|
34 | 35 |
# A decent return code for Scheduler.run()
|
... | ... | @@ -316,7 +317,8 @@ class Scheduler(): |
316 | 317 |
self._sched()
|
317 | 318 |
|
318 | 319 |
def _run_cleanup(self, cache_size):
|
319 |
- if cache_size and cache_size < self.context.cache_quota:
|
|
320 |
+ platform = Platform.get_platform()
|
|
321 |
+ if cache_size and cache_size < platform.artifactcache.cache_quota:
|
|
320 | 322 |
return
|
321 | 323 |
|
322 | 324 |
job = CleanupJob(self, 'cleanup', 'cleanup',
|
... | ... | @@ -29,7 +29,7 @@ from contextlib import contextmanager |
29 | 29 |
from tempfile import TemporaryDirectory
|
30 | 30 |
|
31 | 31 |
from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
|
32 |
-from ._message import Message, MessageType
|
|
32 |
+from ._message import MessageType
|
|
33 | 33 |
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
|
34 | 34 |
from ._pipeline import Pipeline, PipelineSelection
|
35 | 35 |
from ._platform import Platform
|
... | ... | @@ -512,7 +512,7 @@ class Stream(): |
512 | 512 |
target._open_workspace()
|
513 | 513 |
|
514 | 514 |
workspaces.save_config()
|
515 |
- self._message(MessageType.INFO, "Saved workspace configuration")
|
|
515 |
+ self._context.info("Saved workspace configuration")
|
|
516 | 516 |
|
517 | 517 |
# workspace_close
|
518 | 518 |
#
|
... | ... | @@ -539,7 +539,7 @@ class Stream(): |
539 | 539 |
# Delete the workspace and save the configuration
|
540 | 540 |
workspaces.delete_workspace(element_name)
|
541 | 541 |
workspaces.save_config()
|
542 |
- self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
|
|
542 |
+ self._context.info("Closed workspace for {}".format(element_name))
|
|
543 | 543 |
|
544 | 544 |
# workspace_reset
|
545 | 545 |
#
|
... | ... | @@ -580,8 +580,8 @@ class Stream(): |
580 | 580 |
workspace_path = workspace.get_absolute_path()
|
581 | 581 |
if soft:
|
582 | 582 |
workspace.prepared = False
|
583 |
- self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
|
|
584 |
- .format(element.name, workspace_path))
|
|
583 |
+ self._context.info("Reset workspace state for {} at: {}"
|
|
584 |
+ .format(element.name, workspace.path))
|
|
585 | 585 |
continue
|
586 | 586 |
|
587 | 587 |
with element.timed_activity("Removing workspace directory {}"
|
... | ... | @@ -598,9 +598,8 @@ class Stream(): |
598 | 598 |
with element.timed_activity("Staging sources to {}".format(workspace_path)):
|
599 | 599 |
element._open_workspace()
|
600 | 600 |
|
601 |
- self._message(MessageType.INFO,
|
|
602 |
- "Reset workspace for {} at: {}".format(element.name,
|
|
603 |
- workspace_path))
|
|
601 |
+ self._context.info("Reset workspace for {} at: {}"
|
|
602 |
+ .format(element.name, workspace._path))
|
|
604 | 603 |
|
605 | 604 |
workspaces.save_config()
|
606 | 605 |
|
... | ... | @@ -676,7 +675,7 @@ class Stream(): |
676 | 675 |
# source-bundle only supports one target
|
677 | 676 |
target = self.targets[0]
|
678 | 677 |
|
679 |
- self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
|
|
678 |
+ self._context.info("Bundling sources for target {}".format(target.name))
|
|
680 | 679 |
|
681 | 680 |
# Find the correct filename for the compression algorithm
|
682 | 681 |
tar_location = os.path.join(directory, target.normal_name + ".tar")
|
... | ... | @@ -958,15 +957,6 @@ class Stream(): |
958 | 957 |
|
959 | 958 |
return selected, track_selected
|
960 | 959 |
|
961 |
- # _message()
|
|
962 |
- #
|
|
963 |
- # Local message propagator
|
|
964 |
- #
|
|
965 |
- def _message(self, message_type, message, **kwargs):
|
|
966 |
- args = dict(kwargs)
|
|
967 |
- self._context.message(
|
|
968 |
- Message(None, message_type, message, **args))
|
|
969 |
- |
|
970 | 960 |
# _add_queue()
|
971 | 961 |
#
|
972 | 962 |
# Adds a queue to the stream
|
... | ... | @@ -1011,16 +1001,19 @@ class Stream(): |
1011 | 1001 |
|
1012 | 1002 |
_, status = self._scheduler.run(self.queues)
|
1013 | 1003 |
|
1014 |
- # Force update element states after a run, such that the summary
|
|
1015 |
- # is more coherent
|
|
1016 |
- try:
|
|
1017 |
- for element in self.total_elements:
|
|
1018 |
- element._update_state()
|
|
1019 |
- except BstError as e:
|
|
1020 |
- self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
|
|
1021 |
- set_last_task_error(e.domain, e.reason)
|
|
1022 |
- except Exception as e: # pylint: disable=broad-except
|
|
1023 |
- self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
|
|
1004 |
+ element = None
|
|
1005 |
+ |
|
1006 |
+ # Handle unhandled exceptions
|
|
1007 |
+ with self._context.report_unhandled_exceptions("Unhandled exception while resolving final state",
|
|
1008 |
+ unique_id=element):
|
|
1009 |
+ # Force update element states after a run, such that the summary
|
|
1010 |
+ # is more coherent
|
|
1011 |
+ try:
|
|
1012 |
+ for element in self.total_elements:
|
|
1013 |
+ element._update_state()
|
|
1014 |
+ except BstError as e:
|
|
1015 |
+ self._context.error("Error resolving final state", detail=str(e))
|
|
1016 |
+ set_last_task_error(e.domain, e.reason)
|
|
1024 | 1017 |
|
1025 | 1018 |
if status == SchedStatus.ERROR:
|
1026 | 1019 |
raise StreamError()
|
... | ... | @@ -117,7 +117,6 @@ from weakref import WeakValueDictionary |
117 | 117 |
from . import _yaml
|
118 | 118 |
from . import utils
|
119 | 119 |
from ._exceptions import PluginError, ImplError
|
120 |
-from ._message import Message, MessageType
|
|
121 | 120 |
|
122 | 121 |
|
123 | 122 |
class Plugin():
|
... | ... | @@ -463,8 +462,7 @@ class Plugin(): |
463 | 462 |
brief (str): The brief message
|
464 | 463 |
detail (str): An optional detailed message, can be multiline output
|
465 | 464 |
"""
|
466 |
- if self.__context.log_debug:
|
|
467 |
- self.__message(MessageType.DEBUG, brief, detail=detail)
|
|
465 |
+ self.__context.debug(brief, detail=detail, plugin=self)
|
|
468 | 466 |
|
469 | 467 |
def status(self, brief, *, detail=None):
|
470 | 468 |
"""Print a status message
|
... | ... | @@ -473,9 +471,9 @@ class Plugin(): |
473 | 471 |
brief (str): The brief message
|
474 | 472 |
detail (str): An optional detailed message, can be multiline output
|
475 | 473 |
|
476 |
- Note: Status messages tell about what a plugin is currently doing
|
|
474 |
+ Note: Status messages tell the user what a plugin is currently doing
|
|
477 | 475 |
"""
|
478 |
- self.__message(MessageType.STATUS, brief, detail=detail)
|
|
476 |
+ self.__context.status(brief, detail=detail, plugin=self)
|
|
479 | 477 |
|
480 | 478 |
def info(self, brief, *, detail=None):
|
481 | 479 |
"""Print an informative message
|
... | ... | @@ -487,7 +485,7 @@ class Plugin(): |
487 | 485 |
Note: Informative messages tell the user something they might want
|
488 | 486 |
to know, like if refreshing an element caused it to change.
|
489 | 487 |
"""
|
490 |
- self.__message(MessageType.INFO, brief, detail=detail)
|
|
488 |
+ self.__context.info(brief, detail=detail, plugin=self)
|
|
491 | 489 |
|
492 | 490 |
def warn(self, brief, *, detail=None, warning_token=None):
|
493 | 491 |
"""Print a warning message, checks warning_token against project configuration
|
... | ... | @@ -510,7 +508,18 @@ class Plugin(): |
510 | 508 |
if project._warning_is_fatal(warning_token):
|
511 | 509 |
raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
|
512 | 510 |
|
513 |
- self.__message(MessageType.WARN, brief=brief, detail=detail)
|
|
511 |
+ self.__context.warn(brief, detail=detail, plugin=self)
|
|
512 |
+ |
|
513 |
+ def skipped(self, brief, *, detail=None):
|
|
514 |
+ """Prints a message indicating that an action has been skipped.
|
|
515 |
+ |
|
516 |
+ Args:
|
|
517 |
+ brief (str): The brief message
|
|
518 |
+ detail (str): An optional detailed message, can be multiline output
|
|
519 |
+ |
|
520 |
+ (*Since 1.4*)
|
|
521 |
+ """
|
|
522 |
+ self.__context.skipped(brief, detail=detail, plugin=self)
|
|
514 | 523 |
|
515 | 524 |
def log(self, brief, *, detail=None):
|
516 | 525 |
"""Log a message into the plugin's log file
|
... | ... | @@ -522,7 +531,7 @@ class Plugin(): |
522 | 531 |
brief (str): The brief message
|
523 | 532 |
detail (str): An optional detailed message, can be multiline output
|
524 | 533 |
"""
|
525 |
- self.__message(MessageType.LOG, brief, detail=detail)
|
|
534 |
+ self.__context.log(brief, detail=detail, plugin=self)
|
|
526 | 535 |
|
527 | 536 |
@contextmanager
|
528 | 537 |
def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
|
... | ... | @@ -718,14 +727,9 @@ class Plugin(): |
718 | 727 |
|
719 | 728 |
return (exit_code, output)
|
720 | 729 |
|
721 |
- def __message(self, message_type, brief, **kwargs):
|
|
722 |
- message = Message(self.__unique_id, message_type, brief, **kwargs)
|
|
723 |
- self.__context.message(message)
|
|
724 |
- |
|
725 | 730 |
def __note_command(self, output, *popenargs, **kwargs):
|
726 |
- workdir = os.getcwd()
|
|
727 |
- if 'cwd' in kwargs:
|
|
728 |
- workdir = kwargs['cwd']
|
|
731 |
+ workdir = kwargs.get("cwd", os.getcwd())
|
|
732 |
+ |
|
729 | 733 |
command = " ".join(popenargs[0])
|
730 | 734 |
output.write('Running host command {}: {}\n'.format(workdir, command))
|
731 | 735 |
output.flush()
|
... | ... | @@ -966,6 +966,17 @@ def _tempdir(suffix="", prefix="tmp", dir=None): # pylint: disable=redefined-bu |
966 | 966 |
cleanup_tempdir()
|
967 | 967 |
|
968 | 968 |
|
969 |
+# _none_context()
|
|
970 |
+#
|
|
971 |
+# An empty context, useful for optional contexts e.g.
|
|
972 |
+#
|
|
973 |
+# with (_tempdir() if <value> else _none_context())
|
|
974 |
+#
|
|
975 |
+@contextmanager
|
|
976 |
+def _none_context():
|
|
977 |
+ yield
|
|
978 |
+ |
|
979 |
+ |
|
969 | 980 |
# _kill_process_tree()
|
970 | 981 |
#
|
971 | 982 |
# Brutally murder a process and all of its children
|
1 |
+import os
|
|
2 |
+import pytest
|
|
3 |
+ |
|
4 |
+from buildstream import _yaml
|
|
5 |
+from buildstream._artifactcache import CACHE_SIZE_FILE
|
|
6 |
+ |
|
7 |
+from tests.testutils import cli, create_element_size
|
|
8 |
+ |
|
9 |
+# XXX: Currently lacking:
|
|
10 |
+# * A way to check whether it's faster to read cache size on
|
|
11 |
+# successive invocations.
|
|
12 |
+# * A way to check whether the cache size file has been read.
|
|
13 |
+ |
|
14 |
+ |
|
15 |
+def create_project(project_dir):
|
|
16 |
+ project_file = os.path.join(project_dir, "project.conf")
|
|
17 |
+ project_conf = {
|
|
18 |
+ "name": "test"
|
|
19 |
+ }
|
|
20 |
+ _yaml.dump(project_conf, project_file)
|
|
21 |
+ element_name = "test.bst"
|
|
22 |
+ create_element_size(element_name, project_dir, ".", [], 1024)
|
|
23 |
+ |
|
24 |
+ |
|
25 |
+def test_cache_size_roundtrip(cli, tmpdir):
|
|
26 |
+ # Builds (to put files in the cache), then invokes buildstream again
|
|
27 |
+ # to check nothing breaks
|
|
28 |
+ |
|
29 |
+ # Create project
|
|
30 |
+ project_dir = str(tmpdir)
|
|
31 |
+ create_project(project_dir)
|
|
32 |
+ |
|
33 |
+ # Build, to populate the cache
|
|
34 |
+ res = cli.run(project=project_dir, args=["build", "test.bst"])
|
|
35 |
+ res.assert_success()
|
|
36 |
+ |
|
37 |
+ # Show, to check that nothing breaks while reading cache size
|
|
38 |
+ res = cli.run(project=project_dir, args=["show", "test.bst"])
|
|
39 |
+ res.assert_success()
|
|
40 |
+ |
|
41 |
+ |
|
42 |
+def test_cache_size_write(cli, tmpdir):
|
|
43 |
+ # Builds (to put files in the cache), then checks a number is
|
|
44 |
+ # written to the cache size file.
|
|
45 |
+ |
|
46 |
+ project_dir = str(tmpdir)
|
|
47 |
+ create_project(project_dir)
|
|
48 |
+ |
|
49 |
+ # Artifact cache must be in a known place
|
|
50 |
+ artifactdir = os.path.join(project_dir, "artifacts")
|
|
51 |
+ cli.configure({"artifactdir": artifactdir})
|
|
52 |
+ |
|
53 |
+ # Build, to populate the cache
|
|
54 |
+ res = cli.run(project=project_dir, args=["build", "test.bst"])
|
|
55 |
+ res.assert_success()
|
|
56 |
+ |
|
57 |
+ # Inspect the artifact cache
|
|
58 |
+ sizefile = os.path.join(artifactdir, CACHE_SIZE_FILE)
|
|
59 |
+ assert os.path.isfile(sizefile)
|
|
60 |
+ with open(sizefile, "r") as f:
|
|
61 |
+ size_data = f.read()
|
|
62 |
+ size = int(size_data)
|
... | ... | @@ -140,6 +140,7 @@ class ArtifactShare(): |
140 | 140 |
|
141 | 141 |
return statvfs_result(f_blocks=self.total_space,
|
142 | 142 |
f_bfree=self.free_space - repo_size,
|
143 |
+ f_bavail=self.free_space - repo_size,
|
|
143 | 144 |
f_bsize=1)
|
144 | 145 |
|
145 | 146 |
|
... | ... | @@ -156,4 +157,4 @@ def create_artifact_share(directory, *, total_space=None, free_space=None): |
156 | 157 |
share.close()
|
157 | 158 |
|
158 | 159 |
|
159 |
-statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize')
|
|
160 |
+statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
|