[pitivi] pipeline: Simplify the async_done cycle
- From: Alexandru Băluț <alexbalut src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pitivi] pipeline: Simplify the async_done cycle
- Date: Sat, 8 Jul 2017 19:11:26 +0000 (UTC)
commit f59f543f088316d7d52957e6d234ada12c54f4af
Author: Alexandru Băluț <alexandru balut gmail com>
Date: Tue Nov 22 21:49:10 2016 +0100
pipeline: Simplify the async_done cycle
The _timeout_async_id field identifies the timer which checks whether
the Gst.MessageType.ASYNC_DONE has been received, implying the pipeline
expects an async-done event.
The _waiting_for_async_done field indicates whether the timer has been
started.
When a timeout occurs, the _timeout_async_id is unset and the
recovery procedure is started. Hopefully this recovery procedure ends up
in an ASYNC_DONE being pushed, which leads to _waiting_for_async_done
being unset.
In practice we don't need both fields. When a timeout occurs, the
recovery procedure should set the state to PAUSED, which means a new
timer is started, so then _timeout_async_id is non-zero.
This commit removes the _waiting_for_async_done field.
Reviewed-by: Thibault Saunier <tsaunier gnome org>
Differential Revision: https://phabricator.freedesktop.org/D1550
pitivi/utils/pipeline.py | 56 ++++++++++++++++++++++++------------------
pitivi/utils/validate.py | 2 +-
tests/test_pipeline.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 93 insertions(+), 25 deletions(-)
---
diff --git a/pitivi/utils/pipeline.py b/pitivi/utils/pipeline.py
index 7670ebf..13ad99c 100644
--- a/pitivi/utils/pipeline.py
+++ b/pitivi/utils/pipeline.py
@@ -94,7 +94,6 @@ class SimplePipeline(GObject.Object, Loggable):
self._last_position = int(0 * Gst.SECOND)
self._recovery_state = self.RecoveryState.NOT_RECOVERING
self._attempted_recoveries = 0
- self._waiting_for_async_done = False
self._next_seek = None
self._timeout_async_id = 0
self._force_position_listener = False
@@ -150,9 +149,10 @@ class SimplePipeline(GObject.Object, Loggable):
Raises:
PipelineError: If the low-level pipeline could not be changed to
- the requested state.
+ the requested state. In this case the state is set to
+ Gst.State.NULL
"""
- self.debug("state set to: %r", state)
+ self.debug("Setting state to: %r", state)
if state >= Gst.State.PAUSED:
cstate = self.getState()
if cstate < Gst.State.PAUSED:
@@ -299,16 +299,34 @@ class SimplePipeline(GObject.Object, Loggable):
return False
def _removeWaitingForAsyncDoneTimeout(self):
- if self._timeout_async_id:
- GLib.source_remove(self._timeout_async_id)
+ if not self._busy_async:
+ return
+ GLib.source_remove(self._timeout_async_id)
self._timeout_async_id = 0
def _addWaitingForAsyncDoneTimeout(self, timeout=WATCHDOG_TIMEOUT):
self._removeWaitingForAsyncDoneTimeout()
-
self._timeout_async_id = GLib.timeout_add_seconds(timeout,
self._async_done_not_received_cb)
- self._waiting_for_async_done = True
+
+ @property
+ def _busy_async(self):
+ """Gets whether the pipeline is busy in the background.
+
+ The following operations are performed in the background:
+ - State changing from READY to PAUSED. For example a pipeline in
+ NULL set to PLAYING, goes through each intermediary state
+ including READY to PAUSED, so we consider it ASYNC.
+ - Seeking.
+ - Committing, but only if the timeline is not empty at the time of the commit.
+
+ When the pipeline is working in the background, no seek nor commit
+ should be performed.
+
+ Returns:
+ bool: True iff the pipeline is busy.
+ """
+ return bool(self._timeout_async_id)
def simple_seek(self, position):
"""Seeks in the low-level pipeline to the specified position.
@@ -319,7 +337,7 @@ class SimplePipeline(GObject.Object, Loggable):
Raises:
PipelineError: When the seek fails.
"""
- if self._waiting_for_async_done is True:
+ if self._busy_async:
self._next_seek = position
self.info("Setting next seek to %s", self._next_seek)
return
@@ -336,7 +354,6 @@ class SimplePipeline(GObject.Object, Loggable):
position,
Gst.SeekType.NONE,
-1)
-
if not res:
raise PipelineError(self.get_name() + " seek failed: " + str(position))
@@ -360,11 +377,10 @@ class SimplePipeline(GObject.Object, Loggable):
self.pause()
self.emit('eos')
elif message.type == Gst.MessageType.STATE_CHANGED:
- prev, new, pending = message.parse_state_changed()
-
if message.src == self._pipeline:
+ prev, new, pending = message.parse_state_changed()
self.debug(
- "Pipeline change state prev: %r, new: %r, pending: %r", prev, new, pending)
+ "Pipeline changed state. prev: %r, new: %r, pending: %r", prev, new, pending)
emit_state_change = pending == Gst.State.VOID_PENDING
if prev == Gst.State.READY and new == Gst.State.PAUSED:
@@ -408,12 +424,12 @@ class SimplePipeline(GObject.Object, Loggable):
self.debug("Duration might have changed, querying it")
GLib.idle_add(self._queryDurationAsync)
elif message.type == Gst.MessageType.ASYNC_DONE:
+ self.debug("Async done, ready for action")
self.emit("async-done")
self._removeWaitingForAsyncDoneTimeout()
if self._recovery_state == self.RecoveryState.SEEKED_AFTER_RECOVERING:
self._recovery_state = self.RecoveryState.NOT_RECOVERING
self._attempted_recoveries = 0
- self._waiting_for_async_done = False
self.__emitPosition()
if self._next_seek is not None:
self.simple_seek(self._next_seek)
@@ -437,14 +453,6 @@ class SimplePipeline(GObject.Object, Loggable):
return position
- @property
- def _waiting_for_async_done(self):
- return self.__waiting_for_async_done
-
- @_waiting_for_async_done.setter
- def _waiting_for_async_done(self, value):
- self.__waiting_for_async_done = value
-
def _recover(self):
if not self._bus:
raise PipelineError("Should not try to recover after destroy")
@@ -615,8 +623,8 @@ class Pipeline(GES.Pipeline, SimplePipeline):
SimplePipeline._busMessageCb(self, bus, message)
def commit_timeline(self):
- if self._waiting_for_async_done and not self._was_empty\
- and not self.props.timeline.is_empty():
+ is_empty = self.props.timeline.is_empty()
+ if self._busy_async and not self._was_empty and not is_empty:
self._commit_wanted = True
self._was_empty = False
self.debug("commit wanted")
@@ -624,7 +632,7 @@ class Pipeline(GES.Pipeline, SimplePipeline):
self._addWaitingForAsyncDoneTimeout()
self.props.timeline.commit()
self.debug("Commiting right now")
- self._was_empty = self.props.timeline.is_empty()
+ self._was_empty = is_empty
def setState(self, state):
SimplePipeline.setState(self, state)
diff --git a/pitivi/utils/validate.py b/pitivi/utils/validate.py
index 21a7e76..eecafd5 100644
--- a/pitivi/utils/validate.py
+++ b/pitivi/utils/validate.py
@@ -141,7 +141,7 @@ def stop(scenario, action):
def positionChangedCb(pipeline, position, scenario, action,
wanted_position):
- if pipeline._waiting_for_async_done:
+ if pipeline._busy_async:
return
if pipeline._next_seek:
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 3997749..b54ffb8 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -26,6 +26,7 @@ from gi.repository import Gst
from pitivi.utils.pipeline import MAX_RECOVERIES
from pitivi.utils.pipeline import Pipeline
+from pitivi.utils.pipeline import SimplePipeline
from tests import common
@@ -71,3 +72,62 @@ class TestPipeline(common.TestCase):
set_state.assert_not_called()
self.assertTrue(pipeline_died_cb.called)
self.assertEqual(pipe._attempted_recoveries, MAX_RECOVERIES)
+
+ def test_async_done_not_received(self):
+ """Checks the recovery when the ASYNC_DONE message timed out."""
+ ges_timeline = GES.Timeline()
+ self.assertTrue(ges_timeline.add_track(GES.VideoTrack.new()))
+ ges_layer = ges_timeline.append_layer()
+ uri = common.get_sample_uri("tears_of_steel.webm")
+ asset = GES.UriClipAsset.request_sync(uri)
+ ges_clip = asset.extract()
+ self.assertTrue(ges_layer.add_clip(ges_clip))
+ self.assertFalse(ges_timeline.is_empty())
+
+ pipe = Pipeline(app=common.create_pitivi_mock())
+
+ pipe.set_timeline(ges_timeline)
+ self.assertFalse(pipe._busy_async)
+ self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.NOT_RECOVERING)
+
+ # Pretend waiting for async-done timed out.
+ # We mock set_state because we don't actually care about the state,
+ # and setting the state to PAUSED could show a video window.
+ with mock.patch.object(pipe, "set_state"):
+ pipe._async_done_not_received_cb()
+ # Make sure the pipeline started a watchdog timer waiting for async-done
+ # as part of setting the state from NULL to PAUSED.
+ self.assertTrue(pipe._busy_async)
+ self.assertEqual(pipe._attempted_recoveries, 1)
+ self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.STARTED_RECOVERING)
+
+ # Pretend the state changed to READY.
+ message = mock.Mock()
+ message.type = Gst.MessageType.STATE_CHANGED
+ message.src = pipe._pipeline
+ message.parse_state_changed.return_value = (Gst.State.NULL, Gst.State.READY, Gst.State.PAUSED)
+ pipe._busMessageCb(None, message)
+
+ # Pretend the state changed to PAUSED.
+ message.parse_state_changed.return_value = (Gst.State.READY, Gst.State.PAUSED,
Gst.State.VOID_PENDING)
+ self.assertEqual(pipe._next_seek, None)
+ pipe._busMessageCb(None, message)
+ self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.SEEKED_AFTER_RECOVERING)
+ self.assertTrue(pipe._busy_async)
+ # The pipeline should have tried to seek back to the last position.
+ self.assertEqual(pipe._next_seek, 0)
+
+ # Pretend the state change async operation finished.
+ message.type = Gst.MessageType.ASYNC_DONE
+ pipe._busMessageCb(None, message)
+ self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.NOT_RECOVERING)
+ # Should still be busy because of seeking to _next_seek.
+ self.assertTrue(pipe._busy_async)
+ self.assertIsNone(pipe._next_seek)
+
+ # Pretend the seek async operation finished.
+ message.type = Gst.MessageType.ASYNC_DONE
+ pipe._busMessageCb(None, message)
+ self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.NOT_RECOVERING)
+ self.assertFalse(pipe._busy_async)
+ self.assertIsNone(pipe._next_seek)
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]