[Notes] [Git][BuildStream/buildstream][Qinusty/message-helpers] 10 commits: Use ArtifactCache's get_cache_size when calculating the quota



Title: GitLab

Qinusty pushed to branch Qinusty/message-helpers at BuildStream / buildstream

Commits:

18 changed files:

Changes:

  • buildstream/_artifactcache/__init__.py
    ... ... @@ -17,4 +17,4 @@
    17 17
     #  Authors:
    
    18 18
     #        Tristan Van Berkom <tristan vanberkom codethink co uk>
    
    19 19
     
    
    20
    -from .artifactcache import ArtifactCache, ArtifactCacheSpec
    20
    +from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE

  • buildstream/_artifactcache/artifactcache.py
    ... ... @@ -23,11 +23,13 @@ from collections import Mapping, namedtuple
    23 23
     
    
    24 24
     from ..element_enums import _KeyStrength
    
    25 25
     from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
    
    26
    -from .._message import Message, MessageType
    
    27 26
     from .. import utils
    
    28 27
     from .. import _yaml
    
    29 28
     
    
    30 29
     
    
    30
    +CACHE_SIZE_FILE = "cache_size"
    
    31
    +
    
    32
    +
    
    31 33
     # An ArtifactCacheSpec holds the user configuration for a single remote
    
    32 34
     # artifact cache.
    
    33 35
     #
    
    ... ... @@ -82,7 +84,6 @@ class ArtifactCache():
    82 84
             self.extractdir = os.path.join(context.artifactdir, 'extract')
    
    83 85
             self.tmpdir = os.path.join(context.artifactdir, 'tmp')
    
    84 86
     
    
    85
    -        self.max_size = context.cache_quota
    
    86 87
             self.estimated_size = None
    
    87 88
     
    
    88 89
             self.global_remote_specs = []
    
    ... ... @@ -90,6 +91,8 @@ class ArtifactCache():
    90 91
     
    
    91 92
             self._local = False
    
    92 93
             self.cache_size = None
    
    94
    +        self.cache_quota = None
    
    95
    +        self.cache_lower_threshold = None
    
    93 96
     
    
    94 97
             os.makedirs(self.extractdir, exist_ok=True)
    
    95 98
             os.makedirs(self.tmpdir, exist_ok=True)
    
    ... ... @@ -227,7 +230,7 @@ class ArtifactCache():
    227 230
         def clean(self):
    
    228 231
             artifacts = self.list_artifacts()
    
    229 232
     
    
    230
    -        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
    
    233
    +        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
    
    231 234
                 try:
    
    232 235
                     to_remove = artifacts.pop(0)
    
    233 236
                 except IndexError:
    
    ... ... @@ -241,7 +244,7 @@ class ArtifactCache():
    241 244
                               "Please increase the cache-quota in {}."
    
    242 245
                               .format(self.context.config_origin or default_conf))
    
    243 246
     
    
    244
    -                if self.calculate_cache_size() > self.context.cache_quota:
    
    247
    +                if self.calculate_cache_size() > self.cache_quota:
    
    245 248
                         raise ArtifactError("Cache too full. Aborting.",
    
    246 249
                                             detail=detail,
    
    247 250
                                             reason="cache-too-full")
    
    ... ... @@ -282,7 +285,11 @@ class ArtifactCache():
    282 285
             # If we don't currently have an estimate, figure out the real
    
    283 286
             # cache size.
    
    284 287
             if self.estimated_size is None:
    
    285
    -            self.estimated_size = self.calculate_cache_size()
    
    288
    +            stored_size = self._read_cache_size()
    
    289
    +            if stored_size is not None:
    
    290
    +                self.estimated_size = stored_size
    
    291
    +            else:
    
    292
    +                self.estimated_size = self.calculate_cache_size()
    
    286 293
     
    
    287 294
             return self.estimated_size
    
    288 295
     
    
    ... ... @@ -490,15 +497,6 @@ class ArtifactCache():
    490 497
         #               Local Private Methods          #
    
    491 498
         ################################################
    
    492 499
     
    
    493
    -    # _message()
    
    494
    -    #
    
    495
    -    # Local message propagator
    
    496
    -    #
    
    497
    -    def _message(self, message_type, message, **kwargs):
    
    498
    -        args = dict(kwargs)
    
    499
    -        self.context.message(
    
    500
    -            Message(None, message_type, message, **args))
    
    501
    -
    
    502 500
         # _set_remotes():
    
    503 501
         #
    
    504 502
         # Set the list of remote caches. If project is None, the global list of
    
    ... ... @@ -522,7 +520,7 @@ class ArtifactCache():
    522 520
         #
    
    523 521
         def _initialize_remotes(self):
    
    524 522
             def remote_failed(url, error):
    
    525
    -            self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
    
    523
    +            self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
    
    526 524
     
    
    527 525
             with self.context.timed_activity("Initializing remote caches", silent_nested=True):
    
    528 526
                 self.initialize_remotes(on_failure=remote_failed)
    
    ... ... @@ -541,6 +539,7 @@ class ArtifactCache():
    541 539
                 self.estimated_size = self.calculate_cache_size()
    
    542 540
     
    
    543 541
             self.estimated_size += artifact_size
    
    542
    +        self._write_cache_size(self.estimated_size)
    
    544 543
     
    
    545 544
         # _set_cache_size()
    
    546 545
         #
    
    ... ... @@ -551,6 +550,109 @@ class ArtifactCache():
    551 550
         def _set_cache_size(self, cache_size):
    
    552 551
             self.estimated_size = cache_size
    
    553 552
     
    
    553
    +        # set_cache_size is called in cleanup, where it may set the cache to None
    
    554
    +        if self.estimated_size is not None:
    
    555
    +            self._write_cache_size(self.estimated_size)
    
    556
    +
    
    557
    +    # _write_cache_size()
    
    558
    +    #
    
    559
    +    # Writes the given size of the artifact to the cache's size file
    
    560
    +    #
    
    561
    +    def _write_cache_size(self, size):
    
    562
    +        assert isinstance(size, int)
    
    563
    +        size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
    
    564
    +        with open(size_file_path, "w") as f:
    
    565
    +            f.write(str(size))
    
    566
    +
    
    567
    +    # _read_cache_size()
    
    568
    +    #
    
    569
    +    # Reads and returns the size of the artifact cache that's stored in the
    
    570
    +    # cache's size file
    
    571
    +    #
    
    572
    +    def _read_cache_size(self):
    
    573
    +        size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
    
    574
    +
    
    575
    +        if not os.path.exists(size_file_path):
    
    576
    +            return None
    
    577
    +
    
    578
    +        with open(size_file_path, "r") as f:
    
    579
    +            size = f.read()
    
    580
    +
    
    581
    +        try:
    
    582
    +            num_size = int(size)
    
    583
    +        except ValueError as e:
    
    584
    +            raise ArtifactError("Size '{}' parsed from '{}' was not an integer".format(
    
    585
    +                size, size_file_path)) from e
    
    586
    +
    
    587
    +        return num_size
    
    588
    +
    
    589
    +    # _calculate_cache_quota()
    
    590
    +    #
    
    591
    +    # Calculates and sets the cache quota and lower threshold based on the
    
    592
    +    # quota set in Context.
    
    593
+    # It checks that the quota is both a valid expression, and that there is
    
    594
    +    # enough disk space to satisfy that quota
    
    595
    +    #
    
    596
    +    def _calculate_cache_quota(self):
    
    597
    +        # Headroom intended to give BuildStream a bit of leeway.
    
    598
    +        # This acts as the minimum size of cache_quota and also
    
    599
    +        # is taken from the user requested cache_quota.
    
    600
    +        #
    
    601
    +        if 'BST_TEST_SUITE' in os.environ:
    
    602
    +            headroom = 0
    
    603
    +        else:
    
    604
    +            headroom = 2e9
    
    605
    +
    
    606
    +        artifactdir_volume = self.context.artifactdir
    
    607
    +        while not os.path.exists(artifactdir_volume):
    
    608
    +            artifactdir_volume = os.path.dirname(artifactdir_volume)
    
    609
    +
    
    610
    +        try:
    
    611
    +            cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
    
    612
    +        except utils.UtilError as e:
    
    613
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    614
    +                            "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
    
    615
    +                            "\nValid values are, for example: 800M 10G 1T 50%\n"
    
    616
    +                            .format(str(e))) from e
    
    617
    +
    
    618
    +        stat = os.statvfs(artifactdir_volume)
    
    619
    +        available_space = (stat.f_bsize * stat.f_bavail)
    
    620
    +
    
    621
    +        cache_size = self.get_approximate_cache_size()
    
    622
    +
    
    623
    +        # Ensure system has enough storage for the cache_quota
    
    624
    +        #
    
    625
    +        # If cache_quota is none, set it to the maximum it could possibly be.
    
    626
    +        #
    
    627
+        # Also check that cache_quota is at least as large as our headroom.
    
    628
    +        #
    
    629
    +        if cache_quota is None:  # Infinity, set to max system storage
    
    630
    +            cache_quota = cache_size + available_space
    
    631
    +        if cache_quota < headroom:  # Check minimum
    
    632
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    633
    +                            "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
    
    634
    +                            "BuildStream requires a minimum cache quota of 2G.")
    
    635
    +        elif cache_quota > cache_size + available_space:  # Check maximum
    
    636
    +            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    637
    +                            ("Your system does not have enough available " +
    
    638
    +                             "space to support the cache quota specified.\n" +
    
    639
    +                             "You currently have:\n" +
    
    640
    +                             "- {used} of cache in use at {local_cache_path}\n" +
    
    641
    +                             "- {available} of available system storage").format(
    
    642
    +                                 used=utils._pretty_size(cache_size),
    
    643
    +                                 local_cache_path=self.context.artifactdir,
    
    644
    +                                 available=utils._pretty_size(available_space)))
    
    645
    +
    
    646
    +        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
    
    647
    +        # cache_quota to try and avoid exceptions.
    
    648
    +        #
    
    649
    +        # Of course, we might still end up running out during a build
    
    650
    +        # if we end up writing more than 2G, but hey, this stuff is
    
    651
    +        # already really fuzzy.
    
    652
    +        #
    
    653
    +        self.cache_quota = cache_quota - headroom
    
    654
    +        self.cache_lower_threshold = self.cache_quota / 2
    
    655
    +
    
    554 656
     
    
    555 657
     # _configured_remote_artifact_cache_specs():
    
    556 658
     #
    

  • buildstream/_artifactcache/cascache.py
    ... ... @@ -34,7 +34,6 @@ from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    34 34
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    35 35
     from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
    
    36 36
     
    
    37
    -from .._message import MessageType, Message
    
    38 37
     from .. import _signals, utils
    
    39 38
     from .._exceptions import ArtifactError
    
    40 39
     
    
    ... ... @@ -61,6 +60,8 @@ class CASCache(ArtifactCache):
    61 60
             os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
    
    62 61
             os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
    
    63 62
     
    
    63
    +        self._calculate_cache_quota()
    
    64
    +
    
    64 65
             self._enable_push = enable_push
    
    65 66
     
    
    66 67
             # Per-project list of _CASRemote instances.
    
    ... ... @@ -330,7 +331,7 @@ class CASCache(ArtifactCache):
    330 331
                                         request.write_offset = offset
    
    331 332
                                         # max. 64 kB chunks
    
    332 333
                                         request.data = f.read(chunk_size)
    
    333
    -                                    request.resource_name = resource_name
    
    334
    +                                    request.resource_name = resource_name  # pylint: disable=cell-var-from-loop
    
    334 335
                                         request.finish_write = remaining <= 0
    
    335 336
                                         yield request
    
    336 337
                                         offset += chunk_size
    
    ... ... @@ -350,12 +351,10 @@ class CASCache(ArtifactCache):
    350 351
                         raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
    
    351 352
     
    
    352 353
                 if skipped_remote:
    
    353
    -                self.context.message(Message(
    
    354
    -                    None,
    
    355
    -                    MessageType.SKIPPED,
    
    354
    +                self.context.skipped(
    
    356 355
                         "Remote ({}) already has {} cached".format(
    
    357 356
                             remote.spec.url, element._get_brief_display_key())
    
    358
    -                ))
    
    357
    +                )
    
    359 358
             return pushed
    
    360 359
     
    
    361 360
         ################################################
    

  • buildstream/_context.py
    ... ... @@ -19,6 +19,7 @@
    19 19
     
    
    20 20
     import os
    
    21 21
     import datetime
    
    22
    +import traceback
    
    22 23
     from collections import deque, Mapping
    
    23 24
     from contextlib import contextmanager
    
    24 25
     from . import utils
    
    ... ... @@ -26,6 +27,7 @@ from . import _cachekey
    26 27
     from . import _signals
    
    27 28
     from . import _site
    
    28 29
     from . import _yaml
    
    30
    +from .plugin import Plugin
    
    29 31
     from ._exceptions import LoadError, LoadErrorReason, BstError
    
    30 32
     from ._message import Message, MessageType
    
    31 33
     from ._profile import Topics, profile_start, profile_end
    
    ... ... @@ -64,12 +66,6 @@ class Context():
    64 66
             # The locations from which to push and pull prebuilt artifacts
    
    65 67
             self.artifact_cache_specs = []
    
    66 68
     
    
    67
    -        # The artifact cache quota
    
    68
    -        self.cache_quota = None
    
    69
    -
    
    70
    -        # The lower threshold to which we aim to reduce the cache size
    
    71
    -        self.cache_lower_threshold = None
    
    72
    -
    
    73 69
             # The directory to store build logs
    
    74 70
             self.logdir = None
    
    75 71
     
    
    ... ... @@ -124,6 +120,8 @@ class Context():
    124 120
             self._workspaces = None
    
    125 121
             self._log_handle = None
    
    126 122
             self._log_filename = None
    
    123
    +        self.config_cache_quota = 'infinity'
    
    124
    +        self.artifactdir_volume = None
    
    127 125
     
    
    128 126
         # load()
    
    129 127
         #
    
    ... ... @@ -183,71 +181,7 @@ class Context():
    183 181
             cache = _yaml.node_get(defaults, Mapping, 'cache')
    
    184 182
             _yaml.node_validate(cache, ['quota'])
    
    185 183
     
    
    186
    -        artifactdir_volume = self.artifactdir
    
    187
    -        while not os.path.exists(artifactdir_volume):
    
    188
    -            artifactdir_volume = os.path.dirname(artifactdir_volume)
    
    189
    -
    
    190
    -        # We read and parse the cache quota as specified by the user
    
    191
    -        cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
    
    192
    -        try:
    
    193
    -            cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
    
    194
    -        except utils.UtilError as e:
    
    195
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    196
    -                            "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
    
    197
    -                            "\nValid values are, for example: 800M 10G 1T 50%\n"
    
    198
    -                            .format(str(e))) from e
    
    199
    -
    
    200
    -        # Headroom intended to give BuildStream a bit of leeway.
    
    201
    -        # This acts as the minimum size of cache_quota and also
    
    202
    -        # is taken from the user requested cache_quota.
    
    203
    -        #
    
    204
    -        if 'BST_TEST_SUITE' in os.environ:
    
    205
    -            headroom = 0
    
    206
    -        else:
    
    207
    -            headroom = 2e9
    
    208
    -
    
    209
    -        stat = os.statvfs(artifactdir_volume)
    
    210
    -        available_space = (stat.f_bsize * stat.f_bavail)
    
    211
    -
    
    212
    -        # Again, the artifact directory may not yet have been created yet
    
    213
    -        #
    
    214
    -        if not os.path.exists(self.artifactdir):
    
    215
    -            cache_size = 0
    
    216
    -        else:
    
    217
    -            cache_size = utils._get_dir_size(self.artifactdir)
    
    218
    -
    
    219
    -        # Ensure system has enough storage for the cache_quota
    
    220
    -        #
    
    221
    -        # If cache_quota is none, set it to the maximum it could possibly be.
    
    222
    -        #
    
    223
    -        # Also check that cache_quota is atleast as large as our headroom.
    
    224
    -        #
    
    225
    -        if cache_quota is None:  # Infinity, set to max system storage
    
    226
    -            cache_quota = cache_size + available_space
    
    227
    -        if cache_quota < headroom:  # Check minimum
    
    228
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    229
    -                            "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
    
    230
    -                            "BuildStream requires a minimum cache quota of 2G.")
    
    231
    -        elif cache_quota > cache_size + available_space:  # Check maximum
    
    232
    -            raise LoadError(LoadErrorReason.INVALID_DATA,
    
    233
    -                            ("Your system does not have enough available " +
    
    234
    -                             "space to support the cache quota specified.\n" +
    
    235
    -                             "You currently have:\n" +
    
    236
    -                             "- {used} of cache in use at {local_cache_path}\n" +
    
    237
    -                             "- {available} of available system storage").format(
    
    238
    -                                 used=utils._pretty_size(cache_size),
    
    239
    -                                 local_cache_path=self.artifactdir,
    
    240
    -                                 available=utils._pretty_size(available_space)))
    
    241
    -
    
    242
    -        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
    
    243
    -        # cache_quota to try and avoid exceptions.
    
    244
    -        #
    
    245
    -        # Of course, we might still end up running out during a build
    
    246
    -        # if we end up writing more than 2G, but hey, this stuff is
    
    247
    -        # already really fuzzy.
    
    248
    -        #
    
    249
    -        self.cache_quota = cache_quota - headroom
    
    250
    -        self.cache_lower_threshold = self.cache_quota / 2
    
    184
    +        self.config_cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
    
    251 185
     
    
    252 186
             # Load artifact share configuration
    
    253 187
             self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
    
    ... ... @@ -385,7 +319,7 @@ class Context():
    385 319
         # the context.
    
    386 320
         #
    
    387 321
         # The message handler should have the same signature as
    
    388
    -    # the message() method
    
    322
    +    # the _send_message() method
    
    389 323
         def set_message_handler(self, handler):
    
    390 324
             self._message_handler = handler
    
    391 325
     
    
    ... ... @@ -400,16 +334,15 @@ class Context():
    400 334
                     return True
    
    401 335
             return False
    
    402 336
     
    
    403
    -    # message():
    
    337
    +    # _send_message():
    
    404 338
         #
    
    405
    -    # Proxies a message back to the caller, this is the central
    
    339
    +    # Proxies a message back through the message handler, this is the central
    
    406 340
         # point through which all messages pass.
    
    407 341
         #
    
    408 342
         # Args:
    
    409 343
         #    message: A Message object
    
    410 344
         #
    
    411
    -    def message(self, message):
    
    412
    -
    
    345
    +    def _send_message(self, message):
    
    413 346
             # Tag message only once
    
    414 347
             if message.depth is None:
    
    415 348
                 message.depth = len(list(self._message_depth))
    
    ... ... @@ -423,7 +356,72 @@ class Context():
    423 356
             assert self._message_handler
    
    424 357
     
    
    425 358
             self._message_handler(message, context=self)
    
    426
    -        return
    
    359
    +
    
    360
    +    # _message():
    
    361
    +    #
    
    362
    +    # The global message API. Any message-sending functions should go
    
    363
    +    # through here. This will call `_send_message` to deliver the
    
    364
    +    # final message.
    
    365
    +    #
    
    366
    +    def _message(self, text, *, plugin=None, msg_type=None, **kwargs):
    
    367
    +        assert msg_type is not None
    
    368
    +
    
    369
    +        if isinstance(plugin, Plugin):
    
    370
    +            plugin_id = plugin._get_unique_id()
    
    371
    +        else:
    
    372
    +            plugin_id = plugin
    
    373
    +
    
    374
    +        self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
    
    375
    +
    
    376
    +    # skipped():
    
    377
    +    #
    
    378
    +    # Produce and send a skipped message through the context.
    
    379
    +    #
    
    380
    +    def skipped(self, text, **kwargs):
    
    381
    +        self._message(text, msg_type=MessageType.SKIPPED, **kwargs)
    
    382
    +
    
    383
    +    # debug():
    
    384
    +    #
    
    385
    +    # Produce and send a debug message through the context.
    
    386
    +    #
    
    387
    +    def debug(self, text, **kwargs):
    
    388
    +        if self.log_debug:
    
    389
    +            self._message(text, msg_type=MessageType.DEBUG, **kwargs)
    
    390
    +
    
    391
    +    # status():
    
    392
    +    #
    
    393
    +    # Produce and send a status message through the context.
    
    394
    +    #
    
    395
    +    def status(self, text, **kwargs):
    
    396
    +        self._message(text, msg_type=MessageType.STATUS, **kwargs)
    
    397
    +
    
    398
    +    # info():
    
    399
    +    #
    
    400
    +    # Produce and send a info message through the context.
    
    401
    +    #
    
    402
    +    def info(self, text, **kwargs):
    
    403
    +        self._message(text, msg_type=MessageType.INFO, **kwargs)
    
    404
    +
    
    405
    +    # warn():
    
    406
    +    #
    
    407
    +    # Produce and send a warning message through the context.
    
    408
    +    #
    
    409
    +    def warn(self, text, **kwargs):
    
    410
    +        self._message(text, msg_type=MessageType.WARN, **kwargs)
    
    411
    +
    
    412
    +    # error():
    
    413
    +    #
    
    414
    +    # Produce and send a error message through the context.
    
    415
    +    #
    
    416
    +    def error(self, text, **kwargs):
    
    417
    +        self._message(text, msg_type=MessageType.ERROR, **kwargs)
    
    418
    +
    
    419
    +    # log():
    
    420
    +    #
    
    421
    +    # Produce and send a log message through the context.
    
    422
    +    #
    
    423
    +    def log(self, text, **kwargs):
    
    424
    +        self._message(text, msg_type=MessageType.LOG, **kwargs)
    
    427 425
     
    
    428 426
         # silence()
    
    429 427
         #
    
    ... ... @@ -440,6 +438,14 @@ class Context():
    440 438
             finally:
    
    441 439
                 self._pop_message_depth()
    
    442 440
     
    
    441
    +    @contextmanager
    
    442
+    def report_unhandled_exceptions(self, brief="An unhandled exception occurred", *, unique_id=None, **kwargs):
    
    443
    +        try:
    
    444
    +            yield
    
    445
    +        except Exception:  # pylint: disable=broad-except
    
    446
    +            self._message(brief, plugin=unique_id, detail=traceback.format_exc(),
    
    447
    +                          msg_type=MessageType.BUG, **kwargs)
    
    448
    +
    
    443 449
         # timed_activity()
    
    444 450
         #
    
    445 451
         # Context manager for performing timed activities and logging those
    
    ... ... @@ -469,8 +475,8 @@ class Context():
    469 475
             with _signals.suspendable(stop_time, resume_time):
    
    470 476
                 try:
    
    471 477
                     # Push activity depth for status messages
    
    472
    -                message = Message(unique_id, MessageType.START, activity_name, detail=detail)
    
    473
    -                self.message(message)
    
    478
    +                self._message(activity_name, detail=detail, plugin=unique_id,
    
    479
    +                              msg_type=MessageType.START)
    
    474 480
                     self._push_message_depth(silent_nested)
    
    475 481
                     yield
    
    476 482
     
    
    ... ... @@ -478,15 +484,16 @@ class Context():
    478 484
                     # Note the failure in status messages and reraise, the scheduler
    
    479 485
                     # expects an error when there is an error.
    
    480 486
                     elapsed = datetime.datetime.now() - starttime
    
    481
    -                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
    
    482 487
                     self._pop_message_depth()
    
    483
    -                self.message(message)
    
    488
    +                self._message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
    
    489
    +                              msg_type=MessageType.FAIL)
    
    484 490
                     raise
    
    485 491
     
    
    486 492
                 elapsed = datetime.datetime.now() - starttime
    
    487
    -            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
    
    488 493
                 self._pop_message_depth()
    
    489
    -            self.message(message)
    
    494
    +            self._message(activity_name, detail=detail,
    
    495
    +                           elapsed=elapsed, plugin=unique_id,
    
    496
    +                           msg_type=MessageType.SUCCESS)
    
    490 497
     
    
    491 498
         # recorded_messages()
    
    492 499
         #
    

  • buildstream/_frontend/app.py
    ... ... @@ -37,6 +37,7 @@ from .._platform import Platform
    37 37
     from .._project import Project
    
    38 38
     from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
    
    39 39
     from .._message import Message, MessageType, unconditional_messages
    
    40
    +from .. import utils
    
    40 41
     from .._stream import Stream
    
    41 42
     from .._versions import BST_FORMAT_VERSION
    
    42 43
     from .. import _yaml
    
    ... ... @@ -198,8 +199,10 @@ class App():
    198 199
                 option_value = self._main_options.get(cli_option)
    
    199 200
                 if option_value is not None:
    
    200 201
                     setattr(self.context, context_attr, option_value)
    
    201
    -
    
    202
    -        Platform.create_instance(self.context)
    
    202
    +        try:
    
    203
    +            Platform.create_instance(self.context)
    
    204
    +        except BstError as e:
    
    205
    +            self._error_exit(e, "Error instantiating platform")
    
    203 206
     
    
    204 207
             # Create the logger right before setting the message handler
    
    205 208
             self.logger = LogLine(self.context,
    
    ... ... @@ -250,48 +253,38 @@ class App():
    250 253
                                   self._content_profile, self._format_profile,
    
    251 254
                                   self._success_profile, self._error_profile,
    
    252 255
                                   self.stream, colors=self.colors)
    
    253
    -
    
    254
    -        # Mark the beginning of the session
    
    255
    -        if session_name:
    
    256
    -            self._message(MessageType.START, session_name)
    
    257
    -
    
    258
    -        # Run the body of the session here, once everything is loaded
    
    256
    +        last_err = None
    
    259 257
             try:
    
    260
    -            yield
    
    261
    -        except BstError as e:
    
    262
    -
    
    263
    -            # Print a nice summary if this is a session
    
    264
    -            if session_name:
    
    265
    -                elapsed = self.stream.elapsed_time
    
    266
    -
    
    267
    -                if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
    
    268
    -                    self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
    
    269
    -                else:
    
    270
    -                    self._message(MessageType.FAIL, session_name, elapsed=elapsed)
    
    271
    -
    
    272
    -                    # Notify session failure
    
    273
    -                    self._notify("{} failed".format(session_name), "{}".format(e))
    
    274
    -
    
    275
    -                if self._started:
    
    276
    -                    self._print_summary()
    
    277
    -
    
    278
    -            # Exit with the error
    
    279
    -            self._error_exit(e)
    
    280
    -        except RecursionError:
    
    281
    -            click.echo("RecursionError: Depency depth is too large. Maximum recursion depth exceeded.",
    
    282
    -                       err=True)
    
    283
    -            sys.exit(-1)
    
    284
    -
    
    285
    -        else:
    
    286
    -            # No exceptions occurred, print session time and summary
    
    287
    -            if session_name:
    
    288
    -                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
    
    289
    -                if self._started:
    
    290
    -                    self._print_summary()
    
    291
    -
    
    258
    +            with (self.context.timed_activity(session_name) if session_name else utils._none_context()):
    
    259
    +                # Run the body of the session here, once everything is loaded
    
    260
    +                try:
    
    261
    +                    yield
    
    262
    +                except BstError as e:
    
    263
    +                    last_err = e
    
    264
    +                    # Check for Stream error on termination
    
    265
    +                    if session_name and isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
    
    266
    +                        elapsed = self.stream.elapsed_time
    
    267
    +                        self.context.warn(session_name + ' Terminated', elapsed=elapsed)
    
    268
    +                    else:
    
    269
    +                        raise  # Raise to timed_activity for failure.
    
    270
    +                except RecursionError:
    
    271
    +                    click.echo("RecursionError: Dependency depth is too large. Maximum recursion depth exceeded.",
    
    272
    +                            err=True)
    
    273
    +                    sys.exit(-1)
    
    274
    +        except BstError as e:  # Catch from timed_activity()
    
    275
    +            # Notify session failure
    
    276
    +            self._notify("{} failed".format(session_name), "{}".format(e))
    
    277
    +
    
    278
    +        # No exceptions occurred, print session time and summary
    
    279
    +        if session_name and self._started:
    
    280
    +            self._print_summary()
    
    281
    +            if not last_err:
    
    292 282
                     # Notify session success
    
    293 283
                     self._notify("{} succeeded".format(session_name), "")
    
    294 284
     
    
    285
    +        if last_err:
    
    286
    +            self._error_exit(last_err)
    
    287
    +
    
    295 288
         # init_project()
    
    296 289
         #
    
    297 290
         # Initialize a new BuildStream project, either with the explicitly passed options,
    
    ... ... @@ -431,21 +424,13 @@ class App():
    431 424
             if self.interactive:
    
    432 425
                 self.notify(title, text)
    
    433 426
     
    
    434
    -    # Local message propagator
    
    435
    -    #
    
    436
    -    def _message(self, message_type, message, **kwargs):
    
    437
    -        args = dict(kwargs)
    
    438
    -        self.context.message(
    
    439
    -            Message(None, message_type, message, **args))
    
    440
    -
    
    441 427
         # Exception handler
    
    442 428
         #
    
    443 429
         def _global_exception_handler(self, etype, value, tb):
    
    444 430
     
    
    445 431
             # Print the regular BUG message
    
    446 432
             formatted = "".join(traceback.format_exception(etype, value, tb))
    
    447
    -        self._message(MessageType.BUG, str(value),
    
    448
    -                      detail=formatted)
    
    433
    +        self.context._message(str(value), detail=formatted, msg_type=MessageType.BUG)
    
    449 434
     
    
    450 435
             # If the scheduler has started, try to terminate all jobs gracefully,
    
    451 436
             # otherwise exit immediately.
    

  • buildstream/_pipeline.py
    ... ... @@ -24,7 +24,6 @@ import itertools
    24 24
     from operator import itemgetter
    
    25 25
     
    
    26 26
     from ._exceptions import PipelineError
    
    27
    -from ._message import Message, MessageType
    
    28 27
     from ._profile import Topics, profile_start, profile_end
    
    29 28
     from . import Scope, Consistency
    
    30 29
     from ._project import ProjectRefStorage
    
    ... ... @@ -201,8 +200,8 @@ class Pipeline():
    201 200
                 for t in targets:
    
    202 201
                     new_elm = t._get_source_element()
    
    203 202
                     if new_elm != t and not silent:
    
    204
    -                    self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
    
    205
    -                                  .format(t.name, new_elm.name))
    
    203
    +                    self._context.info("Element '{}' redirected to '{}'"
    
    204
    +                                       .format(t.name, new_elm.name))
    
    206 205
                     if new_elm not in elements:
    
    207 206
                         elements.append(new_elm)
    
    208 207
             elif mode == PipelineSelection.PLAN:
    
    ... ... @@ -419,15 +418,6 @@ class Pipeline():
    419 418
     
    
    420 419
                     raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
    
    421 420
     
    
    422
    -    # _message()
    
    423
    -    #
    
    424
    -    # Local message propagator
    
    425
    -    #
    
    426
    -    def _message(self, message_type, message, **kwargs):
    
    427
    -        args = dict(kwargs)
    
    428
    -        self._context.message(
    
    429
    -            Message(None, message_type, message, **args))
    
    430
    -
    
    431 421
     
    
    432 422
     # _Planner()
    
    433 423
     #
    

  • buildstream/_platform/linux.py
    ... ... @@ -22,7 +22,6 @@ import subprocess
    22 22
     from .. import _site
    
    23 23
     from .. import utils
    
    24 24
     from .._artifactcache.cascache import CASCache
    
    25
    -from .._message import Message, MessageType
    
    26 25
     from ..sandbox import SandboxBwrap
    
    27 26
     
    
    28 27
     from . import Platform
    
    ... ... @@ -75,9 +74,9 @@ class Linux(Platform):
    75 74
                 return True
    
    76 75
     
    
    77 76
             else:
    
    78
    -            context.message(
    
    79
    -                Message(None, MessageType.WARN,
    
    80
    -                        "Unable to create user namespaces with bubblewrap, resorting to fallback",
    
    81
    -                        detail="Some builds may not function due to lack of uid / gid 0, " +
    
    82
    -                        "artifacts created will not be trusted for push purposes."))
    
    77
    +            context.warn(
    
    78
    +                "Unable to create user namespaces with bubblewrap, resorting to fallback",
    
    79
    +                detail="Some builds may not function due to lack of uid / gid 0, " +
    
    80
    +                "artifacts created will not be trusted for push purposes."
    
    81
    +            )
    
    83 82
                 return False

  • buildstream/_project.py
    ... ... @@ -36,7 +36,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage
    36 36
     from ._versions import BST_FORMAT_VERSION
    
    37 37
     from ._loader import Loader
    
    38 38
     from .element import Element
    
    39
    -from ._message import Message, MessageType
    
    40 39
     from ._includes import Includes
    
    41 40
     
    
    42 41
     
    
    ... ... @@ -334,8 +333,7 @@ class Project():
    334 333
                     for source, ref in redundant_refs
    
    335 334
                 ]
    
    336 335
                 detail += "\n".join(lines)
    
    337
    -            self._context.message(
    
    338
    -                Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
    
    336
    +            self._context.warn("Ignoring redundant source references", detail=detail)
    
    339 337
     
    
    340 338
             return elements
    
    341 339
     
    
    ... ... @@ -492,13 +490,9 @@ class Project():
    492 490
     
    
    493 491
             # Deprecation check
    
    494 492
             if fail_on_overlap is not None:
    
    495
    -            self._context.message(
    
    496
    -                Message(
    
    497
    -                    None,
    
    498
    -                    MessageType.WARN,
    
    499
    -                    "Use of fail-on-overlap within project.conf " +
    
    500
    -                    "is deprecated. Consider using fatal-warnings instead."
    
    501
    -                )
    
    493
    +            self._context.warn(
    
    494
    +                "Use of fail-on-overlap within project.conf " +
    
    495
    +                "is deprecated. Consider using fatal-warnings instead."
    
    502 496
                 )
    
    503 497
     
    
    504 498
             # Load project.refs if it exists, this may be ignored.
    

  • buildstream/_scheduler/jobs/elementjob.py
    ... ... @@ -18,8 +18,6 @@
    18 18
     #
    
    19 19
     from ruamel import yaml
    
    20 20
     
    
    21
    -from ..._message import Message, MessageType
    
    22
    -
    
    23 21
     from .job import Job
    
    24 22
     
    
    25 23
     
    
    ... ... @@ -86,9 +84,8 @@ class ElementJob(Job):
    86 84
             # This should probably be omitted for non-build tasks but it's harmless here
    
    87 85
             elt_env = self._element.get_environment()
    
    88 86
             env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
    
    89
    -        self.message(MessageType.LOG,
    
    90
    -                     "Build environment for element {}".format(self._element.name),
    
    91
    -                     detail=env_dump)
    
    87
    +        self._log("Build environment for element {}".format(self._element.name),
    
    88
    +                  detail=env_dump, plugin=self.element, scheduler=True)
    
    92 89
     
    
    93 90
             # Run the action
    
    94 91
             return self._action_cb(self._element)
    
    ... ... @@ -96,15 +93,6 @@ class ElementJob(Job):
    96 93
         def parent_complete(self, success, result):
    
    97 94
             self._complete_cb(self, self._element, success, self._result)
    
    98 95
     
    
    99
    -    def message(self, message_type, message, **kwargs):
    
    100
    -        args = dict(kwargs)
    
    101
    -        args['scheduler'] = True
    
    102
    -        self._scheduler.context.message(
    
    103
    -            Message(self._element._get_unique_id(),
    
    104
    -                    message_type,
    
    105
    -                    message,
    
    106
    -                    **args))
    
    107
    -
    
    108 96
         def child_process_data(self):
    
    109 97
             data = {}
    
    110 98
     
    
    ... ... @@ -119,3 +107,10 @@ class ElementJob(Job):
    119 107
             data['cache_size'] = cache_size
    
    120 108
     
    
    121 109
             return data
    
    110
    +
    
    111
    +    # _fail()
    
    112
    +    #
    
    113
    +    # Override _fail to set scheduler kwarg to true.
    
    114
    +    #
    
    115
    +    def _fail(self, text, **kwargs):
    
    116
    +        super()._fail(text, scheduler=True, **kwargs)

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -32,7 +32,7 @@ import psutil
    32 32
     
    
    33 33
     # BuildStream toplevel imports
    
    34 34
     from ..._exceptions import ImplError, BstError, set_last_task_error
    
    35
    -from ..._message import Message, MessageType, unconditional_messages
    
    35
    +from ..._message import MessageType, unconditional_messages
    
    36 36
     from ... import _signals, utils
    
    37 37
     
    
    38 38
     # Return code values shutdown of job handling child processes
    
    ... ... @@ -109,6 +109,7 @@ class Job():
    109 109
             # Private members
    
    110 110
             #
    
    111 111
             self._scheduler = scheduler            # The scheduler
    
    112
    +        self._context = scheduler.context     # The context, used primarily for UI messaging.
    
    112 113
             self._queue = multiprocessing.Queue()  # A message passing queue
    
    113 114
             self._process = None                   # The Process object
    
    114 115
             self._watcher = None                   # Child process watcher
    
    ... ... @@ -179,7 +180,7 @@ class Job():
    179 180
             # First resume the job if it's suspended
    
    180 181
             self.resume(silent=True)
    
    181 182
     
    
    182
    -        self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
    
    183
    +        self._status("{} terminating".format(self.action_name))
    
    183 184
     
    
    184 185
             # Make sure there is no garbage on the queue
    
    185 186
             self._parent_stop_listening()
    
    ... ... @@ -210,8 +211,8 @@ class Job():
    210 211
         def kill(self):
    
    211 212
     
    
    212 213
             # Force kill
    
    213
    -        self.message(MessageType.WARN,
    
    214
    -                     "{} did not terminate gracefully, killing".format(self.action_name))
    
    214
    +        self._warn("{} did not terminate gracefully, killing"
    
    215
    +                   .format(self.action_name))
    
    215 216
     
    
    216 217
             try:
    
    217 218
                 utils._kill_process_tree(self._process.pid)
    
    ... ... @@ -226,8 +227,7 @@ class Job():
    226 227
         #
    
    227 228
         def suspend(self):
    
    228 229
             if not self._suspended:
    
    229
    -            self.message(MessageType.STATUS,
    
    230
    -                         "{} suspending".format(self.action_name))
    
    230
    +            self._status("{} suspending".format(self.action_name))
    
    231 231
     
    
    232 232
                 try:
    
    233 233
                     # Use SIGTSTP so that child processes may handle and propagate
    
    ... ... @@ -251,8 +251,7 @@ class Job():
    251 251
         def resume(self, silent=False):
    
    252 252
             if self._suspended:
    
    253 253
                 if not silent and not self._scheduler.terminated:
    
    254
    -                self.message(MessageType.STATUS,
    
    255
    -                             "{} resuming".format(self.action_name))
    
    254
    +                self._status("{} resuming".format(self.action_name))
    
    256 255
     
    
    257 256
                 os.kill(self._process.pid, signal.SIGCONT)
    
    258 257
                 self._suspended = False
    
    ... ... @@ -305,21 +304,6 @@ class Job():
    305 304
             raise ImplError("Job '{kind}' does not implement child_process()"
    
    306 305
                             .format(kind=type(self).__name__))
    
    307 306
     
    
    308
    -    # message():
    
    309
    -    #
    
    310
    -    # Logs a message, this will be logged in the task's logfile and
    
    311
    -    # conditionally also be sent to the frontend.
    
    312
    -    #
    
    313
    -    # Args:
    
    314
    -    #    message_type (MessageType): The type of message to send
    
    315
    -    #    message (str): The message
    
    316
    -    #    kwargs: Remaining Message() constructor arguments
    
    317
    -    #
    
    318
    -    def message(self, message_type, message, **kwargs):
    
    319
    -        args = dict(kwargs)
    
    320
    -        args['scheduler'] = True
    
    321
    -        self._scheduler.context.message(Message(None, message_type, message, **args))
    
    322
    -
    
    323 307
         # child_process_data()
    
    324 308
         #
    
    325 309
         # Abstract method to retrieve additional data that should be
    
    ... ... @@ -346,6 +330,32 @@ class Job():
    346 330
         #
    
    347 331
         #######################################################
    
    348 332
     
    
    333
    +    def _debug(self, text, **kwargs):
    
    334
    +        self._context.debug(text, task_id=self._task_id, **kwargs)
    
    335
    +
    
    336
    +    def _status(self, text, **kwargs):
    
    337
    +        self._context.status(text, task_id=self._task_id, **kwargs)
    
    338
    +
    
    339
    +    def _info(self, text, **kwargs):
    
    340
    +        self._context.info(text, task_id=self._task_id, **kwargs)
    
    341
    +
    
    342
    +    def _warn(self, text, **kwargs):
    
    343
    +        self._context.warn(text, task_id=self._task_id, **kwargs)
    
    344
    +
    
    345
    +    def _error(self, text, **kwargs):
    
    346
    +        self._context.error(text, task_id=self._task_id, **kwargs)
    
    347
    +
    
    348
    +    def _log(self, text, **kwargs):
    
    349
    +        self._context.log(text, task_id=self._task_id, **kwargs)
    
    350
    +
    
    351
    +    # _fail()
    
    352
    +    #
    
    353
    +    # Only exists for sub classes to override and add kwargs to.
    
    354
    +    #
    
    355
    +    def _fail(self, text, **kwargs):
    
    356
    +        self._context._message(text, task_id=self._task_id,
    
    357
    +                               msg_type=MessageType.FAIL, **kwargs)
    
    358
    +
    
    349 359
         # _child_action()
    
    350 360
         #
    
    351 361
         # Perform the action in the child process, this calls the action_cb.
    
    ... ... @@ -372,7 +382,7 @@ class Job():
    372 382
             # Set the global message handler in this child
    
    373 383
             # process to forward messages to the parent process
    
    374 384
             self._queue = queue
    
    375
    -        self._scheduler.context.set_message_handler(self._child_message_handler)
    
    385
    +        self._context.set_message_handler(self._child_message_handler)
    
    376 386
     
    
    377 387
             starttime = datetime.datetime.now()
    
    378 388
             stopped_time = None
    
    ... ... @@ -389,9 +399,10 @@ class Job():
    389 399
             # Time, log and and run the action function
    
    390 400
             #
    
    391 401
             with _signals.suspendable(stop_time, resume_time), \
    
    392
    -            self._scheduler.context.recorded_messages(self._logfile) as filename:
    
    402
    +            self._context.recorded_messages(self._logfile) as filename:
    
    393 403
     
    
    394
    -            self.message(MessageType.START, self.action_name, logfile=filename)
    
    404
    +            self._context._message(self.action_name, logfile=filename,
    
    405
    +                                   msg_type=MessageType.START, task_id=self._task_id)
    
    395 406
     
    
    396 407
                 try:
    
    397 408
                     # Try the task action
    
    ... ... @@ -401,13 +412,12 @@ class Job():
    401 412
                     self._retry_flag = e.temporary
    
    402 413
     
    
    403 414
                     if self._retry_flag and (self._tries <= self._max_retries):
    
    404
    -                    self.message(MessageType.FAIL,
    
    405
    -                                 "Try #{} failed, retrying".format(self._tries),
    
    406
    -                                 elapsed=elapsed)
    
    415
    +                    self._fail("Try #{} failed, retrying".format(self._tries),
    
    416
    +                                  elapsed=elapsed)
    
    407 417
                     else:
    
    408
    -                    self.message(MessageType.FAIL, str(e),
    
    409
    -                                 elapsed=elapsed, detail=e.detail,
    
    410
    -                                 logfile=filename, sandbox=e.sandbox)
    
    418
    +                    self._fail(str(e), elapsed=elapsed,
    
    419
    +                                  detail=e.detail, logfile=filename,
    
    420
    +                                  sandbox=e.sandbox)
    
    411 421
     
    
    412 422
                     self._queue.put(Envelope('child_data', self.child_process_data()))
    
    413 423
     
    
    ... ... @@ -427,9 +437,9 @@ class Job():
    427 437
                     elapsed = datetime.datetime.now() - starttime
    
    428 438
                     detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
    
    429 439
     
    
    430
    -                self.message(MessageType.BUG, self.action_name,
    
    431
    -                             elapsed=elapsed, detail=detail,
    
    432
    -                             logfile=filename)
    
    440
    +                self._context._message(self.action_name, elapsed=elapsed,
    
    441
    +                                       detail=detail, logfile=filename,
    
    442
    +                                       task_id=self._task_id, msg_type=MessageType.BUG)
    
    433 443
                     self._child_shutdown(RC_FAIL)
    
    434 444
     
    
    435 445
                 else:
    
    ... ... @@ -438,8 +448,10 @@ class Job():
    438 448
                     self._child_send_result(result)
    
    439 449
     
    
    440 450
                     elapsed = datetime.datetime.now() - starttime
    
    441
    -                self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
    
    442
    -                             logfile=filename)
    
    451
    +                self._context._message(self.action_name,
    
    452
    +                                       elapsed=elapsed, logfile=filename,
    
    453
    +                                       msg_type=MessageType.SUCCESS,
    
    454
    +                                       task_id=self._task_id)
    
    443 455
     
    
    444 456
                     # Shutdown needs to stay outside of the above context manager,
    
    445 457
                     # make sure we dont try to handle SIGTERM while the process
    
    ... ... @@ -574,7 +586,7 @@ class Job():
    574 586
             if envelope._message_type == 'message':
    
    575 587
                 # Propagate received messages from children
    
    576 588
                 # back through the context.
    
    577
    -            self._scheduler.context.message(envelope._message)
    
    589
    +            self._context._send_message(envelope._message)
    
    578 590
             elif envelope._message_type == 'error':
    
    579 591
                 # For regression tests only, save the last error domain / reason
    
    580 592
                 # reported from a child task in the main process, this global state
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -50,10 +50,10 @@ class BuildQueue(Queue):
    50 50
                 self._tried.add(element)
    
    51 51
                 _, description, detail = element._get_build_result()
    
    52 52
                 logfile = element._get_build_log()
    
    53
    -            self._message(element, MessageType.FAIL, description,
    
    54
    -                          detail=detail, action_name=self.action_name,
    
    55
    -                          elapsed=timedelta(seconds=0),
    
    56
    -                          logfile=logfile)
    
    53
    +            self._context._message(description, msg_type=MessageType.FAIL, plugin=element,
    
    54
    +                                   detail=detail, action_name=self.action_name,
    
    55
    +                                   elapsed=timedelta(seconds=0),
    
    56
    +                                   logfile=logfile)
    
    57 57
                 job = ElementJob(self._scheduler, self.action_name,
    
    58 58
                                  logfile, element=element, queue=self,
    
    59 59
                                  resources=self.resources,
    
    ... ... @@ -97,7 +97,7 @@ class BuildQueue(Queue):
    97 97
                 cache = element._get_artifact_cache()
    
    98 98
                 cache._add_artifact_size(artifact_size)
    
    99 99
     
    
    100
    -            if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
    
    100
    +            if cache.get_approximate_cache_size() > cache.cache_quota:
    
    101 101
                     self._scheduler._check_cache_size_real()
    
    102 102
     
    
    103 103
         def done(self, job, element, result, success):
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -30,7 +30,6 @@ from ..resources import ResourceType
    30 30
     
    
    31 31
     # BuildStream toplevel imports
    
    32 32
     from ..._exceptions import BstError, set_last_task_error
    
    33
    -from ..._message import Message, MessageType
    
    34 33
     
    
    35 34
     
    
    36 35
     # Queue status for a given element
    
    ... ... @@ -72,6 +71,7 @@ class Queue():
    72 71
             # Private members
    
    73 72
             #
    
    74 73
             self._scheduler = scheduler
    
    74
    +        self._context = scheduler.context
    
    75 75
             self._wait_queue = deque()
    
    76 76
             self._done_queue = deque()
    
    77 77
             self._max_retries = 0
    
    ... ... @@ -274,17 +274,17 @@ class Queue():
    274 274
             # Handle any workspace modifications now
    
    275 275
             #
    
    276 276
             if workspace_dict:
    
    277
    -            context = element._get_context()
    
    278
    -            workspaces = context.get_workspaces()
    
    277
    +            workspaces = self._context.get_workspaces()
    
    279 278
                 if workspaces.update_workspace(element._get_full_name(), workspace_dict):
    
    280
    -                try:
    
    281
    -                    workspaces.save_config()
    
    282
    -                except BstError as e:
    
    283
    -                    self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
    
    284
    -                except Exception as e:   # pylint: disable=broad-except
    
    285
    -                    self._message(element, MessageType.BUG,
    
    286
    -                                  "Unhandled exception while saving workspaces",
    
    287
    -                                  detail=traceback.format_exc())
    
    279
    +                unique_id = element._get_unique_id()
    
    280
    +                with self._context.report_unhandled_exceptions("Unhandled exception while saving workspaces",
    
    281
    +                                                               unique_id=unique_id):
    
    282
    +                    try:
    
    283
    +                        workspaces.save_config()
    
    284
    +                    except BstError as e:
    
    285
    +                        self._context.error("Error saving workspaces",
    
    286
    +                                            detail=str(e),
    
    287
    +                                            plugin=unique_id)
    
    288 288
     
    
    289 289
         # _job_done()
    
    290 290
         #
    
    ... ... @@ -304,54 +304,43 @@ class Queue():
    304 304
             if job.child_data:
    
    305 305
                 element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
    
    306 306
     
    
    307
    -        # Give the result of the job to the Queue implementor,
    
    308
    -        # and determine if it should be considered as processed
    
    309
    -        # or skipped.
    
    310
    -        try:
    
    311
    -            processed = self.done(job, element, result, success)
    
    312
    -
    
    313
    -        except BstError as e:
    
    314
    -
    
    315
    -            # Report error and mark as failed
    
    316
    -            #
    
    317
    -            self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
    
    318
    -            self.failed_elements.append(element)
    
    319
    -
    
    320
    -            # Treat this as a task error as it's related to a task
    
    321
    -            # even though it did not occur in the task context
    
    322
    -            #
    
    323
    -            # This just allows us stronger testing capability
    
    324
    -            #
    
    325
    -            set_last_task_error(e.domain, e.reason)
    
    326
    -
    
    327
    -        except Exception as e:   # pylint: disable=broad-except
    
    328
    -
    
    329
    -            # Report unhandled exceptions and mark as failed
    
    330
    -            #
    
    331
    -            self._message(element, MessageType.BUG,
    
    332
    -                          "Unhandled exception in post processing",
    
    333
    -                          detail=traceback.format_exc())
    
    334
    -            self.failed_elements.append(element)
    
    335
    -        else:
    
    336
    -
    
    337
    -            # No exception occured, handle the success/failure state in the normal way
    
    338
    -            #
    
    339
    -            self._done_queue.append(job)
    
    340
    -
    
    341
    -            if success:
    
    342
    -                if processed:
    
    343
    -                    self.processed_elements.append(element)
    
    344
    -                else:
    
    345
    -                    self.skipped_elements.append(element)
    
    346
    -            else:
    
    307
    +        with self._context.report_unhandled_exceptions("Unhandled exception in post processing",
    
    308
    +                                                       unique_id=element._get_unique_id()):
    
    309
    +            # Give the result of the job to the Queue implementor,
    
    310
    +            # and determine if it should be considered as processed
    
    311
    +            # or skipped.
    
    312
    +            try:
    
    313
    +                processed = self.done(job, element, result, success)
    
    314
    +            except BstError as e:
    
    315
    +                # Report error and mark as failed
    
    316
    +                #
    
    317
    +                self._context.error("Post processing error",
    
    318
    +                                    plugin=element,
    
    319
    +                                    detail=str(e))
    
    347 320
                     self.failed_elements.append(element)
    
    348 321
     
    
    349
    -    # Convenience wrapper for Queue implementations to send
    
    350
    -    # a message for the element they are processing
    
    351
    -    def _message(self, element, message_type, brief, **kwargs):
    
    352
    -        context = element._get_context()
    
    353
    -        message = Message(element._get_unique_id(), message_type, brief, **kwargs)
    
    354
    -        context.message(message)
    
    322
    +                # Treat this as a task error as it's related to a task
    
    323
    +                # even though it did not occur in the task context
    
    324
    +                #
    
    325
    +                # This just gives us stronger testing capability
    
    326
    +                #
    
    327
    +                set_last_task_error(e.domain, e.reason)
    
    328
    +            except Exception:   # pylint: disable=broad-except
    
    329
    +                self.failed_elements.append(element)
    
    330
    +                # Intentional reraise for report_unhandled_exceptions() to log.
    
    331
    +                raise
    
    332
    +            else:
    
    333
    +                # No exception occurred, handle the success/failure state in the normal way
    
    334
    +                #
    
    335
    +                self._done_queue.append(job)
    
    336
    +
    
    337
    +                if success:
    
    338
    +                    if processed:
    
    339
    +                        self.processed_elements.append(element)
    
    340
    +                    else:
    
    341
    +                        self.skipped_elements.append(element)
    
    342
    +                else:
    
    343
    +                    self.failed_elements.append(element)
    
    355 344
     
    
    356 345
         def _element_log_path(self, element):
    
    357 346
             project = element._get_project()
    

  • buildstream/_scheduler/scheduler.py
    ... ... @@ -29,6 +29,7 @@ from contextlib import contextmanager
    29 29
     # Local imports
    
    30 30
     from .resources import Resources, ResourceType
    
    31 31
     from .jobs import CacheSizeJob, CleanupJob
    
    32
    +from .._platform import Platform
    
    32 33
     
    
    33 34
     
    
    34 35
     # A decent return code for Scheduler.run()
    
    ... ... @@ -316,7 +317,8 @@ class Scheduler():
    316 317
             self._sched()
    
    317 318
     
    
    318 319
         def _run_cleanup(self, cache_size):
    
    319
    -        if cache_size and cache_size < self.context.cache_quota:
    
    320
    +        platform = Platform.get_platform()
    
    321
    +        if cache_size and cache_size < platform.artifactcache.cache_quota:
    
    320 322
                 return
    
    321 323
     
    
    322 324
             job = CleanupJob(self, 'cleanup', 'cleanup',
    

  • buildstream/_stream.py
    ... ... @@ -29,7 +29,7 @@ from contextlib import contextmanager
    29 29
     from tempfile import TemporaryDirectory
    
    30 30
     
    
    31 31
     from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
    
    32
    -from ._message import Message, MessageType
    
    32
    +from ._message import MessageType
    
    33 33
     from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
    
    34 34
     from ._pipeline import Pipeline, PipelineSelection
    
    35 35
     from ._platform import Platform
    
    ... ... @@ -512,7 +512,7 @@ class Stream():
    512 512
                     target._open_workspace()
    
    513 513
     
    
    514 514
             workspaces.save_config()
    
    515
    -        self._message(MessageType.INFO, "Saved workspace configuration")
    
    515
    +        self._context.info("Saved workspace configuration")
    
    516 516
     
    
    517 517
         # workspace_close
    
    518 518
         #
    
    ... ... @@ -539,7 +539,7 @@ class Stream():
    539 539
             # Delete the workspace and save the configuration
    
    540 540
             workspaces.delete_workspace(element_name)
    
    541 541
             workspaces.save_config()
    
    542
    -        self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
    
    542
    +        self._context.info("Closed workspace for {}".format(element_name))
    
    543 543
     
    
    544 544
         # workspace_reset
    
    545 545
         #
    
    ... ... @@ -580,8 +580,8 @@ class Stream():
    580 580
                 workspace_path = workspace.get_absolute_path()
    
    581 581
                 if soft:
    
    582 582
                     workspace.prepared = False
    
    583
    -                self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
    
    584
    -                              .format(element.name, workspace_path))
    
    583
    +                self._context.info("Reset workspace state for {} at: {}"
    
    584
    +                                   .format(element.name, workspace.path))
    
    585 585
                     continue
    
    586 586
     
    
    587 587
                 with element.timed_activity("Removing workspace directory {}"
    
    ... ... @@ -598,9 +598,8 @@ class Stream():
    598 598
                 with element.timed_activity("Staging sources to {}".format(workspace_path)):
    
    599 599
                     element._open_workspace()
    
    600 600
     
    
    601
    -            self._message(MessageType.INFO,
    
    602
    -                          "Reset workspace for {} at: {}".format(element.name,
    
    603
    -                                                                 workspace_path))
    
    601
    +            self._context.info("Reset workspace for {} at: {}"
    
    602
    +                               .format(element.name, workspace._path))
    
    604 603
     
    
    605 604
             workspaces.save_config()
    
    606 605
     
    
    ... ... @@ -676,7 +675,7 @@ class Stream():
    676 675
             # source-bundle only supports one target
    
    677 676
             target = self.targets[0]
    
    678 677
     
    
    679
    -        self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
    
    678
    +        self._context.info("Bundling sources for target {}".format(target.name))
    
    680 679
     
    
    681 680
             # Find the correct filename for the compression algorithm
    
    682 681
             tar_location = os.path.join(directory, target.normal_name + ".tar")
    
    ... ... @@ -958,15 +957,6 @@ class Stream():
    958 957
     
    
    959 958
             return selected, track_selected
    
    960 959
     
    
    961
    -    # _message()
    
    962
    -    #
    
    963
    -    # Local message propagator
    
    964
    -    #
    
    965
    -    def _message(self, message_type, message, **kwargs):
    
    966
    -        args = dict(kwargs)
    
    967
    -        self._context.message(
    
    968
    -            Message(None, message_type, message, **args))
    
    969
    -
    
    970 960
         # _add_queue()
    
    971 961
         #
    
    972 962
         # Adds a queue to the stream
    
    ... ... @@ -1011,16 +1001,19 @@ class Stream():
    1011 1001
     
    
    1012 1002
             _, status = self._scheduler.run(self.queues)
    
    1013 1003
     
    
    1014
    -        # Force update element states after a run, such that the summary
    
    1015
    -        # is more coherent
    
    1016
    -        try:
    
    1017
    -            for element in self.total_elements:
    
    1018
    -                element._update_state()
    
    1019
    -        except BstError as e:
    
    1020
    -            self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
    
    1021
    -            set_last_task_error(e.domain, e.reason)
    
    1022
    -        except Exception as e:   # pylint: disable=broad-except
    
    1023
    -            self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
    
    1004
    +        element = None
    
    1005
    +
    
    1006
    +        # Handle unhandled exceptions
    
    1007
    +        with self._context.report_unhandled_exceptions("Unhandled exception while resolving final state",
    
    1008
    +                                                       unique_id=element):
    
    1009
    +            # Force update element states after a run, such that the summary
    
    1010
    +            # is more coherent
    
    1011
    +            try:
    
    1012
    +                for element in self.total_elements:
    
    1013
    +                    element._update_state()
    
    1014
    +            except BstError as e:
    
    1015
    +                self._context.error("Error resolving final state", detail=str(e))
    
    1016
    +                set_last_task_error(e.domain, e.reason)
    
    1024 1017
     
    
    1025 1018
             if status == SchedStatus.ERROR:
    
    1026 1019
                 raise StreamError()
    

  • buildstream/plugin.py
    ... ... @@ -117,7 +117,6 @@ from weakref import WeakValueDictionary
    117 117
     from . import _yaml
    
    118 118
     from . import utils
    
    119 119
     from ._exceptions import PluginError, ImplError
    
    120
    -from ._message import Message, MessageType
    
    121 120
     
    
    122 121
     
    
    123 122
     class Plugin():
    
    ... ... @@ -463,8 +462,7 @@ class Plugin():
    463 462
                brief (str): The brief message
    
    464 463
                detail (str): An optional detailed message, can be multiline output
    
    465 464
             """
    
    466
    -        if self.__context.log_debug:
    
    467
    -            self.__message(MessageType.DEBUG, brief, detail=detail)
    
    465
    +        self.__context.debug(brief, detail=detail, plugin=self)
    
    468 466
     
    
    469 467
         def status(self, brief, *, detail=None):
    
    470 468
             """Print a status message
    
    ... ... @@ -473,9 +471,9 @@ class Plugin():
    473 471
                brief (str): The brief message
    
    474 472
                detail (str): An optional detailed message, can be multiline output
    
    475 473
     
    
    476
    -        Note: Status messages tell about what a plugin is currently doing
    
    474
    +        Note: Status messages tell the user what a plugin is currently doing
    
    477 475
             """
    
    478
    -        self.__message(MessageType.STATUS, brief, detail=detail)
    
    476
    +        self.__context.status(brief, detail=detail, plugin=self)
    
    479 477
     
    
    480 478
         def info(self, brief, *, detail=None):
    
    481 479
             """Print an informative message
    
    ... ... @@ -487,7 +485,7 @@ class Plugin():
    487 485
             Note: Informative messages tell the user something they might want
    
    488 486
                   to know, like if refreshing an element caused it to change.
    
    489 487
             """
    
    490
    -        self.__message(MessageType.INFO, brief, detail=detail)
    
    488
    +        self.__context.info(brief, detail=detail, plugin=self)
    
    491 489
     
    
    492 490
         def warn(self, brief, *, detail=None, warning_token=None):
    
    493 491
             """Print a warning message, checks warning_token against project configuration
    
    ... ... @@ -510,7 +508,18 @@ class Plugin():
    510 508
                 if project._warning_is_fatal(warning_token):
    
    511 509
                     raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
    
    512 510
     
    
    513
    -        self.__message(MessageType.WARN, brief=brief, detail=detail)
    
    511
    +        self.__context.warn(brief, detail=detail, plugin=self)
    
    512
    +
    
    513
    +    def skipped(self, brief, *, detail=None):
    
    514
    +        """Prints a message indicating that an action has been skipped.
    
    515
    +
    
    516
    +        Args:
    
    517
    +           brief (str): The brief message
    
    518
    +           detail (str): An optional detailed message, can be multiline output
    
    519
    +
    
    520
    +        (*Since 1.4*)
    
    521
    +        """
    
    522
    +        self.__context.skipped(brief, detail=detail, plugin=self)
    
    514 523
     
    
    515 524
         def log(self, brief, *, detail=None):
    
    516 525
             """Log a message into the plugin's log file
    
    ... ... @@ -522,7 +531,7 @@ class Plugin():
    522 531
                brief (str): The brief message
    
    523 532
                detail (str): An optional detailed message, can be multiline output
    
    524 533
             """
    
    525
    -        self.__message(MessageType.LOG, brief, detail=detail)
    
    534
    +        self.__context.log(brief, detail=detail, plugin=self)
    
    526 535
     
    
    527 536
         @contextmanager
    
    528 537
         def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
    
    ... ... @@ -718,14 +727,9 @@ class Plugin():
    718 727
     
    
    719 728
             return (exit_code, output)
    
    720 729
     
    
    721
    -    def __message(self, message_type, brief, **kwargs):
    
    722
    -        message = Message(self.__unique_id, message_type, brief, **kwargs)
    
    723
    -        self.__context.message(message)
    
    724
    -
    
    725 730
         def __note_command(self, output, *popenargs, **kwargs):
    
    726
    -        workdir = os.getcwd()
    
    727
    -        if 'cwd' in kwargs:
    
    728
    -            workdir = kwargs['cwd']
    
    731
    +        workdir = kwargs.get("cwd", os.getcwd())
    
    732
    +
    
    729 733
             command = " ".join(popenargs[0])
    
    730 734
             output.write('Running host command {}: {}\n'.format(workdir, command))
    
    731 735
             output.flush()
    

  • buildstream/utils.py
    ... ... @@ -966,6 +966,17 @@ def _tempdir(suffix="", prefix="tmp", dir=None): # pylint: disable=redefined-bu
    966 966
             cleanup_tempdir()
    
    967 967
     
    
    968 968
     
    
    969
    +# _none_context()
    
    970
    +#
    
    971
    +# An empty context, useful for optional contexts e.g.
    
    972
    +#
    
    973
    +# with (_tempdir() if <value> else _none_context())
    
    974
    +#
    
    975
    +@contextmanager
    
    976
    +def _none_context():
    
    977
    +    yield
    
    978
    +
    
    979
    +
    
    969 980
     # _kill_process_tree()
    
    970 981
     #
    
    971 982
     # Brutally murder a process and all of its children
    

  • tests/artifactcache/cache_size.py
    1
    +import os
    
    2
    +import pytest
    
    3
    +
    
    4
    +from buildstream import _yaml
    
    5
    +from buildstream._artifactcache import CACHE_SIZE_FILE
    
    6
    +
    
    7
    +from tests.testutils import cli, create_element_size
    
    8
    +
    
    9
    +# XXX: Currently lacking:
    
    10
    +#      * A way to check whether it's faster to read cache size on
    
    11
    +#        successive invocations.
    
    12
    +#      * A way to check whether the cache size file has been read.
    
    13
    +
    
    14
    +
    
    15
    +def create_project(project_dir):
    
    16
    +    project_file = os.path.join(project_dir, "project.conf")
    
    17
    +    project_conf = {
    
    18
    +        "name": "test"
    
    19
    +    }
    
    20
    +    _yaml.dump(project_conf, project_file)
    
    21
    +    element_name = "test.bst"
    
    22
    +    create_element_size(element_name, project_dir, ".", [], 1024)
    
    23
    +
    
    24
    +
    
    25
    +def test_cache_size_roundtrip(cli, tmpdir):
    
    26
    +    # Builds (to put files in the cache), then invokes buildstream again
    
    27
    +    # to check nothing breaks
    
    28
    +
    
    29
    +    # Create project
    
    30
    +    project_dir = str(tmpdir)
    
    31
    +    create_project(project_dir)
    
    32
    +
    
    33
    +    # Build, to populate the cache
    
    34
    +    res = cli.run(project=project_dir, args=["build", "test.bst"])
    
    35
    +    res.assert_success()
    
    36
    +
    
    37
    +    # Show, to check that nothing breaks while reading cache size
    
    38
    +    res = cli.run(project=project_dir, args=["show", "test.bst"])
    
    39
    +    res.assert_success()
    
    40
    +
    
    41
    +
    
    42
    +def test_cache_size_write(cli, tmpdir):
    
    43
    +    # Builds (to put files in the cache), then checks a number is
    
    44
    +    # written to the cache size file.
    
    45
    +
    
    46
    +    project_dir = str(tmpdir)
    
    47
    +    create_project(project_dir)
    
    48
    +
    
    49
    +    # Artifact cache must be in a known place
    
    50
    +    artifactdir = os.path.join(project_dir, "artifacts")
    
    51
    +    cli.configure({"artifactdir": artifactdir})
    
    52
    +
    
    53
    +    # Build, to populate the cache
    
    54
    +    res = cli.run(project=project_dir, args=["build", "test.bst"])
    
    55
    +    res.assert_success()
    
    56
    +
    
    57
    +    # Inspect the artifact cache
    
    58
    +    sizefile = os.path.join(artifactdir, CACHE_SIZE_FILE)
    
    59
    +    assert os.path.isfile(sizefile)
    
    60
    +    with open(sizefile, "r") as f:
    
    61
    +        size_data = f.read()
    
    62
    +    size = int(size_data)

  • tests/testutils/artifactshare.py
    ... ... @@ -140,6 +140,7 @@ class ArtifactShare():
    140 140
     
    
    141 141
             return statvfs_result(f_blocks=self.total_space,
    
    142 142
                                   f_bfree=self.free_space - repo_size,
    
    143
    +                              f_bavail=self.free_space - repo_size,
    
    143 144
                                   f_bsize=1)
    
    144 145
     
    
    145 146
     
    
    ... ... @@ -156,4 +157,4 @@ def create_artifact_share(directory, *, total_space=None, free_space=None):
    156 157
             share.close()
    
    157 158
     
    
    158 159
     
    
    159
    -statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize')
    160
    +statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]