[Notes] [Git][BuildStream/buildstream][Qinusty/message-helpers] 2 commits: Overhaul internal messaging API



Title: GitLab

Tristan Maat pushed to branch Qinusty/message-helpers at BuildStream / buildstream

Commits:

11 changed files:

Changes:

  • buildstream/_artifactcache/artifactcache.py
    ... ... @@ -24,7 +24,6 @@ from collections.abc import Mapping
    24 24
     
    
    25 25
     from ..types import _KeyStrength
    
    26 26
     from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
    
    27
    -from .._message import Message, MessageType
    
    28 27
     from .. import utils
    
    29 28
     from .. import _yaml
    
    30 29
     
    
    ... ... @@ -589,15 +588,6 @@ class ArtifactCache():
    589 588
         #               Local Private Methods          #
    
    590 589
         ################################################
    
    591 590
     
    
    592
    -    # _message()
    
    593
    -    #
    
    594
    -    # Local message propagator
    
    595
    -    #
    
    596
    -    def _message(self, message_type, message, **kwargs):
    
    597
    -        args = dict(kwargs)
    
    598
    -        self.context.message(
    
    599
    -            Message(None, message_type, message, **args))
    
    600
    -
    
    601 591
         # _set_remotes():
    
    602 592
         #
    
    603 593
         # Set the list of remote caches. If project is None, the global list of
    
    ... ... @@ -621,7 +611,7 @@ class ArtifactCache():
    621 611
         #
    
    622 612
         def _initialize_remotes(self):
    
    623 613
             def remote_failed(url, error):
    
    624
    -            self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
    
    614
    +            self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
    
    625 615
     
    
    626 616
             with self.context.timed_activity("Initializing remote caches", silent_nested=True):
    
    627 617
                 self.initialize_remotes(on_failure=remote_failed)
    

  • buildstream/_context.py
    ... ... @@ -27,6 +27,7 @@ from . import _cachekey
    27 27
     from . import _signals
    
    28 28
     from . import _site
    
    29 29
     from . import _yaml
    
    30
    +from .plugin import Plugin
    
    30 31
     from ._exceptions import LoadError, LoadErrorReason, BstError
    
    31 32
     from ._message import Message, MessageType
    
    32 33
     from ._profile import Topics, profile_start, profile_end
    
    ... ... @@ -326,7 +327,7 @@ class Context():
    326 327
         # the context.
    
    327 328
         #
    
    328 329
         # The message handler should have the same signature as
    
    329
    -    # the message() method
    
    330
    +    # the _send_message() method
    
    330 331
         def set_message_handler(self, handler):
    
    331 332
             self._message_handler = handler
    
    332 333
     
    
    ... ... @@ -341,16 +342,15 @@ class Context():
    341 342
                     return True
    
    342 343
             return False
    
    343 344
     
    
    344
    -    # message():
    
    345
    +    # _send_message():
    
    345 346
         #
    
    346
    -    # Proxies a message back to the caller, this is the central
    
    347
    +    # Proxies a message back through the message handler, this is the central
    
    347 348
         # point through which all messages pass.
    
    348 349
         #
    
    349 350
         # Args:
    
    350 351
         #    message: A Message object
    
    351 352
         #
    
    352
    -    def message(self, message):
    
    353
    -
    
    353
    +    def _send_message(self, message):
    
    354 354
             # Tag message only once
    
    355 355
             if message.depth is None:
    
    356 356
                 message.depth = len(list(self._message_depth))
    
    ... ... @@ -366,6 +366,87 @@ class Context():
    366 366
             self._message_handler(message, context=self)
    
    367 367
             return
    
    368 368
     
    
    369
    +    # message():
    
    370
    +    #
    
    371
    +    # The global message API. Any message-sending functions should go
    
    372
    +    # through here. This will call `_send_message` to deliver the
    
    373
    +    # final message.
    
    374
    +    #
    
    375
    +    # Args:
    
    376
    +    #     text (str): The text of the message.
    
    377
    +    #
    
    378
    +    # Kwargs:
    
    379
    +    #     msg_type (MessageType): The type of the message (required).
    
    380
    +    #     plugin (Plugin|str|None): The id of the plugin
    
    381
    +    #                               (i.e. Element, Source subclass
    
    382
    +    #                               instance) sending the message. If
    
    383
    +    #                               a plugin is given, this will be
    
    384
    +    #                               determined automatically, if
    
    385
    +    #                               omitted the message will be sent
    
    386
    +    #                               without a plugin context.
    
    387
    +    #
    
    388
    +    #    For other kwargs, see `Message`.
    
    389
    +    #
    
    390
    +    def message(self, text, *, plugin=None, msg_type=None, **kwargs):
    
    391
    +        assert msg_type is not None
    
    392
    +
    
    393
    +        if isinstance(plugin, Plugin):
    
    394
    +            plugin_id = plugin._get_unique_id()
    
    395
    +        else:
    
    396
    +            plugin_id = plugin
    
    397
    +
    
    398
    +        self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
    
    399
    +
    
    400
    +    # skipped():
    
    401
    +    #
    
    402
    +    # Produce and send a skipped message through the context.
    
    403
    +    #
    
    404
    +    def skipped(self, text, **kwargs):
    
    405
    +        self.message(text, msg_type=MessageType.SKIPPED, **kwargs)
    
    406
    +
    
    407
    +    # debug():
    
    408
    +    #
    
    409
    +    # Produce and send a debug message through the context.
    
    410
    +    #
    
    411
    +    def debug(self, text, **kwargs):
    
    412
    +        if self.log_debug:
    
    413
    +            self.message(text, msg_type=MessageType.DEBUG, **kwargs)
    
    414
    +
    
    415
    +    # status():
    
    416
    +    #
    
    417
    +    # Produce and send a status message through the context.
    
    418
    +    #
    
    419
    +    def status(self, text, **kwargs):
    
    420
    +        self.message(text, msg_type=MessageType.STATUS, **kwargs)
    
    421
    +
    
    422
    +    # info():
    
    423
    +    #
    
    424
    +    # Produce and send a info message through the context.
    
    425
    +    #
    
    426
    +    def info(self, text, **kwargs):
    
    427
    +        self.message(text, msg_type=MessageType.INFO, **kwargs)
    
    428
    +
    
    429
    +    # warn():
    
    430
    +    #
    
    431
    +    # Produce and send a warning message through the context.
    
    432
    +    #
    
    433
    +    def warn(self, text, **kwargs):
    
    434
    +        self.message(text, msg_type=MessageType.WARN, **kwargs)
    
    435
    +
    
    436
    +    # error():
    
    437
    +    #
    
    438
    +    # Produce and send a error message through the context.
    
    439
    +    #
    
    440
    +    def error(self, text, **kwargs):
    
    441
    +        self.message(text, msg_type=MessageType.ERROR, **kwargs)
    
    442
    +
    
    443
    +    # log():
    
    444
    +    #
    
    445
    +    # Produce and send a log message through the context.
    
    446
    +    #
    
    447
    +    def log(self, text, **kwargs):
    
    448
    +        self.message(text, msg_type=MessageType.LOG, **kwargs)
    
    449
    +
    
    369 450
         # silence()
    
    370 451
         #
    
    371 452
         # A context manager to silence messages, this behaves in
    
    ... ... @@ -410,8 +491,8 @@ class Context():
    410 491
             with _signals.suspendable(stop_time, resume_time):
    
    411 492
                 try:
    
    412 493
                     # Push activity depth for status messages
    
    413
    -                message = Message(unique_id, MessageType.START, activity_name, detail=detail)
    
    414
    -                self.message(message)
    
    494
    +                self.message(activity_name, detail=detail, plugin=unique_id,
    
    495
    +                             msg_type=MessageType.START)
    
    415 496
                     self._push_message_depth(silent_nested)
    
    416 497
                     yield
    
    417 498
     
    
    ... ... @@ -419,15 +500,16 @@ class Context():
    419 500
                     # Note the failure in status messages and reraise, the scheduler
    
    420 501
                     # expects an error when there is an error.
    
    421 502
                     elapsed = datetime.datetime.now() - starttime
    
    422
    -                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
    
    423 503
                     self._pop_message_depth()
    
    424
    -                self.message(message)
    
    504
    +                self.message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
    
    505
    +                             msg_type=MessageType.FAIL)
    
    425 506
                     raise
    
    426 507
     
    
    427 508
                 elapsed = datetime.datetime.now() - starttime
    
    428
    -            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
    
    429 509
                 self._pop_message_depth()
    
    430
    -            self.message(message)
    
    510
    +            self.message(activity_name, detail=detail,
    
    511
    +                         elapsed=elapsed, plugin=unique_id,
    
    512
    +                         msg_type=MessageType.SUCCESS)
    
    431 513
     
    
    432 514
         # recorded_messages()
    
    433 515
         #
    

  • buildstream/_frontend/app.py
    ... ... @@ -35,7 +35,7 @@ from .._context import Context
    35 35
     from .._platform import Platform
    
    36 36
     from .._project import Project
    
    37 37
     from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
    
    38
    -from .._message import Message, MessageType, unconditional_messages
    
    38
    +from .._message import MessageType, unconditional_messages
    
    39 39
     from .._stream import Stream
    
    40 40
     from .._versions import BST_FORMAT_VERSION
    
    41 41
     from .. import _yaml
    
    ... ... @@ -251,7 +251,7 @@ class App():
    251 251
     
    
    252 252
             # Mark the beginning of the session
    
    253 253
             if session_name:
    
    254
    -            self._message(MessageType.START, session_name)
    
    254
    +            self.context.message(session_name, msg_type=MessageType.START)
    
    255 255
     
    
    256 256
             # Run the body of the session here, once everything is loaded
    
    257 257
             try:
    
    ... ... @@ -263,9 +263,9 @@ class App():
    263 263
                     elapsed = self.stream.elapsed_time
    
    264 264
     
    
    265 265
                     if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
    
    266
    -                    self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
    
    266
    +                    self.context.warn(session_name + ' Terminated', elapsed=elapsed)
    
    267 267
                     else:
    
    268
    -                    self._message(MessageType.FAIL, session_name, elapsed=elapsed)
    
    268
    +                    self.context.message(session_name, elapsed=elapsed, msg_type=MessageType.FAIL)
    
    269 269
     
    
    270 270
                         # Notify session failure
    
    271 271
                         self._notify("{} failed".format(session_name), "{}".format(e))
    
    ... ... @@ -283,7 +283,9 @@ class App():
    283 283
             else:
    
    284 284
                 # No exceptions occurred, print session time and summary
    
    285 285
                 if session_name:
    
    286
    -                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
    
    286
    +                self.context.message(session_name,
    
    287
    +                                     elapsed=self.stream.elapsed_time,
    
    288
    +                                     msg_type=MessageType.SUCCESS)
    
    287 289
                     if self._started:
    
    288 290
                         self._print_summary()
    
    289 291
     
    
    ... ... @@ -429,21 +431,13 @@ class App():
    429 431
             if self.interactive:
    
    430 432
                 self.notify(title, text)
    
    431 433
     
    
    432
    -    # Local message propagator
    
    433
    -    #
    
    434
    -    def _message(self, message_type, message, **kwargs):
    
    435
    -        args = dict(kwargs)
    
    436
    -        self.context.message(
    
    437
    -            Message(None, message_type, message, **args))
    
    438
    -
    
    439 434
         # Exception handler
    
    440 435
         #
    
    441 436
         def _global_exception_handler(self, etype, value, tb):
    
    442 437
     
    
    443 438
             # Print the regular BUG message
    
    444 439
             formatted = "".join(traceback.format_exception(etype, value, tb))
    
    445
    -        self._message(MessageType.BUG, str(value),
    
    446
    -                      detail=formatted)
    
    440
    +        self.context.message(value, detail=formatted, msg_type=MessageType.BUG)
    
    447 441
     
    
    448 442
             # If the scheduler has started, try to terminate all jobs gracefully,
    
    449 443
             # otherwise exit immediately.
    

  • buildstream/_pipeline.py
    ... ... @@ -24,7 +24,6 @@ import itertools
    24 24
     from operator import itemgetter
    
    25 25
     
    
    26 26
     from ._exceptions import PipelineError
    
    27
    -from ._message import Message, MessageType
    
    28 27
     from ._profile import Topics, profile_start, profile_end
    
    29 28
     from . import Scope, Consistency
    
    30 29
     from ._project import ProjectRefStorage
    
    ... ... @@ -201,8 +200,8 @@ class Pipeline():
    201 200
                 for t in targets:
    
    202 201
                     new_elm = t._get_source_element()
    
    203 202
                     if new_elm != t and not silent:
    
    204
    -                    self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
    
    205
    -                                  .format(t.name, new_elm.name))
    
    203
    +                    self._context.info("Element '{}' redirected to '{}'"
    
    204
    +                                       .format(t.name, new_elm.name))
    
    206 205
                     if new_elm not in elements:
    
    207 206
                         elements.append(new_elm)
    
    208 207
             elif mode == PipelineSelection.PLAN:
    
    ... ... @@ -433,15 +432,6 @@ class Pipeline():
    433 432
     
    
    434 433
                     raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
    
    435 434
     
    
    436
    -    # _message()
    
    437
    -    #
    
    438
    -    # Local message propagator
    
    439
    -    #
    
    440
    -    def _message(self, message_type, message, **kwargs):
    
    441
    -        args = dict(kwargs)
    
    442
    -        self._context.message(
    
    443
    -            Message(None, message_type, message, **args))
    
    444
    -
    
    445 435
     
    
    446 436
     # _Planner()
    
    447 437
     #
    

  • buildstream/_project.py
    ... ... @@ -37,7 +37,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage
    37 37
     from ._versions import BST_FORMAT_VERSION
    
    38 38
     from ._loader import Loader
    
    39 39
     from .element import Element
    
    40
    -from ._message import Message, MessageType
    
    41 40
     from ._includes import Includes
    
    42 41
     from ._platform import Platform
    
    43 42
     
    
    ... ... @@ -337,8 +336,7 @@ class Project():
    337 336
                     for source, ref in redundant_refs
    
    338 337
                 ]
    
    339 338
                 detail += "\n".join(lines)
    
    340
    -            self._context.message(
    
    341
    -                Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
    
    339
    +            self._context.warn("Ignoring redundant source references", detail=detail)
    
    342 340
     
    
    343 341
             return elements
    
    344 342
     
    
    ... ... @@ -514,13 +512,9 @@ class Project():
    514 512
     
    
    515 513
             # Deprecation check
    
    516 514
             if fail_on_overlap is not None:
    
    517
    -            self._context.message(
    
    518
    -                Message(
    
    519
    -                    None,
    
    520
    -                    MessageType.WARN,
    
    521
    -                    "Use of fail-on-overlap within project.conf " +
    
    522
    -                    "is deprecated. Consider using fatal-warnings instead."
    
    523
    -                )
    
    515
    +            self._context.warn(
    
    516
    +                "Use of fail-on-overlap within project.conf " +
    
    517
    +                "is deprecated. Consider using fatal-warnings instead."
    
    524 518
                 )
    
    525 519
     
    
    526 520
             # Load project.refs if it exists, this may be ignored.
    

  • buildstream/_scheduler/jobs/elementjob.py
    ... ... @@ -18,8 +18,6 @@
    18 18
     #
    
    19 19
     from ruamel import yaml
    
    20 20
     
    
    21
    -from ..._message import Message, MessageType
    
    22
    -
    
    23 21
     from .job import Job
    
    24 22
     
    
    25 23
     
    
    ... ... @@ -86,9 +84,8 @@ class ElementJob(Job):
    86 84
             # This should probably be omitted for non-build tasks but it's harmless here
    
    87 85
             elt_env = self._element.get_environment()
    
    88 86
             env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
    
    89
    -        self.message(MessageType.LOG,
    
    90
    -                     "Build environment for element {}".format(self._element.name),
    
    91
    -                     detail=env_dump)
    
    87
    +        self._log("Build environment for element {}".format(self._element.name),
    
    88
    +                  detail=env_dump, plugin=self.element, scheduler=True)
    
    92 89
     
    
    93 90
             # Run the action
    
    94 91
             return self._action_cb(self._element)
    
    ... ... @@ -96,15 +93,6 @@ class ElementJob(Job):
    96 93
         def parent_complete(self, success, result):
    
    97 94
             self._complete_cb(self, self._element, success, self._result)
    
    98 95
     
    
    99
    -    def message(self, message_type, message, **kwargs):
    
    100
    -        args = dict(kwargs)
    
    101
    -        args['scheduler'] = True
    
    102
    -        self._scheduler.context.message(
    
    103
    -            Message(self._element._get_unique_id(),
    
    104
    -                    message_type,
    
    105
    -                    message,
    
    106
    -                    **args))
    
    107
    -
    
    108 96
         def child_process_data(self):
    
    109 97
             data = {}
    
    110 98
     
    
    ... ... @@ -113,3 +101,10 @@ class ElementJob(Job):
    113 101
                 data['workspace'] = workspace.to_dict()
    
    114 102
     
    
    115 103
             return data
    
    104
    +
    
    105
    +    # _fail()
    
    106
    +    #
    
    107
    +    # Override _fail to set scheduler kwarg to true.
    
    108
    +    #
    
    109
    +    def _fail(self, text, **kwargs):
    
    110
    +        super()._fail(text, scheduler=True, **kwargs)

  • buildstream/_scheduler/jobs/job.py
    ... ... @@ -32,7 +32,7 @@ import psutil
    32 32
     
    
    33 33
     # BuildStream toplevel imports
    
    34 34
     from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
    
    35
    -from ..._message import Message, MessageType, unconditional_messages
    
    35
    +from ..._message import MessageType, unconditional_messages
    
    36 36
     from ... import _signals, utils
    
    37 37
     
    
    38 38
     # Return code values shutdown of job handling child processes
    
    ... ... @@ -110,6 +110,7 @@ class Job():
    110 110
             # Private members
    
    111 111
             #
    
    112 112
             self._scheduler = scheduler            # The scheduler
    
    113
    +        self._context = scheduler.context      # The context, used primarily for UI messaging.
    
    113 114
             self._queue = None                     # A message passing queue
    
    114 115
             self._process = None                   # The Process object
    
    115 116
             self._watcher = None                   # Child process watcher
    
    ... ... @@ -184,7 +185,7 @@ class Job():
    184 185
             # First resume the job if it's suspended
    
    185 186
             self.resume(silent=True)
    
    186 187
     
    
    187
    -        self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
    
    188
    +        self._status("{} terminating".format(self.action_name))
    
    188 189
     
    
    189 190
             # Make sure there is no garbage on the queue
    
    190 191
             self._parent_stop_listening()
    
    ... ... @@ -217,8 +218,8 @@ class Job():
    217 218
         def kill(self):
    
    218 219
     
    
    219 220
             # Force kill
    
    220
    -        self.message(MessageType.WARN,
    
    221
    -                     "{} did not terminate gracefully, killing".format(self.action_name))
    
    221
    +        self._warn("{} did not terminate gracefully, killing"
    
    222
    +                   .format(self.action_name))
    
    222 223
     
    
    223 224
             try:
    
    224 225
                 utils._kill_process_tree(self._process.pid)
    
    ... ... @@ -233,8 +234,7 @@ class Job():
    233 234
         #
    
    234 235
         def suspend(self):
    
    235 236
             if not self._suspended:
    
    236
    -            self.message(MessageType.STATUS,
    
    237
    -                         "{} suspending".format(self.action_name))
    
    237
    +            self._status("{} suspending".format(self.action_name))
    
    238 238
     
    
    239 239
                 try:
    
    240 240
                     # Use SIGTSTP so that child processes may handle and propagate
    
    ... ... @@ -258,8 +258,7 @@ class Job():
    258 258
         def resume(self, silent=False):
    
    259 259
             if self._suspended:
    
    260 260
                 if not silent and not self._scheduler.terminated:
    
    261
    -                self.message(MessageType.STATUS,
    
    262
    -                             "{} resuming".format(self.action_name))
    
    261
    +                self._status("{} resuming".format(self.action_name))
    
    263 262
     
    
    264 263
                 os.kill(self._process.pid, signal.SIGCONT)
    
    265 264
                 self._suspended = False
    
    ... ... @@ -324,21 +323,6 @@ class Job():
    324 323
             raise ImplError("Job '{kind}' does not implement child_process()"
    
    325 324
                             .format(kind=type(self).__name__))
    
    326 325
     
    
    327
    -    # message():
    
    328
    -    #
    
    329
    -    # Logs a message, this will be logged in the task's logfile and
    
    330
    -    # conditionally also be sent to the frontend.
    
    331
    -    #
    
    332
    -    # Args:
    
    333
    -    #    message_type (MessageType): The type of message to send
    
    334
    -    #    message (str): The message
    
    335
    -    #    kwargs: Remaining Message() constructor arguments
    
    336
    -    #
    
    337
    -    def message(self, message_type, message, **kwargs):
    
    338
    -        args = dict(kwargs)
    
    339
    -        args['scheduler'] = True
    
    340
    -        self._scheduler.context.message(Message(None, message_type, message, **args))
    
    341
    -
    
    342 326
         # child_process_data()
    
    343 327
         #
    
    344 328
         # Abstract method to retrieve additional data that should be
    
    ... ... @@ -365,6 +349,32 @@ class Job():
    365 349
         #
    
    366 350
         #######################################################
    
    367 351
     
    
    352
    +    def _debug(self, text, **kwargs):
    
    353
    +        self._context.debug(text, task_id=self._task_id, **kwargs)
    
    354
    +
    
    355
    +    def _status(self, text, **kwargs):
    
    356
    +        self._context.status(text, task_id=self._task_id, **kwargs)
    
    357
    +
    
    358
    +    def _info(self, text, **kwargs):
    
    359
    +        self._context.info(text, task_id=self._task_id, **kwargs)
    
    360
    +
    
    361
    +    def _warn(self, text, **kwargs):
    
    362
    +        self._context.warn(text, task_id=self._task_id, **kwargs)
    
    363
    +
    
    364
    +    def _error(self, text, **kwargs):
    
    365
    +        self._context.error(text, task_id=self._task_id, **kwargs)
    
    366
    +
    
    367
    +    def _log(self, text, **kwargs):
    
    368
    +        self._context.log(text, task_id=self._task_id, **kwargs)
    
    369
    +
    
    370
    +    # _fail()
    
    371
    +    #
    
    372
    +    # Only exists for sub classes to override and add kwargs to.
    
    373
    +    #
    
    374
    +    def _fail(self, text, **kwargs):
    
    375
    +        self._context.message(text, task_id=self._task_id,
    
    376
    +                              msg_type=MessageType.FAIL, **kwargs)
    
    377
    +
    
    368 378
         # _child_action()
    
    369 379
         #
    
    370 380
         # Perform the action in the child process, this calls the action_cb.
    
    ... ... @@ -391,7 +401,7 @@ class Job():
    391 401
             # Set the global message handler in this child
    
    392 402
             # process to forward messages to the parent process
    
    393 403
             self._queue = queue
    
    394
    -        self._scheduler.context.set_message_handler(self._child_message_handler)
    
    404
    +        self._context.set_message_handler(self._child_message_handler)
    
    395 405
     
    
    396 406
             starttime = datetime.datetime.now()
    
    397 407
             stopped_time = None
    
    ... ... @@ -408,17 +418,17 @@ class Job():
    408 418
             # Time, log and and run the action function
    
    409 419
             #
    
    410 420
             with _signals.suspendable(stop_time, resume_time), \
    
    411
    -            self._scheduler.context.recorded_messages(self._logfile) as filename:
    
    421
    +            self._context.recorded_messages(self._logfile) as filename:
    
    412 422
     
    
    413
    -            self.message(MessageType.START, self.action_name, logfile=filename)
    
    423
    +            self._context.message(self.action_name, logfile=filename,
    
    424
    +                                  msg_type=MessageType.START, task_id=self._task_id)
    
    414 425
     
    
    415 426
                 try:
    
    416 427
                     # Try the task action
    
    417 428
                     result = self.child_process()
    
    418 429
                 except SkipJob as e:
    
    419 430
                     elapsed = datetime.datetime.now() - starttime
    
    420
    -                self.message(MessageType.SKIPPED, str(e),
    
    421
    -                             elapsed=elapsed, logfile=filename)
    
    431
    +                self._context.skipped(e, elapsed=elapsed, logfile=filename)
    
    422 432
     
    
    423 433
                     # Alert parent of skip by return code
    
    424 434
                     self._child_shutdown(RC_SKIPPED)
    
    ... ... @@ -427,13 +437,11 @@ class Job():
    427 437
                     self._retry_flag = e.temporary
    
    428 438
     
    
    429 439
                     if self._retry_flag and (self._tries <= self._max_retries):
    
    430
    -                    self.message(MessageType.FAIL,
    
    431
    -                                 "Try #{} failed, retrying".format(self._tries),
    
    432
    -                                 elapsed=elapsed, logfile=filename)
    
    440
    +                    self._fail("Try #{} failed, retrying".format(self._tries),
    
    441
    +                               elapsed=elapsed, logfile=filename)
    
    433 442
                     else:
    
    434
    -                    self.message(MessageType.FAIL, str(e),
    
    435
    -                                 elapsed=elapsed, detail=e.detail,
    
    436
    -                                 logfile=filename, sandbox=e.sandbox)
    
    443
    +                    self._fail(e, elapsed=elapsed, detail=e.detail,
    
    444
    +                               logfile=filename, sandbox=e.sandbox)
    
    437 445
     
    
    438 446
                     self._queue.put(Envelope('child_data', self.child_process_data()))
    
    439 447
     
    
    ... ... @@ -453,9 +461,9 @@ class Job():
    453 461
                     elapsed = datetime.datetime.now() - starttime
    
    454 462
                     detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
    
    455 463
     
    
    456
    -                self.message(MessageType.BUG, self.action_name,
    
    457
    -                             elapsed=elapsed, detail=detail,
    
    458
    -                             logfile=filename)
    
    464
    +                self._context.message(self.action_name, elapsed=elapsed,
    
    465
    +                                      detail=detail, logfile=filename,
    
    466
    +                                      task_id=self._task_id, msg_type=MessageType.BUG)
    
    459 467
                     # Unhandled exceptions should permenantly fail
    
    460 468
                     self._child_shutdown(RC_PERM_FAIL)
    
    461 469
     
    
    ... ... @@ -465,8 +473,10 @@ class Job():
    465 473
                     self._child_send_result(result)
    
    466 474
     
    
    467 475
                     elapsed = datetime.datetime.now() - starttime
    
    468
    -                self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
    
    469
    -                             logfile=filename)
    
    476
    +                self._context.message(self.action_name,
    
    477
    +                                      elapsed=elapsed, logfile=filename,
    
    478
    +                                      msg_type=MessageType.SUCCESS,
    
    479
    +                                      task_id=self._task_id)
    
    470 480
     
    
    471 481
                     # Shutdown needs to stay outside of the above context manager,
    
    472 482
                     # make sure we dont try to handle SIGTERM while the process
    
    ... ... @@ -603,7 +613,7 @@ class Job():
    603 613
             if envelope._message_type == 'message':
    
    604 614
                 # Propagate received messages from children
    
    605 615
                 # back through the context.
    
    606
    -            self._scheduler.context.message(envelope._message)
    
    616
    +            self._context._send_message(envelope._message)
    
    607 617
             elif envelope._message_type == 'error':
    
    608 618
                 # For regression tests only, save the last error domain / reason
    
    609 619
                 # reported from a child task in the main process, this global state
    

  • buildstream/_scheduler/queues/buildqueue.py
    ... ... @@ -51,10 +51,10 @@ class BuildQueue(Queue):
    51 51
                 self._tried.add(element)
    
    52 52
                 _, description, detail = element._get_build_result()
    
    53 53
                 logfile = element._get_build_log()
    
    54
    -            self._message(element, MessageType.FAIL, description,
    
    55
    -                          detail=detail, action_name=self.action_name,
    
    56
    -                          elapsed=timedelta(seconds=0),
    
    57
    -                          logfile=logfile)
    
    54
    +            self._context.message(description, msg_type=MessageType.FAIL, plugin=element,
    
    55
    +                                  detail=detail, action_name=self.action_name,
    
    56
    +                                  elapsed=timedelta(seconds=0),
    
    57
    +                                  logfile=logfile)
    
    58 58
                 job = ElementJob(self._scheduler, self.action_name,
    
    59 59
                                  logfile, element=element, queue=self,
    
    60 60
                                  resources=self.resources,
    

  • buildstream/_scheduler/queues/queue.py
    ... ... @@ -30,7 +30,7 @@ from ..resources import ResourceType
    30 30
     
    
    31 31
     # BuildStream toplevel imports
    
    32 32
     from ..._exceptions import BstError, set_last_task_error
    
    33
    -from ..._message import Message, MessageType
    
    33
    +from ..._message import MessageType
    
    34 34
     
    
    35 35
     
    
    36 36
     # Queue status for a given element
    
    ... ... @@ -72,6 +72,7 @@ class Queue():
    72 72
             # Private members
    
    73 73
             #
    
    74 74
             self._scheduler = scheduler
    
    75
    +        self._context = scheduler.context
    
    75 76
             self._wait_queue = deque()
    
    76 77
             self._done_queue = deque()
    
    77 78
             self._max_retries = 0
    
    ... ... @@ -270,17 +271,19 @@ class Queue():
    270 271
             # Handle any workspace modifications now
    
    271 272
             #
    
    272 273
             if workspace_dict:
    
    273
    -            context = element._get_context()
    
    274
    -            workspaces = context.get_workspaces()
    
    274
    +            workspaces = self._context.get_workspaces()
    
    275 275
                 if workspaces.update_workspace(element._get_full_name(), workspace_dict):
    
    276 276
                     try:
    
    277 277
                         workspaces.save_config()
    
    278 278
                     except BstError as e:
    
    279
    -                    self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
    
    280
    -                except Exception as e:   # pylint: disable=broad-except
    
    281
    -                    self._message(element, MessageType.BUG,
    
    282
    -                                  "Unhandled exception while saving workspaces",
    
    283
    -                                  detail=traceback.format_exc())
    
    279
    +                    self._context.error("Error saving workspaces",
    
    280
    +                                        detail=str(e),
    
    281
    +                                        plugin=element)
    
    282
    +                except Exception as e: #pylint: disable=broad-except
    
    283
    +                    self._context.message("Unhandled exception while saving workspaces",
    
    284
    +                                          msg_type=MessageType.BUG,
    
    285
    +                                          detail=traceback.format_exc(),
    
    286
    +                                          plugin=element)
    
    284 287
     
    
    285 288
         # _job_done()
    
    286 289
         #
    
    ... ... @@ -304,10 +307,10 @@ class Queue():
    304 307
             try:
    
    305 308
                 self.done(job, element, result, success)
    
    306 309
             except BstError as e:
    
    307
    -
    
    308 310
                 # Report error and mark as failed
    
    309 311
                 #
    
    310
    -            self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
    
    312
    +            self._context.error("Post processing error",
    
    313
    +                                plugin=element, detail=e)
    
    311 314
                 self.failed_elements.append(element)
    
    312 315
     
    
    313 316
                 # Treat this as a task error as it's related to a task
    
    ... ... @@ -317,13 +320,12 @@ class Queue():
    317 320
                 #
    
    318 321
                 set_last_task_error(e.domain, e.reason)
    
    319 322
     
    
    320
    -        except Exception as e:   # pylint: disable=broad-except
    
    321
    -
    
    323
    +        except Exception:   # pylint: disable=broad-except
    
    322 324
                 # Report unhandled exceptions and mark as failed
    
    323 325
                 #
    
    324
    -            self._message(element, MessageType.BUG,
    
    325
    -                          "Unhandled exception in post processing",
    
    326
    -                          detail=traceback.format_exc())
    
    326
    +            self._context.message("Unhandled exception in post processing",
    
    327
    +                                  plugin=element, msg_type=MessageType.BUG,
    
    328
    +                                  detail=traceback.format_exc())
    
    327 329
                 self.failed_elements.append(element)
    
    328 330
             else:
    
    329 331
                 #
    
    ... ... @@ -343,13 +345,6 @@ class Queue():
    343 345
                 else:
    
    344 346
                     self.failed_elements.append(element)
    
    345 347
     
    
    346
    -    # Convenience wrapper for Queue implementations to send
    
    347
    -    # a message for the element they are processing
    
    348
    -    def _message(self, element, message_type, brief, **kwargs):
    
    349
    -        context = element._get_context()
    
    350
    -        message = Message(element._get_unique_id(), message_type, brief, **kwargs)
    
    351
    -        context.message(message)
    
    352
    -
    
    353 348
         def _element_log_path(self, element):
    
    354 349
             project = element._get_project()
    
    355 350
             key = element._get_display_key()[1]
    

  • buildstream/_stream.py
    ... ... @@ -25,11 +25,11 @@ import stat
    25 25
     import shlex
    
    26 26
     import shutil
    
    27 27
     import tarfile
    
    28
    +import traceback
    
    28 29
     from contextlib import contextmanager
    
    29 30
     from tempfile import TemporaryDirectory
    
    30 31
     
    
    31 32
     from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
    
    32
    -from ._message import Message, MessageType
    
    33 33
     from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
    
    34 34
     from ._pipeline import Pipeline, PipelineSelection
    
    35 35
     from . import utils, _yaml, _site
    
    ... ... @@ -510,7 +510,7 @@ class Stream():
    510 510
                     target._open_workspace()
    
    511 511
     
    
    512 512
             workspaces.save_config()
    
    513
    -        self._message(MessageType.INFO, "Saved workspace configuration")
    
    513
    +        self._context.info("Saved workspace configuration")
    
    514 514
     
    
    515 515
         # workspace_close
    
    516 516
         #
    
    ... ... @@ -537,7 +537,7 @@ class Stream():
    537 537
             # Delete the workspace and save the configuration
    
    538 538
             workspaces.delete_workspace(element_name)
    
    539 539
             workspaces.save_config()
    
    540
    -        self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
    
    540
    +        self._context.info("Closed workspace for {}".format(element_name))
    
    541 541
     
    
    542 542
         # workspace_reset
    
    543 543
         #
    
    ... ... @@ -578,8 +578,8 @@ class Stream():
    578 578
                 workspace_path = workspace.get_absolute_path()
    
    579 579
                 if soft:
    
    580 580
                     workspace.prepared = False
    
    581
    -                self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
    
    582
    -                              .format(element.name, workspace_path))
    
    581
    +                self._context.info("Reset workspace state for {} at: {}"
    
    582
    +                                   .format(element.name, workspace.path))
    
    583 583
                     continue
    
    584 584
     
    
    585 585
                 with element.timed_activity("Removing workspace directory {}"
    
    ... ... @@ -596,9 +596,8 @@ class Stream():
    596 596
                 with element.timed_activity("Staging sources to {}".format(workspace_path)):
    
    597 597
                     element._open_workspace()
    
    598 598
     
    
    599
    -            self._message(MessageType.INFO,
    
    600
    -                          "Reset workspace for {} at: {}".format(element.name,
    
    601
    -                                                                 workspace_path))
    
    599
    +            self._context.info("Reset workspace for {} at: {}"
    
    600
    +                               .format(element.name, workspace._path))
    
    602 601
     
    
    603 602
             workspaces.save_config()
    
    604 603
     
    
    ... ... @@ -674,7 +673,7 @@ class Stream():
    674 673
             # source-bundle only supports one target
    
    675 674
             target = self.targets[0]
    
    676 675
     
    
    677
    -        self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
    
    676
    +        self._context.info("Bundling sources for target {}".format(target.name))
    
    678 677
     
    
    679 678
             # Find the correct filename for the compression algorithm
    
    680 679
             tar_location = os.path.join(directory, target.normal_name + ".tar")
    
    ... ... @@ -954,15 +953,6 @@ class Stream():
    954 953
     
    
    955 954
             return selected, track_selected
    
    956 955
     
    
    957
    -    # _message()
    
    958
    -    #
    
    959
    -    # Local message propagator
    
    960
    -    #
    
    961
    -    def _message(self, message_type, message, **kwargs):
    
    962
    -        args = dict(kwargs)
    
    963
    -        self._context.message(
    
    964
    -            Message(None, message_type, message, **args))
    
    965
    -
    
    966 956
         # _add_queue()
    
    967 957
         #
    
    968 958
         # Adds a queue to the stream
    
    ... ... @@ -1013,10 +1003,11 @@ class Stream():
    1013 1003
                 for element in self.total_elements:
    
    1014 1004
                     element._update_state()
    
    1015 1005
             except BstError as e:
    
    1016
    -            self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
    
    1006
    +            self._context.error("Error resolving final state", detail=e)
    
    1017 1007
                 set_last_task_error(e.domain, e.reason)
    
    1018
    -        except Exception as e:   # pylint: disable=broad-except
    
    1019
    -            self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
    
    1008
    +        except Exception as e: # pylint: disable=broad-except
    
    1009
    +            self._context.message("Unhandled exception while resolving final state",
    
    1010
    +                                  detail=traceback.format_exc())
    
    1020 1011
     
    
    1021 1012
             if status == SchedStatus.ERROR:
    
    1022 1013
                 raise StreamError()
    

  • buildstream/plugin.py
    ... ... @@ -117,7 +117,6 @@ from weakref import WeakValueDictionary
    117 117
     from . import _yaml
    
    118 118
     from . import utils
    
    119 119
     from ._exceptions import PluginError, ImplError
    
    120
    -from ._message import Message, MessageType
    
    121 120
     
    
    122 121
     
    
    123 122
     class Plugin():
    
    ... ... @@ -464,8 +463,7 @@ class Plugin():
    464 463
                brief (str): The brief message
    
    465 464
                detail (str): An optional detailed message, can be multiline output
    
    466 465
             """
    
    467
    -        if self.__context.log_debug:
    
    468
    -            self.__message(MessageType.DEBUG, brief, detail=detail)
    
    466
    +        self.__context.debug(brief, detail=detail, plugin=self)
    
    469 467
     
    
    470 468
         def status(self, brief, *, detail=None):
    
    471 469
             """Print a status message
    
    ... ... @@ -474,9 +472,9 @@ class Plugin():
    474 472
                brief (str): The brief message
    
    475 473
                detail (str): An optional detailed message, can be multiline output
    
    476 474
     
    
    477
    -        Note: Status messages tell about what a plugin is currently doing
    
    475
    +        Note: Status messages tell the user what a plugin is currently doing
    
    478 476
             """
    
    479
    -        self.__message(MessageType.STATUS, brief, detail=detail)
    
    477
    +        self.__context.status(brief, detail=detail, plugin=self)
    
    480 478
     
    
    481 479
         def info(self, brief, *, detail=None):
    
    482 480
             """Print an informative message
    
    ... ... @@ -488,7 +486,7 @@ class Plugin():
    488 486
             Note: Informative messages tell the user something they might want
    
    489 487
                   to know, like if refreshing an element caused it to change.
    
    490 488
             """
    
    491
    -        self.__message(MessageType.INFO, brief, detail=detail)
    
    489
    +        self.__context.info(brief, detail=detail, plugin=self)
    
    492 490
     
    
    493 491
         def warn(self, brief, *, detail=None, warning_token=None):
    
    494 492
             """Print a warning message, checks warning_token against project configuration
    
    ... ... @@ -512,7 +510,7 @@ class Plugin():
    512 510
                     detail = detail if detail else ""
    
    513 511
                     raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
    
    514 512
     
    
    515
    -        self.__message(MessageType.WARN, brief=brief, detail=detail)
    
    513
    +        self.__context.warn(brief, detail=detail, plugin=self)
    
    516 514
     
    
    517 515
         def log(self, brief, *, detail=None):
    
    518 516
             """Log a message into the plugin's log file
    
    ... ... @@ -524,7 +522,7 @@ class Plugin():
    524 522
                brief (str): The brief message
    
    525 523
                detail (str): An optional detailed message, can be multiline output
    
    526 524
             """
    
    527
    -        self.__message(MessageType.LOG, brief, detail=detail)
    
    525
    +        self.__context.log(brief, detail=detail, plugin=self)
    
    528 526
     
    
    529 527
         @contextmanager
    
    530 528
         def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
    
    ... ... @@ -746,10 +744,6 @@ class Plugin():
    746 744
     
    
    747 745
             return (exit_code, output)
    
    748 746
     
    
    749
    -    def __message(self, message_type, brief, **kwargs):
    
    750
    -        message = Message(self.__unique_id, message_type, brief, **kwargs)
    
    751
    -        self.__context.message(message)
    
    752
    -
    
    753 747
         def __note_command(self, output, *popenargs, **kwargs):
    
    754 748
             workdir = os.getcwd()
    
    755 749
             if 'cwd' in kwargs:
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]