[pitivi] pipeline: Cleanup the pipeline recovery feature



commit 25bd619aad9e6e099336b2a642b00018837a9e8c
Author: Thibault Saunier <tsaunier gnome org>
Date:   Wed Nov 26 14:32:36 2014 +0100

    pipeline: Cleanup the pipeline recovery feature
    
    Make sure that we wait for ASYNC_DONE after we set a pipeline
    from < PAUSED to PAUSED
    
    Max recoveries goes from 5 to 3, we let quite a lot of time for the
    pipeline to recover and the user would be bored at that point!

 pitivi/utils/pipeline.py |   62 +++++++++++++++++++++++++++++++---------------
 1 files changed, 42 insertions(+), 20 deletions(-)
---
diff --git a/pitivi/utils/pipeline.py b/pitivi/utils/pipeline.py
index fdadeae..b0714f9 100644
--- a/pitivi/utils/pipeline.py
+++ b/pitivi/utils/pipeline.py
@@ -36,8 +36,6 @@ from pitivi.utils.loggable import Loggable
 from pitivi.utils.misc import format_ns
 
 
-MAX_RECOVERIES = 5
-
 PIPELINE_SIGNALS = {
     "state-change": (GObject.SignalFlags.RUN_LAST, None, (GObject.TYPE_INT, GObject.TYPE_INT)),
     "position": (GObject.SignalFlags.RUN_LAST, None, (GObject.TYPE_UINT64,)),
@@ -46,6 +44,10 @@ PIPELINE_SIGNALS = {
     "error": (GObject.SignalFlags.RUN_LAST, None, (GObject.TYPE_STRING, GObject.TYPE_STRING)),
 }
 
+MAX_RECOVERIES = 3
+MAX_BRINGING_TO_PAUSED_DURATION = 5
+MAX_SET_STATE_DURATION = 1
+
 
 class PipelineError(Exception):
     pass
@@ -179,7 +181,7 @@ class SimplePipeline(GObject.Object, Loggable):
         self.lastPosition = int(0 * Gst.SECOND)
         self.pendingRecovery = False
         self._attempted_recoveries = 0
-        self._waiting_for_async_done = True
+        self._waiting_for_async_done = False
         self._next_seek = None
         self._timeout_async_id = 0
 
@@ -224,6 +226,18 @@ class SimplePipeline(GObject.Object, Loggable):
         the requested state.
         """
         self.debug("state set to: %r", state)
+        if state >= Gst.State.PAUSED:
+            cstate = self.getState()
+            if cstate < Gst.State.PAUSED:
+                if cstate == Gst.State.NULL:
+                    timeout = MAX_BRINGING_TO_PAUSED_DURATION
+                else:
+                    timeout = MAX_SET_STATE_DURATION
+
+                self._addWaitingForAsyncDoneTimeout(timeout)
+        else:
+            self._removeWaitingForAsyncDoneTimeout()
+
         res = self._pipeline.set_state(state)
         if res == Gst.StateChangeReturn.FAILURE:
             # reset to NULL
@@ -371,14 +385,24 @@ class SimplePipeline(GObject.Object, Loggable):
             GLib.source_remove(self._listeningSigId)
             self._listeningSigId = 0
 
-    def _resetWaitingForAsyncDone(self):
-        self.warning("we didn't get async done, this is a bug")
-        self._waiting_for_async_done = False
-
+    def _asyncDoneNotReceivedCb(self):
+        self.error("we didn't get async done, this is a bug")
         # Source is being removed
-        self._timeout_async_id = 0
+        self._removeWaitingForAsyncDoneTimeout()
         return False
 
+    def _removeWaitingForAsyncDoneTimeout(self):
+        if self._timeout_async_id:
+            GLib.source_remove(self._timeout_async_id)
+        self._timeout_async_id = 0
+
+    def _addWaitingForAsyncDoneTimeout(self, timeout=3):
+        self._removeWaitingForAsyncDoneTimeout()
+
+        self._timeout_async_id = GLib.timeout_add_seconds(timeout,
+                                                          self._asyncDoneNotReceivedCb)
+        self._waiting_for_async_done = True
+
     def simple_seek(self, position):
         """
         Seeks in the L{Pipeline} to the given position.
@@ -400,11 +424,7 @@ class SimplePipeline(GObject.Object, Loggable):
         res = self._pipeline.seek(1.0, Gst.Format.TIME, Gst.SeekFlags.FLUSH,
                                   Gst.SeekType.SET, position,
                                   Gst.SeekType.NONE, -1)
-        self._waiting_for_async_done = True
-
-        if self._timeout_async_id:
-            GLib.source_remove(self._timeout_async_id)
-        self._timeout_async_id = GLib.timeout_add(1000, self._resetWaitingForAsyncDone)
+        self._addWaitingForAsyncDoneTimeout()
 
         if not res:
             raise PipelineError(self.get_name() + " seek failed: " + str(position))
@@ -468,21 +488,23 @@ class SimplePipeline(GObject.Object, Loggable):
             if self._next_seek is not None:
                 self.simple_seek(self._next_seek)
                 self._next_seek = None
-            if self._timeout_async_id:
-                GLib.source_remove(self._timeout_async_id)
-                self._timeout_async_id = 0
+            self._removeWaitingForAsyncDoneTimeout()
         else:
             self.log("%s [%r]", message.type, message.src)
 
     def _recover(self):
         if self._attempted_recoveries > MAX_RECOVERIES:
-            self.warning(
+            self.error(
                 "Pipeline error detected multiple times in a row, not resetting anymore")
             return
-        self.warning("Pipeline error detected during playback, resetting")
+
+        self.error("Pipeline error detected during playback, resetting"
+                   " -- num tries: %d", self._attempted_recoveries)
+
+        self.setState(Gst.State.NULL)
         self.pendingRecovery = True
-        self._pipeline.set_state(Gst.State.NULL)
-        self._pipeline.set_state(Gst.State.PAUSED)
+        self.pause()
+
         self._attempted_recoveries += 1
 
     def _queryDurationAsync(self, *unused_args, **unused_kwargs):


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]