Tristan Maat pushed to branch Qinusty/message-helpers at BuildStream / buildstream
Commits:
11 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_context.py
- buildstream/_frontend/app.py
- buildstream/_pipeline.py
- buildstream/_project.py
- buildstream/_scheduler/jobs/elementjob.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_stream.py
- buildstream/plugin.py
Changes:
... | ... | @@ -24,7 +24,6 @@ from collections.abc import Mapping |
24 | 24 |
|
25 | 25 |
from ..types import _KeyStrength
|
26 | 26 |
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
|
27 |
-from .._message import Message, MessageType
|
|
28 | 27 |
from .. import utils
|
29 | 28 |
from .. import _yaml
|
30 | 29 |
|
... | ... | @@ -589,15 +588,6 @@ class ArtifactCache(): |
589 | 588 |
# Local Private Methods #
|
590 | 589 |
################################################
|
591 | 590 |
|
592 |
- # _message()
|
|
593 |
- #
|
|
594 |
- # Local message propagator
|
|
595 |
- #
|
|
596 |
- def _message(self, message_type, message, **kwargs):
|
|
597 |
- args = dict(kwargs)
|
|
598 |
- self.context.message(
|
|
599 |
- Message(None, message_type, message, **args))
|
|
600 |
- |
|
601 | 591 |
# _set_remotes():
|
602 | 592 |
#
|
603 | 593 |
# Set the list of remote caches. If project is None, the global list of
|
... | ... | @@ -621,7 +611,7 @@ class ArtifactCache(): |
621 | 611 |
#
|
622 | 612 |
def _initialize_remotes(self):
|
623 | 613 |
def remote_failed(url, error):
|
624 |
- self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
|
|
614 |
+ self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
|
|
625 | 615 |
|
626 | 616 |
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
|
627 | 617 |
self.initialize_remotes(on_failure=remote_failed)
|
... | ... | @@ -27,6 +27,7 @@ from . import _cachekey |
27 | 27 |
from . import _signals
|
28 | 28 |
from . import _site
|
29 | 29 |
from . import _yaml
|
30 |
+from .plugin import Plugin
|
|
30 | 31 |
from ._exceptions import LoadError, LoadErrorReason, BstError
|
31 | 32 |
from ._message import Message, MessageType
|
32 | 33 |
from ._profile import Topics, profile_start, profile_end
|
... | ... | @@ -326,7 +327,7 @@ class Context(): |
326 | 327 |
# the context.
|
327 | 328 |
#
|
328 | 329 |
# The message handler should have the same signature as
|
329 |
- # the message() method
|
|
330 |
+ # the _send_message() method
|
|
330 | 331 |
def set_message_handler(self, handler):
|
331 | 332 |
self._message_handler = handler
|
332 | 333 |
|
... | ... | @@ -341,16 +342,15 @@ class Context(): |
341 | 342 |
return True
|
342 | 343 |
return False
|
343 | 344 |
|
344 |
- # message():
|
|
345 |
+ # _send_message():
|
|
345 | 346 |
#
|
346 |
- # Proxies a message back to the caller, this is the central
|
|
347 |
+ # Proxies a message back through the message handler, this is the central
|
|
347 | 348 |
# point through which all messages pass.
|
348 | 349 |
#
|
349 | 350 |
# Args:
|
350 | 351 |
# message: A Message object
|
351 | 352 |
#
|
352 |
- def message(self, message):
|
|
353 |
- |
|
353 |
+ def _send_message(self, message):
|
|
354 | 354 |
# Tag message only once
|
355 | 355 |
if message.depth is None:
|
356 | 356 |
message.depth = len(list(self._message_depth))
|
... | ... | @@ -366,6 +366,87 @@ class Context(): |
366 | 366 |
self._message_handler(message, context=self)
|
367 | 367 |
return
|
368 | 368 |
|
369 |
+ # message():
|
|
370 |
+ #
|
|
371 |
+ # The global message API. Any message-sending functions should go
|
|
372 |
+ # through here. This will call `_send_message` to deliver the
|
|
373 |
+ # final message.
|
|
374 |
+ #
|
|
375 |
+ # Args:
|
|
376 |
+ # text (str): The text of the message.
|
|
377 |
+ #
|
|
378 |
+ # Kwargs:
|
|
379 |
+ # msg_type (MessageType): The type of the message (required).
|
|
380 |
+ # plugin (Plugin|str|None): The id of the plugin
|
|
381 |
+ # (i.e. Element, Source subclass
|
|
382 |
+ # instance) sending the message. If
|
|
383 |
+ # a plugin is given, this will be
|
|
384 |
+ # determined automatically, if
|
|
385 |
+ # omitted the message will be sent
|
|
386 |
+ # without a plugin context.
|
|
387 |
+ #
|
|
388 |
+ # For other kwargs, see `Message`.
|
|
389 |
+ #
|
|
390 |
+ def message(self, text, *, plugin=None, msg_type=None, **kwargs):
|
|
391 |
+ assert msg_type is not None
|
|
392 |
+ |
|
393 |
+ if isinstance(plugin, Plugin):
|
|
394 |
+ plugin_id = plugin._get_unique_id()
|
|
395 |
+ else:
|
|
396 |
+ plugin_id = plugin
|
|
397 |
+ |
|
398 |
+ self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
|
|
399 |
+ |
|
400 |
+ # skipped():
|
|
401 |
+ #
|
|
402 |
+ # Produce and send a skipped message through the context.
|
|
403 |
+ #
|
|
404 |
+ def skipped(self, text, **kwargs):
|
|
405 |
+ self.message(text, msg_type=MessageType.SKIPPED, **kwargs)
|
|
406 |
+ |
|
407 |
+ # debug():
|
|
408 |
+ #
|
|
409 |
+ # Produce and send a debug message through the context.
|
|
410 |
+ #
|
|
411 |
+ def debug(self, text, **kwargs):
|
|
412 |
+ if self.log_debug:
|
|
413 |
+ self.message(text, msg_type=MessageType.DEBUG, **kwargs)
|
|
414 |
+ |
|
415 |
+ # status():
|
|
416 |
+ #
|
|
417 |
+ # Produce and send a status message through the context.
|
|
418 |
+ #
|
|
419 |
+ def status(self, text, **kwargs):
|
|
420 |
+ self.message(text, msg_type=MessageType.STATUS, **kwargs)
|
|
421 |
+ |
|
422 |
+ # info():
|
|
423 |
+ #
|
|
424 |
+ # Produce and send a info message through the context.
|
|
425 |
+ #
|
|
426 |
+ def info(self, text, **kwargs):
|
|
427 |
+ self.message(text, msg_type=MessageType.INFO, **kwargs)
|
|
428 |
+ |
|
429 |
+ # warn():
|
|
430 |
+ #
|
|
431 |
+ # Produce and send a warning message through the context.
|
|
432 |
+ #
|
|
433 |
+ def warn(self, text, **kwargs):
|
|
434 |
+ self.message(text, msg_type=MessageType.WARN, **kwargs)
|
|
435 |
+ |
|
436 |
+ # error():
|
|
437 |
+ #
|
|
438 |
+ # Produce and send a error message through the context.
|
|
439 |
+ #
|
|
440 |
+ def error(self, text, **kwargs):
|
|
441 |
+ self.message(text, msg_type=MessageType.ERROR, **kwargs)
|
|
442 |
+ |
|
443 |
+ # log():
|
|
444 |
+ #
|
|
445 |
+ # Produce and send a log message through the context.
|
|
446 |
+ #
|
|
447 |
+ def log(self, text, **kwargs):
|
|
448 |
+ self.message(text, msg_type=MessageType.LOG, **kwargs)
|
|
449 |
+ |
|
369 | 450 |
# silence()
|
370 | 451 |
#
|
371 | 452 |
# A context manager to silence messages, this behaves in
|
... | ... | @@ -410,8 +491,8 @@ class Context(): |
410 | 491 |
with _signals.suspendable(stop_time, resume_time):
|
411 | 492 |
try:
|
412 | 493 |
# Push activity depth for status messages
|
413 |
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
|
|
414 |
- self.message(message)
|
|
494 |
+ self.message(activity_name, detail=detail, plugin=unique_id,
|
|
495 |
+ msg_type=MessageType.START)
|
|
415 | 496 |
self._push_message_depth(silent_nested)
|
416 | 497 |
yield
|
417 | 498 |
|
... | ... | @@ -419,15 +500,16 @@ class Context(): |
419 | 500 |
# Note the failure in status messages and reraise, the scheduler
|
420 | 501 |
# expects an error when there is an error.
|
421 | 502 |
elapsed = datetime.datetime.now() - starttime
|
422 |
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
|
|
423 | 503 |
self._pop_message_depth()
|
424 |
- self.message(message)
|
|
504 |
+ self.message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
|
|
505 |
+ msg_type=MessageType.FAIL)
|
|
425 | 506 |
raise
|
426 | 507 |
|
427 | 508 |
elapsed = datetime.datetime.now() - starttime
|
428 |
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
|
|
429 | 509 |
self._pop_message_depth()
|
430 |
- self.message(message)
|
|
510 |
+ self.message(activity_name, detail=detail,
|
|
511 |
+ elapsed=elapsed, plugin=unique_id,
|
|
512 |
+ msg_type=MessageType.SUCCESS)
|
|
431 | 513 |
|
432 | 514 |
# recorded_messages()
|
433 | 515 |
#
|
... | ... | @@ -35,7 +35,7 @@ from .._context import Context |
35 | 35 |
from .._platform import Platform
|
36 | 36 |
from .._project import Project
|
37 | 37 |
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
|
38 |
-from .._message import Message, MessageType, unconditional_messages
|
|
38 |
+from .._message import MessageType, unconditional_messages
|
|
39 | 39 |
from .._stream import Stream
|
40 | 40 |
from .._versions import BST_FORMAT_VERSION
|
41 | 41 |
from .. import _yaml
|
... | ... | @@ -251,7 +251,7 @@ class App(): |
251 | 251 |
|
252 | 252 |
# Mark the beginning of the session
|
253 | 253 |
if session_name:
|
254 |
- self._message(MessageType.START, session_name)
|
|
254 |
+ self.context.message(session_name, msg_type=MessageType.START)
|
|
255 | 255 |
|
256 | 256 |
# Run the body of the session here, once everything is loaded
|
257 | 257 |
try:
|
... | ... | @@ -263,9 +263,9 @@ class App(): |
263 | 263 |
elapsed = self.stream.elapsed_time
|
264 | 264 |
|
265 | 265 |
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
|
266 |
- self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
|
|
266 |
+ self.context.warn(session_name + ' Terminated', elapsed=elapsed)
|
|
267 | 267 |
else:
|
268 |
- self._message(MessageType.FAIL, session_name, elapsed=elapsed)
|
|
268 |
+ self.context.message(session_name, elapsed=elapsed, msg_type=MessageType.FAIL)
|
|
269 | 269 |
|
270 | 270 |
# Notify session failure
|
271 | 271 |
self._notify("{} failed".format(session_name), "{}".format(e))
|
... | ... | @@ -283,7 +283,9 @@ class App(): |
283 | 283 |
else:
|
284 | 284 |
# No exceptions occurred, print session time and summary
|
285 | 285 |
if session_name:
|
286 |
- self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
|
|
286 |
+ self.context.message(session_name,
|
|
287 |
+ elapsed=self.stream.elapsed_time,
|
|
288 |
+ msg_type=MessageType.SUCCESS)
|
|
287 | 289 |
if self._started:
|
288 | 290 |
self._print_summary()
|
289 | 291 |
|
... | ... | @@ -429,21 +431,13 @@ class App(): |
429 | 431 |
if self.interactive:
|
430 | 432 |
self.notify(title, text)
|
431 | 433 |
|
432 |
- # Local message propagator
|
|
433 |
- #
|
|
434 |
- def _message(self, message_type, message, **kwargs):
|
|
435 |
- args = dict(kwargs)
|
|
436 |
- self.context.message(
|
|
437 |
- Message(None, message_type, message, **args))
|
|
438 |
- |
|
439 | 434 |
# Exception handler
|
440 | 435 |
#
|
441 | 436 |
def _global_exception_handler(self, etype, value, tb):
|
442 | 437 |
|
443 | 438 |
# Print the regular BUG message
|
444 | 439 |
formatted = "".join(traceback.format_exception(etype, value, tb))
|
445 |
- self._message(MessageType.BUG, str(value),
|
|
446 |
- detail=formatted)
|
|
440 |
+ self.context.message(value, detail=formatted, msg_type=MessageType.BUG)
|
|
447 | 441 |
|
448 | 442 |
# If the scheduler has started, try to terminate all jobs gracefully,
|
449 | 443 |
# otherwise exit immediately.
|
... | ... | @@ -24,7 +24,6 @@ import itertools |
24 | 24 |
from operator import itemgetter
|
25 | 25 |
|
26 | 26 |
from ._exceptions import PipelineError
|
27 |
-from ._message import Message, MessageType
|
|
28 | 27 |
from ._profile import Topics, profile_start, profile_end
|
29 | 28 |
from . import Scope, Consistency
|
30 | 29 |
from ._project import ProjectRefStorage
|
... | ... | @@ -201,8 +200,8 @@ class Pipeline(): |
201 | 200 |
for t in targets:
|
202 | 201 |
new_elm = t._get_source_element()
|
203 | 202 |
if new_elm != t and not silent:
|
204 |
- self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
|
|
205 |
- .format(t.name, new_elm.name))
|
|
203 |
+ self._context.info("Element '{}' redirected to '{}'"
|
|
204 |
+ .format(t.name, new_elm.name))
|
|
206 | 205 |
if new_elm not in elements:
|
207 | 206 |
elements.append(new_elm)
|
208 | 207 |
elif mode == PipelineSelection.PLAN:
|
... | ... | @@ -433,15 +432,6 @@ class Pipeline(): |
433 | 432 |
|
434 | 433 |
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
|
435 | 434 |
|
436 |
- # _message()
|
|
437 |
- #
|
|
438 |
- # Local message propagator
|
|
439 |
- #
|
|
440 |
- def _message(self, message_type, message, **kwargs):
|
|
441 |
- args = dict(kwargs)
|
|
442 |
- self._context.message(
|
|
443 |
- Message(None, message_type, message, **args))
|
|
444 |
- |
|
445 | 435 |
|
446 | 436 |
# _Planner()
|
447 | 437 |
#
|
... | ... | @@ -37,7 +37,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage |
37 | 37 |
from ._versions import BST_FORMAT_VERSION
|
38 | 38 |
from ._loader import Loader
|
39 | 39 |
from .element import Element
|
40 |
-from ._message import Message, MessageType
|
|
41 | 40 |
from ._includes import Includes
|
42 | 41 |
from ._platform import Platform
|
43 | 42 |
|
... | ... | @@ -337,8 +336,7 @@ class Project(): |
337 | 336 |
for source, ref in redundant_refs
|
338 | 337 |
]
|
339 | 338 |
detail += "\n".join(lines)
|
340 |
- self._context.message(
|
|
341 |
- Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
|
|
339 |
+ self._context.warn("Ignoring redundant source references", detail=detail)
|
|
342 | 340 |
|
343 | 341 |
return elements
|
344 | 342 |
|
... | ... | @@ -514,13 +512,9 @@ class Project(): |
514 | 512 |
|
515 | 513 |
# Deprecation check
|
516 | 514 |
if fail_on_overlap is not None:
|
517 |
- self._context.message(
|
|
518 |
- Message(
|
|
519 |
- None,
|
|
520 |
- MessageType.WARN,
|
|
521 |
- "Use of fail-on-overlap within project.conf " +
|
|
522 |
- "is deprecated. Consider using fatal-warnings instead."
|
|
523 |
- )
|
|
515 |
+ self._context.warn(
|
|
516 |
+ "Use of fail-on-overlap within project.conf " +
|
|
517 |
+ "is deprecated. Consider using fatal-warnings instead."
|
|
524 | 518 |
)
|
525 | 519 |
|
526 | 520 |
# Load project.refs if it exists, this may be ignored.
|
... | ... | @@ -18,8 +18,6 @@ |
18 | 18 |
#
|
19 | 19 |
from ruamel import yaml
|
20 | 20 |
|
21 |
-from ..._message import Message, MessageType
|
|
22 |
- |
|
23 | 21 |
from .job import Job
|
24 | 22 |
|
25 | 23 |
|
... | ... | @@ -86,9 +84,8 @@ class ElementJob(Job): |
86 | 84 |
# This should probably be omitted for non-build tasks but it's harmless here
|
87 | 85 |
elt_env = self._element.get_environment()
|
88 | 86 |
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
|
89 |
- self.message(MessageType.LOG,
|
|
90 |
- "Build environment for element {}".format(self._element.name),
|
|
91 |
- detail=env_dump)
|
|
87 |
+ self._log("Build environment for element {}".format(self._element.name),
|
|
88 |
+ detail=env_dump, plugin=self.element, scheduler=True)
|
|
92 | 89 |
|
93 | 90 |
# Run the action
|
94 | 91 |
return self._action_cb(self._element)
|
... | ... | @@ -96,15 +93,6 @@ class ElementJob(Job): |
96 | 93 |
def parent_complete(self, success, result):
|
97 | 94 |
self._complete_cb(self, self._element, success, self._result)
|
98 | 95 |
|
99 |
- def message(self, message_type, message, **kwargs):
|
|
100 |
- args = dict(kwargs)
|
|
101 |
- args['scheduler'] = True
|
|
102 |
- self._scheduler.context.message(
|
|
103 |
- Message(self._element._get_unique_id(),
|
|
104 |
- message_type,
|
|
105 |
- message,
|
|
106 |
- **args))
|
|
107 |
- |
|
108 | 96 |
def child_process_data(self):
|
109 | 97 |
data = {}
|
110 | 98 |
|
... | ... | @@ -113,3 +101,10 @@ class ElementJob(Job): |
113 | 101 |
data['workspace'] = workspace.to_dict()
|
114 | 102 |
|
115 | 103 |
return data
|
104 |
+ |
|
105 |
+ # _fail()
|
|
106 |
+ #
|
|
107 |
+ # Override _fail to set scheduler kwarg to true.
|
|
108 |
+ #
|
|
109 |
+ def _fail(self, text, **kwargs):
|
|
110 |
+ super()._fail(text, scheduler=True, **kwargs)
|
... | ... | @@ -32,7 +32,7 @@ import psutil |
32 | 32 |
|
33 | 33 |
# BuildStream toplevel imports
|
34 | 34 |
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
|
35 |
-from ..._message import Message, MessageType, unconditional_messages
|
|
35 |
+from ..._message import MessageType, unconditional_messages
|
|
36 | 36 |
from ... import _signals, utils
|
37 | 37 |
|
38 | 38 |
# Return code values shutdown of job handling child processes
|
... | ... | @@ -110,6 +110,7 @@ class Job(): |
110 | 110 |
# Private members
|
111 | 111 |
#
|
112 | 112 |
self._scheduler = scheduler # The scheduler
|
113 |
+ self._context = scheduler.context # The context, used primarily for UI messaging.
|
|
113 | 114 |
self._queue = None # A message passing queue
|
114 | 115 |
self._process = None # The Process object
|
115 | 116 |
self._watcher = None # Child process watcher
|
... | ... | @@ -184,7 +185,7 @@ class Job(): |
184 | 185 |
# First resume the job if it's suspended
|
185 | 186 |
self.resume(silent=True)
|
186 | 187 |
|
187 |
- self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
|
|
188 |
+ self._status("{} terminating".format(self.action_name))
|
|
188 | 189 |
|
189 | 190 |
# Make sure there is no garbage on the queue
|
190 | 191 |
self._parent_stop_listening()
|
... | ... | @@ -217,8 +218,8 @@ class Job(): |
217 | 218 |
def kill(self):
|
218 | 219 |
|
219 | 220 |
# Force kill
|
220 |
- self.message(MessageType.WARN,
|
|
221 |
- "{} did not terminate gracefully, killing".format(self.action_name))
|
|
221 |
+ self._warn("{} did not terminate gracefully, killing"
|
|
222 |
+ .format(self.action_name))
|
|
222 | 223 |
|
223 | 224 |
try:
|
224 | 225 |
utils._kill_process_tree(self._process.pid)
|
... | ... | @@ -233,8 +234,7 @@ class Job(): |
233 | 234 |
#
|
234 | 235 |
def suspend(self):
|
235 | 236 |
if not self._suspended:
|
236 |
- self.message(MessageType.STATUS,
|
|
237 |
- "{} suspending".format(self.action_name))
|
|
237 |
+ self._status("{} suspending".format(self.action_name))
|
|
238 | 238 |
|
239 | 239 |
try:
|
240 | 240 |
# Use SIGTSTP so that child processes may handle and propagate
|
... | ... | @@ -258,8 +258,7 @@ class Job(): |
258 | 258 |
def resume(self, silent=False):
|
259 | 259 |
if self._suspended:
|
260 | 260 |
if not silent and not self._scheduler.terminated:
|
261 |
- self.message(MessageType.STATUS,
|
|
262 |
- "{} resuming".format(self.action_name))
|
|
261 |
+ self._status("{} resuming".format(self.action_name))
|
|
263 | 262 |
|
264 | 263 |
os.kill(self._process.pid, signal.SIGCONT)
|
265 | 264 |
self._suspended = False
|
... | ... | @@ -324,21 +323,6 @@ class Job(): |
324 | 323 |
raise ImplError("Job '{kind}' does not implement child_process()"
|
325 | 324 |
.format(kind=type(self).__name__))
|
326 | 325 |
|
327 |
- # message():
|
|
328 |
- #
|
|
329 |
- # Logs a message, this will be logged in the task's logfile and
|
|
330 |
- # conditionally also be sent to the frontend.
|
|
331 |
- #
|
|
332 |
- # Args:
|
|
333 |
- # message_type (MessageType): The type of message to send
|
|
334 |
- # message (str): The message
|
|
335 |
- # kwargs: Remaining Message() constructor arguments
|
|
336 |
- #
|
|
337 |
- def message(self, message_type, message, **kwargs):
|
|
338 |
- args = dict(kwargs)
|
|
339 |
- args['scheduler'] = True
|
|
340 |
- self._scheduler.context.message(Message(None, message_type, message, **args))
|
|
341 |
- |
|
342 | 326 |
# child_process_data()
|
343 | 327 |
#
|
344 | 328 |
# Abstract method to retrieve additional data that should be
|
... | ... | @@ -365,6 +349,32 @@ class Job(): |
365 | 349 |
#
|
366 | 350 |
#######################################################
|
367 | 351 |
|
352 |
+ def _debug(self, text, **kwargs):
|
|
353 |
+ self._context.debug(text, task_id=self._task_id, **kwargs)
|
|
354 |
+ |
|
355 |
+ def _status(self, text, **kwargs):
|
|
356 |
+ self._context.status(text, task_id=self._task_id, **kwargs)
|
|
357 |
+ |
|
358 |
+ def _info(self, text, **kwargs):
|
|
359 |
+ self._context.info(text, task_id=self._task_id, **kwargs)
|
|
360 |
+ |
|
361 |
+ def _warn(self, text, **kwargs):
|
|
362 |
+ self._context.warn(text, task_id=self._task_id, **kwargs)
|
|
363 |
+ |
|
364 |
+ def _error(self, text, **kwargs):
|
|
365 |
+ self._context.error(text, task_id=self._task_id, **kwargs)
|
|
366 |
+ |
|
367 |
+ def _log(self, text, **kwargs):
|
|
368 |
+ self._context.log(text, task_id=self._task_id, **kwargs)
|
|
369 |
+ |
|
370 |
+ # _fail()
|
|
371 |
+ #
|
|
372 |
+ # Only exists for sub classes to override and add kwargs to.
|
|
373 |
+ #
|
|
374 |
+ def _fail(self, text, **kwargs):
|
|
375 |
+ self._context.message(text, task_id=self._task_id,
|
|
376 |
+ msg_type=MessageType.FAIL, **kwargs)
|
|
377 |
+ |
|
368 | 378 |
# _child_action()
|
369 | 379 |
#
|
370 | 380 |
# Perform the action in the child process, this calls the action_cb.
|
... | ... | @@ -391,7 +401,7 @@ class Job(): |
391 | 401 |
# Set the global message handler in this child
|
392 | 402 |
# process to forward messages to the parent process
|
393 | 403 |
self._queue = queue
|
394 |
- self._scheduler.context.set_message_handler(self._child_message_handler)
|
|
404 |
+ self._context.set_message_handler(self._child_message_handler)
|
|
395 | 405 |
|
396 | 406 |
starttime = datetime.datetime.now()
|
397 | 407 |
stopped_time = None
|
... | ... | @@ -408,17 +418,17 @@ class Job(): |
408 | 418 |
# Time, log and and run the action function
|
409 | 419 |
#
|
410 | 420 |
with _signals.suspendable(stop_time, resume_time), \
|
411 |
- self._scheduler.context.recorded_messages(self._logfile) as filename:
|
|
421 |
+ self._context.recorded_messages(self._logfile) as filename:
|
|
412 | 422 |
|
413 |
- self.message(MessageType.START, self.action_name, logfile=filename)
|
|
423 |
+ self._context.message(self.action_name, logfile=filename,
|
|
424 |
+ msg_type=MessageType.START, task_id=self._task_id)
|
|
414 | 425 |
|
415 | 426 |
try:
|
416 | 427 |
# Try the task action
|
417 | 428 |
result = self.child_process()
|
418 | 429 |
except SkipJob as e:
|
419 | 430 |
elapsed = datetime.datetime.now() - starttime
|
420 |
- self.message(MessageType.SKIPPED, str(e),
|
|
421 |
- elapsed=elapsed, logfile=filename)
|
|
431 |
+ self._context.skipped(e, elapsed=elapsed, logfile=filename)
|
|
422 | 432 |
|
423 | 433 |
# Alert parent of skip by return code
|
424 | 434 |
self._child_shutdown(RC_SKIPPED)
|
... | ... | @@ -427,13 +437,11 @@ class Job(): |
427 | 437 |
self._retry_flag = e.temporary
|
428 | 438 |
|
429 | 439 |
if self._retry_flag and (self._tries <= self._max_retries):
|
430 |
- self.message(MessageType.FAIL,
|
|
431 |
- "Try #{} failed, retrying".format(self._tries),
|
|
432 |
- elapsed=elapsed, logfile=filename)
|
|
440 |
+ self._fail("Try #{} failed, retrying".format(self._tries),
|
|
441 |
+ elapsed=elapsed, logfile=filename)
|
|
433 | 442 |
else:
|
434 |
- self.message(MessageType.FAIL, str(e),
|
|
435 |
- elapsed=elapsed, detail=e.detail,
|
|
436 |
- logfile=filename, sandbox=e.sandbox)
|
|
443 |
+ self._fail(e, elapsed=elapsed, detail=e.detail,
|
|
444 |
+ logfile=filename, sandbox=e.sandbox)
|
|
437 | 445 |
|
438 | 446 |
self._queue.put(Envelope('child_data', self.child_process_data()))
|
439 | 447 |
|
... | ... | @@ -453,9 +461,9 @@ class Job(): |
453 | 461 |
elapsed = datetime.datetime.now() - starttime
|
454 | 462 |
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
|
455 | 463 |
|
456 |
- self.message(MessageType.BUG, self.action_name,
|
|
457 |
- elapsed=elapsed, detail=detail,
|
|
458 |
- logfile=filename)
|
|
464 |
+ self._context.message(self.action_name, elapsed=elapsed,
|
|
465 |
+ detail=detail, logfile=filename,
|
|
466 |
+ task_id=self._task_id, msg_type=MessageType.BUG)
|
|
459 | 467 |
# Unhandled exceptions should permenantly fail
|
460 | 468 |
self._child_shutdown(RC_PERM_FAIL)
|
461 | 469 |
|
... | ... | @@ -465,8 +473,10 @@ class Job(): |
465 | 473 |
self._child_send_result(result)
|
466 | 474 |
|
467 | 475 |
elapsed = datetime.datetime.now() - starttime
|
468 |
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
|
|
469 |
- logfile=filename)
|
|
476 |
+ self._context.message(self.action_name,
|
|
477 |
+ elapsed=elapsed, logfile=filename,
|
|
478 |
+ msg_type=MessageType.SUCCESS,
|
|
479 |
+ task_id=self._task_id)
|
|
470 | 480 |
|
471 | 481 |
# Shutdown needs to stay outside of the above context manager,
|
472 | 482 |
# make sure we dont try to handle SIGTERM while the process
|
... | ... | @@ -603,7 +613,7 @@ class Job(): |
603 | 613 |
if envelope._message_type == 'message':
|
604 | 614 |
# Propagate received messages from children
|
605 | 615 |
# back through the context.
|
606 |
- self._scheduler.context.message(envelope._message)
|
|
616 |
+ self._context._send_message(envelope._message)
|
|
607 | 617 |
elif envelope._message_type == 'error':
|
608 | 618 |
# For regression tests only, save the last error domain / reason
|
609 | 619 |
# reported from a child task in the main process, this global state
|
... | ... | @@ -51,10 +51,10 @@ class BuildQueue(Queue): |
51 | 51 |
self._tried.add(element)
|
52 | 52 |
_, description, detail = element._get_build_result()
|
53 | 53 |
logfile = element._get_build_log()
|
54 |
- self._message(element, MessageType.FAIL, description,
|
|
55 |
- detail=detail, action_name=self.action_name,
|
|
56 |
- elapsed=timedelta(seconds=0),
|
|
57 |
- logfile=logfile)
|
|
54 |
+ self._context.message(description, msg_type=MessageType.FAIL, plugin=element,
|
|
55 |
+ detail=detail, action_name=self.action_name,
|
|
56 |
+ elapsed=timedelta(seconds=0),
|
|
57 |
+ logfile=logfile)
|
|
58 | 58 |
job = ElementJob(self._scheduler, self.action_name,
|
59 | 59 |
logfile, element=element, queue=self,
|
60 | 60 |
resources=self.resources,
|
... | ... | @@ -30,7 +30,7 @@ from ..resources import ResourceType |
30 | 30 |
|
31 | 31 |
# BuildStream toplevel imports
|
32 | 32 |
from ..._exceptions import BstError, set_last_task_error
|
33 |
-from ..._message import Message, MessageType
|
|
33 |
+from ..._message import MessageType
|
|
34 | 34 |
|
35 | 35 |
|
36 | 36 |
# Queue status for a given element
|
... | ... | @@ -72,6 +72,7 @@ class Queue(): |
72 | 72 |
# Private members
|
73 | 73 |
#
|
74 | 74 |
self._scheduler = scheduler
|
75 |
+ self._context = scheduler.context
|
|
75 | 76 |
self._wait_queue = deque()
|
76 | 77 |
self._done_queue = deque()
|
77 | 78 |
self._max_retries = 0
|
... | ... | @@ -270,17 +271,19 @@ class Queue(): |
270 | 271 |
# Handle any workspace modifications now
|
271 | 272 |
#
|
272 | 273 |
if workspace_dict:
|
273 |
- context = element._get_context()
|
|
274 |
- workspaces = context.get_workspaces()
|
|
274 |
+ workspaces = self._context.get_workspaces()
|
|
275 | 275 |
if workspaces.update_workspace(element._get_full_name(), workspace_dict):
|
276 | 276 |
try:
|
277 | 277 |
workspaces.save_config()
|
278 | 278 |
except BstError as e:
|
279 |
- self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
|
|
280 |
- except Exception as e: # pylint: disable=broad-except
|
|
281 |
- self._message(element, MessageType.BUG,
|
|
282 |
- "Unhandled exception while saving workspaces",
|
|
283 |
- detail=traceback.format_exc())
|
|
279 |
+ self._context.error("Error saving workspaces",
|
|
280 |
+ detail=str(e),
|
|
281 |
+ plugin=element)
|
|
282 |
+ except Exception as e: #pylint: disable=broad-except
|
|
283 |
+ self._context.message("Unhandled exception while saving workspaces",
|
|
284 |
+ msg_type=MessageType.BUG,
|
|
285 |
+ detail=traceback.format_exc(),
|
|
286 |
+ plugin=element)
|
|
284 | 287 |
|
285 | 288 |
# _job_done()
|
286 | 289 |
#
|
... | ... | @@ -304,10 +307,10 @@ class Queue(): |
304 | 307 |
try:
|
305 | 308 |
self.done(job, element, result, success)
|
306 | 309 |
except BstError as e:
|
307 |
- |
|
308 | 310 |
# Report error and mark as failed
|
309 | 311 |
#
|
310 |
- self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
|
|
312 |
+ self._context.error("Post processing error",
|
|
313 |
+ plugin=element, detail=e)
|
|
311 | 314 |
self.failed_elements.append(element)
|
312 | 315 |
|
313 | 316 |
# Treat this as a task error as it's related to a task
|
... | ... | @@ -317,13 +320,12 @@ class Queue(): |
317 | 320 |
#
|
318 | 321 |
set_last_task_error(e.domain, e.reason)
|
319 | 322 |
|
320 |
- except Exception as e: # pylint: disable=broad-except
|
|
321 |
- |
|
323 |
+ except Exception: # pylint: disable=broad-except
|
|
322 | 324 |
# Report unhandled exceptions and mark as failed
|
323 | 325 |
#
|
324 |
- self._message(element, MessageType.BUG,
|
|
325 |
- "Unhandled exception in post processing",
|
|
326 |
- detail=traceback.format_exc())
|
|
326 |
+ self._context.message("Unhandled exception in post processing",
|
|
327 |
+ plugin=element, msg_type=MessageType.BUG,
|
|
328 |
+ detail=traceback.format_exc())
|
|
327 | 329 |
self.failed_elements.append(element)
|
328 | 330 |
else:
|
329 | 331 |
#
|
... | ... | @@ -343,13 +345,6 @@ class Queue(): |
343 | 345 |
else:
|
344 | 346 |
self.failed_elements.append(element)
|
345 | 347 |
|
346 |
- # Convenience wrapper for Queue implementations to send
|
|
347 |
- # a message for the element they are processing
|
|
348 |
- def _message(self, element, message_type, brief, **kwargs):
|
|
349 |
- context = element._get_context()
|
|
350 |
- message = Message(element._get_unique_id(), message_type, brief, **kwargs)
|
|
351 |
- context.message(message)
|
|
352 |
- |
|
353 | 348 |
def _element_log_path(self, element):
|
354 | 349 |
project = element._get_project()
|
355 | 350 |
key = element._get_display_key()[1]
|
... | ... | @@ -25,11 +25,11 @@ import stat |
25 | 25 |
import shlex
|
26 | 26 |
import shutil
|
27 | 27 |
import tarfile
|
28 |
+import traceback
|
|
28 | 29 |
from contextlib import contextmanager
|
29 | 30 |
from tempfile import TemporaryDirectory
|
30 | 31 |
|
31 | 32 |
from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
|
32 |
-from ._message import Message, MessageType
|
|
33 | 33 |
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
|
34 | 34 |
from ._pipeline import Pipeline, PipelineSelection
|
35 | 35 |
from . import utils, _yaml, _site
|
... | ... | @@ -510,7 +510,7 @@ class Stream(): |
510 | 510 |
target._open_workspace()
|
511 | 511 |
|
512 | 512 |
workspaces.save_config()
|
513 |
- self._message(MessageType.INFO, "Saved workspace configuration")
|
|
513 |
+ self._context.info("Saved workspace configuration")
|
|
514 | 514 |
|
515 | 515 |
# workspace_close
|
516 | 516 |
#
|
... | ... | @@ -537,7 +537,7 @@ class Stream(): |
537 | 537 |
# Delete the workspace and save the configuration
|
538 | 538 |
workspaces.delete_workspace(element_name)
|
539 | 539 |
workspaces.save_config()
|
540 |
- self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
|
|
540 |
+ self._context.info("Closed workspace for {}".format(element_name))
|
|
541 | 541 |
|
542 | 542 |
# workspace_reset
|
543 | 543 |
#
|
... | ... | @@ -578,8 +578,8 @@ class Stream(): |
578 | 578 |
workspace_path = workspace.get_absolute_path()
|
579 | 579 |
if soft:
|
580 | 580 |
workspace.prepared = False
|
581 |
- self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
|
|
582 |
- .format(element.name, workspace_path))
|
|
581 |
+ self._context.info("Reset workspace state for {} at: {}"
|
|
582 |
+ .format(element.name, workspace.path))
|
|
583 | 583 |
continue
|
584 | 584 |
|
585 | 585 |
with element.timed_activity("Removing workspace directory {}"
|
... | ... | @@ -596,9 +596,8 @@ class Stream(): |
596 | 596 |
with element.timed_activity("Staging sources to {}".format(workspace_path)):
|
597 | 597 |
element._open_workspace()
|
598 | 598 |
|
599 |
- self._message(MessageType.INFO,
|
|
600 |
- "Reset workspace for {} at: {}".format(element.name,
|
|
601 |
- workspace_path))
|
|
599 |
+ self._context.info("Reset workspace for {} at: {}"
|
|
600 |
+ .format(element.name, workspace._path))
|
|
602 | 601 |
|
603 | 602 |
workspaces.save_config()
|
604 | 603 |
|
... | ... | @@ -674,7 +673,7 @@ class Stream(): |
674 | 673 |
# source-bundle only supports one target
|
675 | 674 |
target = self.targets[0]
|
676 | 675 |
|
677 |
- self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
|
|
676 |
+ self._context.info("Bundling sources for target {}".format(target.name))
|
|
678 | 677 |
|
679 | 678 |
# Find the correct filename for the compression algorithm
|
680 | 679 |
tar_location = os.path.join(directory, target.normal_name + ".tar")
|
... | ... | @@ -954,15 +953,6 @@ class Stream(): |
954 | 953 |
|
955 | 954 |
return selected, track_selected
|
956 | 955 |
|
957 |
- # _message()
|
|
958 |
- #
|
|
959 |
- # Local message propagator
|
|
960 |
- #
|
|
961 |
- def _message(self, message_type, message, **kwargs):
|
|
962 |
- args = dict(kwargs)
|
|
963 |
- self._context.message(
|
|
964 |
- Message(None, message_type, message, **args))
|
|
965 |
- |
|
966 | 956 |
# _add_queue()
|
967 | 957 |
#
|
968 | 958 |
# Adds a queue to the stream
|
... | ... | @@ -1013,10 +1003,11 @@ class Stream(): |
1013 | 1003 |
for element in self.total_elements:
|
1014 | 1004 |
element._update_state()
|
1015 | 1005 |
except BstError as e:
|
1016 |
- self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
|
|
1006 |
+ self._context.error("Error resolving final state", detail=e)
|
|
1017 | 1007 |
set_last_task_error(e.domain, e.reason)
|
1018 |
- except Exception as e: # pylint: disable=broad-except
|
|
1019 |
- self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
|
|
1008 |
+ except Exception as e: # pylint: disable=broad-except
|
|
1009 |
+ self._context.message("Unhandled exception while resolving final state",
|
|
1010 |
+ detail=traceback.format_exc())
|
|
1020 | 1011 |
|
1021 | 1012 |
if status == SchedStatus.ERROR:
|
1022 | 1013 |
raise StreamError()
|
... | ... | @@ -117,7 +117,6 @@ from weakref import WeakValueDictionary |
117 | 117 |
from . import _yaml
|
118 | 118 |
from . import utils
|
119 | 119 |
from ._exceptions import PluginError, ImplError
|
120 |
-from ._message import Message, MessageType
|
|
121 | 120 |
|
122 | 121 |
|
123 | 122 |
class Plugin():
|
... | ... | @@ -464,8 +463,7 @@ class Plugin(): |
464 | 463 |
brief (str): The brief message
|
465 | 464 |
detail (str): An optional detailed message, can be multiline output
|
466 | 465 |
"""
|
467 |
- if self.__context.log_debug:
|
|
468 |
- self.__message(MessageType.DEBUG, brief, detail=detail)
|
|
466 |
+ self.__context.debug(brief, detail=detail, plugin=self)
|
|
469 | 467 |
|
470 | 468 |
def status(self, brief, *, detail=None):
|
471 | 469 |
"""Print a status message
|
... | ... | @@ -474,9 +472,9 @@ class Plugin(): |
474 | 472 |
brief (str): The brief message
|
475 | 473 |
detail (str): An optional detailed message, can be multiline output
|
476 | 474 |
|
477 |
- Note: Status messages tell about what a plugin is currently doing
|
|
475 |
+ Note: Status messages tell the user what a plugin is currently doing
|
|
478 | 476 |
"""
|
479 |
- self.__message(MessageType.STATUS, brief, detail=detail)
|
|
477 |
+ self.__context.status(brief, detail=detail, plugin=self)
|
|
480 | 478 |
|
481 | 479 |
def info(self, brief, *, detail=None):
|
482 | 480 |
"""Print an informative message
|
... | ... | @@ -488,7 +486,7 @@ class Plugin(): |
488 | 486 |
Note: Informative messages tell the user something they might want
|
489 | 487 |
to know, like if refreshing an element caused it to change.
|
490 | 488 |
"""
|
491 |
- self.__message(MessageType.INFO, brief, detail=detail)
|
|
489 |
+ self.__context.info(brief, detail=detail, plugin=self)
|
|
492 | 490 |
|
493 | 491 |
def warn(self, brief, *, detail=None, warning_token=None):
|
494 | 492 |
"""Print a warning message, checks warning_token against project configuration
|
... | ... | @@ -512,7 +510,7 @@ class Plugin(): |
512 | 510 |
detail = detail if detail else ""
|
513 | 511 |
raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
|
514 | 512 |
|
515 |
- self.__message(MessageType.WARN, brief=brief, detail=detail)
|
|
513 |
+ self.__context.warn(brief, detail=detail, plugin=self)
|
|
516 | 514 |
|
517 | 515 |
def log(self, brief, *, detail=None):
|
518 | 516 |
"""Log a message into the plugin's log file
|
... | ... | @@ -524,7 +522,7 @@ class Plugin(): |
524 | 522 |
brief (str): The brief message
|
525 | 523 |
detail (str): An optional detailed message, can be multiline output
|
526 | 524 |
"""
|
527 |
- self.__message(MessageType.LOG, brief, detail=detail)
|
|
525 |
+ self.__context.log(brief, detail=detail, plugin=self)
|
|
528 | 526 |
|
529 | 527 |
@contextmanager
|
530 | 528 |
def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
|
... | ... | @@ -746,10 +744,6 @@ class Plugin(): |
746 | 744 |
|
747 | 745 |
return (exit_code, output)
|
748 | 746 |
|
749 |
- def __message(self, message_type, brief, **kwargs):
|
|
750 |
- message = Message(self.__unique_id, message_type, brief, **kwargs)
|
|
751 |
- self.__context.message(message)
|
|
752 |
- |
|
753 | 747 |
def __note_command(self, output, *popenargs, **kwargs):
|
754 | 748 |
workdir = os.getcwd()
|
755 | 749 |
if 'cwd' in kwargs:
|