[pitivi] pipeline: Simplify the async_done cycle



commit f59f543f088316d7d52957e6d234ada12c54f4af
Author: Alexandru Băluț <alexandru balut gmail com>
Date:   Tue Nov 22 21:49:10 2016 +0100

    pipeline: Simplify the async_done cycle
    
    The _timeout_async_id field identifies the timer which checks whether
    the Gst.MessageType.ASYNC_DONE has been received, implying the pipeline
    expects an async-done event.
    The _waiting_for_async_done field indicates whether the timer has been
    started.
    
    When a timeout occurs, the _timeout_async_id is unset and the
    recovery procedure is started. Hopefully this recovery procedure ends up
    in an ASYNC_DONE being pushed, which leads to _waiting_for_async_done
    being unset.
    
    In practice we don't need both fields. When a timeout occurs, the
    recovery procedure should set the state to PAUSED, which means a new
    timer is started, so then _timeout_async_id is non-zero.
    
    This commit removes the _waiting_for_async_done field.
    
    Reviewed-by: Thibault Saunier <tsaunier gnome org>
    Differential Revision: https://phabricator.freedesktop.org/D1550

 pitivi/utils/pipeline.py |   56 ++++++++++++++++++++++++------------------
 pitivi/utils/validate.py |    2 +-
 tests/test_pipeline.py   |   60 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 93 insertions(+), 25 deletions(-)
---
diff --git a/pitivi/utils/pipeline.py b/pitivi/utils/pipeline.py
index 7670ebf..13ad99c 100644
--- a/pitivi/utils/pipeline.py
+++ b/pitivi/utils/pipeline.py
@@ -94,7 +94,6 @@ class SimplePipeline(GObject.Object, Loggable):
         self._last_position = int(0 * Gst.SECOND)
         self._recovery_state = self.RecoveryState.NOT_RECOVERING
         self._attempted_recoveries = 0
-        self._waiting_for_async_done = False
         self._next_seek = None
         self._timeout_async_id = 0
         self._force_position_listener = False
@@ -150,9 +149,10 @@ class SimplePipeline(GObject.Object, Loggable):
 
         Raises:
             PipelineError: If the low-level pipeline could not be changed to
-                the requested state.
+                the requested state. In this case the state is set to
+                Gst.State.NULL
         """
-        self.debug("state set to: %r", state)
+        self.debug("Setting state to: %r", state)
         if state >= Gst.State.PAUSED:
             cstate = self.getState()
             if cstate < Gst.State.PAUSED:
@@ -299,16 +299,34 @@ class SimplePipeline(GObject.Object, Loggable):
         return False
 
     def _removeWaitingForAsyncDoneTimeout(self):
-        if self._timeout_async_id:
-            GLib.source_remove(self._timeout_async_id)
+        if not self._busy_async:
+            return
+        GLib.source_remove(self._timeout_async_id)
         self._timeout_async_id = 0
 
     def _addWaitingForAsyncDoneTimeout(self, timeout=WATCHDOG_TIMEOUT):
         self._removeWaitingForAsyncDoneTimeout()
-
         self._timeout_async_id = GLib.timeout_add_seconds(timeout,
                                                           self._async_done_not_received_cb)
-        self._waiting_for_async_done = True
+
+    @property
+    def _busy_async(self):
+        """Gets whether the pipeline is busy in the background.
+
+        The following operations are performed in the background:
+        - State changing from READY to PAUSED. For example a pipeline in
+          NULL set to PLAYING, goes through each intermediary state
+          including READY to PAUSED, so we consider it ASYNC.
+        - Seeking.
+        - Committing, but only if the timeline is not empty at the time of the commit.
+
+        When the pipeline is working in the background, no seek nor commit
+        should be performed.
+
+        Returns:
+            bool: True iff the pipeline is busy.
+        """
+        return bool(self._timeout_async_id)
 
     def simple_seek(self, position):
         """Seeks in the low-level pipeline to the specified position.
@@ -319,7 +337,7 @@ class SimplePipeline(GObject.Object, Loggable):
         Raises:
             PipelineError: When the seek fails.
         """
-        if self._waiting_for_async_done is True:
+        if self._busy_async:
             self._next_seek = position
             self.info("Setting next seek to %s", self._next_seek)
             return
@@ -336,7 +354,6 @@ class SimplePipeline(GObject.Object, Loggable):
                                   position,
                                   Gst.SeekType.NONE,
                                   -1)
-
         if not res:
             raise PipelineError(self.get_name() + " seek failed: " + str(position))
 
@@ -360,11 +377,10 @@ class SimplePipeline(GObject.Object, Loggable):
             self.pause()
             self.emit('eos')
         elif message.type == Gst.MessageType.STATE_CHANGED:
-            prev, new, pending = message.parse_state_changed()
-
             if message.src == self._pipeline:
+                prev, new, pending = message.parse_state_changed()
                 self.debug(
-                    "Pipeline change state prev: %r, new: %r, pending: %r", prev, new, pending)
+                    "Pipeline changed state. prev: %r, new: %r, pending: %r", prev, new, pending)
 
                 emit_state_change = pending == Gst.State.VOID_PENDING
                 if prev == Gst.State.READY and new == Gst.State.PAUSED:
@@ -408,12 +424,12 @@ class SimplePipeline(GObject.Object, Loggable):
             self.debug("Duration might have changed, querying it")
             GLib.idle_add(self._queryDurationAsync)
         elif message.type == Gst.MessageType.ASYNC_DONE:
+            self.debug("Async done, ready for action")
             self.emit("async-done")
             self._removeWaitingForAsyncDoneTimeout()
             if self._recovery_state == self.RecoveryState.SEEKED_AFTER_RECOVERING:
                 self._recovery_state = self.RecoveryState.NOT_RECOVERING
                 self._attempted_recoveries = 0
-            self._waiting_for_async_done = False
             self.__emitPosition()
             if self._next_seek is not None:
                 self.simple_seek(self._next_seek)
@@ -437,14 +453,6 @@ class SimplePipeline(GObject.Object, Loggable):
 
         return position
 
-    @property
-    def _waiting_for_async_done(self):
-        return self.__waiting_for_async_done
-
-    @_waiting_for_async_done.setter
-    def _waiting_for_async_done(self, value):
-        self.__waiting_for_async_done = value
-
     def _recover(self):
         if not self._bus:
             raise PipelineError("Should not try to recover after destroy")
@@ -615,8 +623,8 @@ class Pipeline(GES.Pipeline, SimplePipeline):
             SimplePipeline._busMessageCb(self, bus, message)
 
     def commit_timeline(self):
-        if self._waiting_for_async_done and not self._was_empty\
-                and not self.props.timeline.is_empty():
+        is_empty = self.props.timeline.is_empty()
+        if self._busy_async and not self._was_empty and not is_empty:
             self._commit_wanted = True
             self._was_empty = False
             self.debug("commit wanted")
@@ -624,7 +632,7 @@ class Pipeline(GES.Pipeline, SimplePipeline):
             self._addWaitingForAsyncDoneTimeout()
             self.props.timeline.commit()
             self.debug("Commiting right now")
-            self._was_empty = self.props.timeline.is_empty()
+            self._was_empty = is_empty
 
     def setState(self, state):
         SimplePipeline.setState(self, state)
diff --git a/pitivi/utils/validate.py b/pitivi/utils/validate.py
index 21a7e76..eecafd5 100644
--- a/pitivi/utils/validate.py
+++ b/pitivi/utils/validate.py
@@ -141,7 +141,7 @@ def stop(scenario, action):
 
 def positionChangedCb(pipeline, position, scenario, action,
                       wanted_position):
-    if pipeline._waiting_for_async_done:
+    if pipeline._busy_async:
         return
 
     if pipeline._next_seek:
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 3997749..b54ffb8 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -26,6 +26,7 @@ from gi.repository import Gst
 
 from pitivi.utils.pipeline import MAX_RECOVERIES
 from pitivi.utils.pipeline import Pipeline
+from pitivi.utils.pipeline import SimplePipeline
 from tests import common
 
 
@@ -71,3 +72,62 @@ class TestPipeline(common.TestCase):
             set_state.assert_not_called()
         self.assertTrue(pipeline_died_cb.called)
         self.assertEqual(pipe._attempted_recoveries, MAX_RECOVERIES)
+
+    def test_async_done_not_received(self):
+        """Checks the recovery when the ASYNC_DONE message timed out."""
+        ges_timeline = GES.Timeline()
+        self.assertTrue(ges_timeline.add_track(GES.VideoTrack.new()))
+        ges_layer = ges_timeline.append_layer()
+        uri = common.get_sample_uri("tears_of_steel.webm")
+        asset = GES.UriClipAsset.request_sync(uri)
+        ges_clip = asset.extract()
+        self.assertTrue(ges_layer.add_clip(ges_clip))
+        self.assertFalse(ges_timeline.is_empty())
+
+        pipe = Pipeline(app=common.create_pitivi_mock())
+
+        pipe.set_timeline(ges_timeline)
+        self.assertFalse(pipe._busy_async)
+        self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.NOT_RECOVERING)
+
+        # Pretend waiting for async-done timed out.
+        # We mock set_state because we don't actually care about the state,
+        # and setting the state to PAUSED could show a video window.
+        with mock.patch.object(pipe, "set_state"):
+            pipe._async_done_not_received_cb()
+        # Make sure the pipeline started a watchdog timer waiting for async-done
+        # as part of setting the state from NULL to PAUSED.
+        self.assertTrue(pipe._busy_async)
+        self.assertEqual(pipe._attempted_recoveries, 1)
+        self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.STARTED_RECOVERING)
+
+        # Pretend the state changed to READY.
+        message = mock.Mock()
+        message.type = Gst.MessageType.STATE_CHANGED
+        message.src = pipe._pipeline
+        message.parse_state_changed.return_value = (Gst.State.NULL, Gst.State.READY, Gst.State.PAUSED)
+        pipe._busMessageCb(None, message)
+
+        # Pretend the state changed to PAUSED.
+        message.parse_state_changed.return_value = (Gst.State.READY, Gst.State.PAUSED, Gst.State.VOID_PENDING)
+        self.assertEqual(pipe._next_seek, None)
+        pipe._busMessageCb(None, message)
+        self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.SEEKED_AFTER_RECOVERING)
+        self.assertTrue(pipe._busy_async)
+        # The pipeline should have tried to seek back to the last position.
+        self.assertEqual(pipe._next_seek, 0)
+
+        # Pretend the state change async operation finished.
+        message.type = Gst.MessageType.ASYNC_DONE
+        pipe._busMessageCb(None, message)
+        self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.NOT_RECOVERING)
+        # Should still be busy because of seeking to _next_seek.
+        self.assertTrue(pipe._busy_async)
+        self.assertIsNone(pipe._next_seek)
+
+        # Pretend the seek async operation finished.
+        message.type = Gst.MessageType.ASYNC_DONE
+        pipe._busMessageCb(None, message)
+        self.assertEqual(pipe._recovery_state, SimplePipeline.RecoveryState.NOT_RECOVERING)
+        self.assertFalse(pipe._busy_async)
+        self.assertIsNone(pipe._next_seek)


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]