| ... |
... |
@@ -58,10 +58,10 @@ class JobStatus(): |
|
58
|
58
|
|
|
59
|
59
|
|
|
60
|
60
|
# Used to distinguish between status messages and return values
|
|
61
|
|
-class Envelope():
|
|
|
61
|
+class _Envelope():
|
|
62
|
62
|
def __init__(self, message_type, message):
|
|
63
|
|
- self._message_type = message_type
|
|
64
|
|
- self._message = message
|
|
|
63
|
+ self.message_type = message_type
|
|
|
64
|
+ self.message = message
|
|
65
|
65
|
|
|
66
|
66
|
|
|
67
|
67
|
# Process class that doesn't call waitpid on its own.
|
| ... |
... |
@@ -275,10 +275,37 @@ class Job(): |
|
275
|
275
|
def set_task_id(self, task_id):
|
|
276
|
276
|
self._task_id = task_id
|
|
277
|
277
|
|
|
|
278
|
+ # send_message()
|
|
|
279
|
+ #
|
|
|
280
|
+ # To be called from inside Job.child_process() implementations
|
|
|
281
|
+ # to send messages to the main process during processing.
|
|
|
282
|
+ #
|
|
|
283
|
+ # These messages will be processed by the class's Job.handle_message()
|
|
|
284
|
+ # implementation.
|
|
|
285
|
+ #
|
|
|
286
|
+ def send_message(self, message_type, message):
|
|
|
287
|
+ self._queue.put(_Envelope(message_type, message))
|
|
|
288
|
+
|
|
278
|
289
|
#######################################################
|
|
279
|
290
|
# Abstract Methods #
|
|
280
|
291
|
#######################################################
|
|
281
|
292
|
|
|
|
293
|
+ # handle_message()
|
|
|
294
|
+ #
|
|
|
295
|
+ # Handle a custom message. This will be called in the main process in
|
|
|
296
|
+ # response to any messages sent to the main proces using the
|
|
|
297
|
+ # Job.send_message() API from inside a Job.child_process() implementation
|
|
|
298
|
+ #
|
|
|
299
|
+ # Args:
|
|
|
300
|
+ # message_type (str): A string to identify the message type
|
|
|
301
|
+ # message (any): A simple serializable object
|
|
|
302
|
+ #
|
|
|
303
|
+ # Returns:
|
|
|
304
|
+ # (bool): Should return a truthy value if message_type is handled.
|
|
|
305
|
+ #
|
|
|
306
|
+ def handle_message(self, message_type, message):
|
|
|
307
|
+ return False
|
|
|
308
|
+
|
|
282
|
309
|
# parent_complete()
|
|
283
|
310
|
#
|
|
284
|
311
|
# This will be executed after the job finishes, and is expected to
|
| ... |
... |
@@ -416,7 +443,7 @@ class Job(): |
|
416
|
443
|
elapsed=elapsed, detail=e.detail,
|
|
417
|
444
|
logfile=filename, sandbox=e.sandbox)
|
|
418
|
445
|
|
|
419
|
|
- self._queue.put(Envelope('child_data', self.child_process_data()))
|
|
|
446
|
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
|
|
420
|
447
|
|
|
421
|
448
|
# Report the exception to the parent (for internal testing purposes)
|
|
422
|
449
|
self._child_send_error(e)
|
| ... |
... |
@@ -442,7 +469,7 @@ class Job(): |
|
442
|
469
|
|
|
443
|
470
|
else:
|
|
444
|
471
|
# No exception occurred in the action
|
|
445
|
|
- self._queue.put(Envelope('child_data', self.child_process_data()))
|
|
|
472
|
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
|
|
446
|
473
|
self._child_send_result(result)
|
|
447
|
474
|
|
|
448
|
475
|
elapsed = datetime.datetime.now() - starttime
|
| ... |
... |
@@ -469,7 +496,7 @@ class Job(): |
|
469
|
496
|
domain = e.domain
|
|
470
|
497
|
reason = e.reason
|
|
471
|
498
|
|
|
472
|
|
- envelope = Envelope('error', {
|
|
|
499
|
+ envelope = _Envelope('error', {
|
|
473
|
500
|
'domain': domain,
|
|
474
|
501
|
'reason': reason
|
|
475
|
502
|
})
|
| ... |
... |
@@ -487,7 +514,7 @@ class Job(): |
|
487
|
514
|
#
|
|
488
|
515
|
def _child_send_result(self, result):
|
|
489
|
516
|
if result is not None:
|
|
490
|
|
- envelope = Envelope('result', result)
|
|
|
517
|
+ envelope = _Envelope('result', result)
|
|
491
|
518
|
self._queue.put(envelope)
|
|
492
|
519
|
|
|
493
|
520
|
# _child_shutdown()
|
| ... |
... |
@@ -524,7 +551,7 @@ class Job(): |
|
524
|
551
|
if message.message_type == MessageType.LOG:
|
|
525
|
552
|
return
|
|
526
|
553
|
|
|
527
|
|
- self._queue.put(Envelope('message', message))
|
|
|
554
|
+ self._queue.put(_Envelope('message', message))
|
|
528
|
555
|
|
|
529
|
556
|
# _parent_shutdown()
|
|
530
|
557
|
#
|
| ... |
... |
@@ -588,24 +615,28 @@ class Job(): |
|
588
|
615
|
if not self._listening:
|
|
589
|
616
|
return
|
|
590
|
617
|
|
|
591
|
|
- if envelope._message_type == 'message':
|
|
|
618
|
+ if envelope.message_type == 'message':
|
|
592
|
619
|
# Propagate received messages from children
|
|
593
|
620
|
# back through the context.
|
|
594
|
|
- self._scheduler.context.message(envelope._message)
|
|
595
|
|
- elif envelope._message_type == 'error':
|
|
|
621
|
+ self._scheduler.context.message(envelope.message)
|
|
|
622
|
+ elif envelope.message_type == 'error':
|
|
596
|
623
|
# For regression tests only, save the last error domain / reason
|
|
597
|
624
|
# reported from a child task in the main process, this global state
|
|
598
|
625
|
# is currently managed in _exceptions.py
|
|
599
|
|
- set_last_task_error(envelope._message['domain'],
|
|
600
|
|
- envelope._message['reason'])
|
|
601
|
|
- elif envelope._message_type == 'result':
|
|
|
626
|
+ set_last_task_error(envelope.message['domain'],
|
|
|
627
|
+ envelope.message['reason'])
|
|
|
628
|
+ elif envelope.message_type == 'result':
|
|
602
|
629
|
assert self._result is None
|
|
603
|
|
- self._result = envelope._message
|
|
604
|
|
- elif envelope._message_type == 'child_data':
|
|
|
630
|
+ self._result = envelope.message
|
|
|
631
|
+ elif envelope.message_type == 'child_data':
|
|
605
|
632
|
# If we retry a job, we assign a new value to this
|
|
606
|
|
- self.child_data = envelope._message
|
|
607
|
|
- else:
|
|
608
|
|
- raise Exception()
|
|
|
633
|
+ self.child_data = envelope.message
|
|
|
634
|
+
|
|
|
635
|
+ # Try Job subclass specific messages now
|
|
|
636
|
+ elif not self.handle_message(envelope.message_type,
|
|
|
637
|
+ envelope.message):
|
|
|
638
|
+ assert 0, "Unhandled message type '{}': {}" \
|
|
|
639
|
+ .format(envelope.message_type, envelope.message)
|
|
609
|
640
|
|
|
610
|
641
|
# _parent_process_queue()
|
|
611
|
642
|
#
|