[pitivi] Refactor state change and position reporting code in Pipeline. Fixes #575123.



commit 22e7160fac081ba36f20f13c9d91de301f628409
Author: Alessandro Decina <alessandro decina collabora co uk>
Date:   Wed Mar 18 18:23:10 2009 +0100

    Refactor state change and position reporting code in Pipeline. Fixes #575123.
---
 pitivi/factories/base.py      |    2 +
 pitivi/pipeline.py            |   76 +++++++++++++++------------------------
 tests/common.py               |    2 +
 tests/test_action.py          |    1 +
 tests/test_pipeline.py        |   78 ++++++++++++++++++-----------------------
 tests/test_pipeline_action.py |    1 -
 6 files changed, 68 insertions(+), 92 deletions(-)

diff --git a/pitivi/factories/base.py b/pitivi/factories/base.py
index 639a3b3..7a53729 100644
--- a/pitivi/factories/base.py
+++ b/pitivi/factories/base.py
@@ -364,6 +364,7 @@ class SinkFactory(ObjectFactory):
         Some factories can create a limited number of bins or implement caching.
         You should call C{releaseBin} once you are done using a bin.
         """
+        bin.set_state(gst.STATE_NULL)
         self._releaseBin(bin)
         if bin in self.bins:
             self.bins.remove(bin)
@@ -456,6 +457,7 @@ class OperationFactory(ObjectFactory):
         Some factories can create a limited number of bins or implement caching.
         You should call C{releaseBin} once you are done using a bin.
         """
+        bin.set_state(gst.STATE_NULL)
         self._releaseBin(bin)
         if bin in self.bins:
             self.bins.remove(bin)
diff --git a/pitivi/pipeline.py b/pitivi/pipeline.py
index 7f76189..e76e62e 100644
--- a/pitivi/pipeline.py
+++ b/pitivi/pipeline.py
@@ -109,9 +109,6 @@ class Pipeline(object, Signallable, Loggable):
      - C{eos} : The Pipeline has finished playing.
      - C{error} : An error happened.
 
-    @ivar state: The current state. This is a cached value, use getState() for
-    the exact actual C{gst.State} of the L{Pipeline}.
-    @type state: C{gst.State}
     @ivar actions: The Action(s) currently used.
     @type actions: List of L{Action}
     @ivar factories: The ObjectFactories handled by the Pipeline.
@@ -148,7 +145,6 @@ class Pipeline(object, Signallable, Loggable):
         self._bus.set_sync_handler(self._busSyncMessageHandler)
         self.factories = {}
         self.actions = []
-        self._state = STATE_NULL
         self._listening = False # for the position handler
         self._listeningInterval = 300 # default 300ms
         self._listeningSigId = 0
@@ -245,8 +241,10 @@ class Pipeline(object, Signallable, Loggable):
         if not action in self.actions:
             self.debug("action not controlled by this Pipeline, returning")
             return
-        if action.isActive() and self._state in [STATE_PAUSED, STATE_PLAYING]:
-            raise PipelineError("Active actions can't be removed from PLAYING or PAUSED Pipeline")
+        if action.isActive():
+            res, current, pending = self._pipeline.get_state(0)
+            if current > STATE_READY or pending > STATE_READY:
+                raise PipelineError("Active actions can't be removed from PLAYING or PAUSED Pipeline")
         try:
             action.unsetPipeline()
         except ActionError:
@@ -264,18 +262,9 @@ class Pipeline(object, Signallable, Loggable):
         the requested state.
         """
         self.debug("state:%r", state)
-        if self._state == state:
-            self.debug("Already at the required state, returning")
-            return
         res = self._pipeline.set_state(state)
         if res == gst.STATE_CHANGE_FAILURE:
             raise PipelineError("Failure changing state of the gst.Pipeline")
-        if res == gst.STATE_CHANGE_SUCCESS:
-            # the change to the request state was successful and not asynchronous
-            self._state = state
-            self.emit('state-changed', self._state)
-            # update position listener status
-            self._listenToPosition(state in [STATE_PAUSED, STATE_PLAYING])
 
     def getState(self):
         """
@@ -289,24 +278,7 @@ class Pipeline(object, Signallable, Loggable):
         """
         change, state, pending = self._pipeline.get_state(0)
         self.debug("change:%r, state:%r, pending:%r", change, state, pending)
-        if change != gst.STATE_CHANGE_FAILURE and state != self._state:
-            self._state = state
-            self.emit('state-changed', self._state)
-            # update position listener status
-            self._listenToPosition(state in [STATE_PAUSED, STATE_PLAYING])
-        self.debug("Returning %r", self._state)
-        return self._state
-
-    @property
-    def state(self):
-        """
-        The state of the L{Pipeline}.
-
-        @warning: This doesn't query the underlying C{gst.Pipeline} but returns the cached
-        state.
-        """
-        self.debug("Returning state %r", self._state)
-        return self._state
+        return state
 
     def play(self):
         """
@@ -377,7 +349,7 @@ class Pipeline(object, Signallable, Loggable):
         self._listening = True
         self._listeningInterval = interval
         # if we're in paused or playing, switch it on
-        self._listenToPosition(self.getState() in [STATE_PAUSED, STATE_PLAYING])
+        self._listenToPosition(self.getState() == STATE_PLAYING)
         return True
 
     def deactivatePositionListener(self):
@@ -423,7 +395,7 @@ class Pipeline(object, Signallable, Loggable):
         else:
             self.debug("position : %d , format:%d", position, format)
         # FIXME : temporarily deactivate position listener
-        self._listenToPosition(False)
+        #self._listenToPosition(False)
 
         res = self._pipeline.seek(1.0, format, gst.SEEK_FLAG_FLUSH,
                                   gst.SEEK_TYPE_SET, position,
@@ -443,7 +415,10 @@ class Pipeline(object, Signallable, Loggable):
             if not create:
                 raise PipelineError()
 
-            if self._state > STATE_READY and isinstance(factory, SourceFactory):
+            change, current, pending = self._pipeline.get_state(0)
+
+            if (current > STATE_READY or pending > STATE_READY) and \
+                    isinstance(factory, SourceFactory):
                 raise PipelineError("Pipeline not in NULL/READY,"
                         " can not create source bin")
 
@@ -574,6 +549,7 @@ class Pipeline(object, Signallable, Loggable):
 
             # ask the factory to finish cleanup
             factory_entry.factory.releaseBin(bin_stream_entry.bin)
+
             bin_stream_entry.bin = None
             if not factory_entry.streams:
                 del self.factories[factory_entry.factory]
@@ -786,17 +762,23 @@ class Pipeline(object, Signallable, Loggable):
         elif message.type == gst.MESSAGE_STATE_CHANGED and message.src == self._pipeline:
             prev, new, pending = message.parse_state_changed()
             self.debug("Pipeline change state prev:%r, new:%r, pending:%r", prev, new, pending)
-            if pending == gst.STATE_VOID_PENDING and self._state != new:
-                self._state = new
-                if self.state in [STATE_PAUSED, STATE_PLAYING]:
-                    # trigger duration-changed
-                    try:
-                        self.getDuration()
-                    except PipelineError:
-                        # no sinks??
-                        pass
-                self._listenToPosition(self._state in [STATE_PAUSED, STATE_PLAYING])
-                self.emit('state-changed', self._state)
+
+            emit_state_change = pending == gst.STATE_VOID_PENDING
+            if prev == STATE_READY and new == STATE_PAUSED:
+                # trigger duration-changed
+                try:
+                    self.getDuration()
+                except PipelineError:
+                    # no sinks??
+                    pass
+            elif prev == STATE_PAUSED and new == STATE_PLAYING:
+                self._listenToPosition(True)
+            elif prev == STATE_PLAYING and new == STATE_PAUSED:
+                self._listenToPosition(False)
+
+            if emit_state_change:
+                self.emit('state-changed', new)
+
         elif message.type == gst.MESSAGE_ERROR:
             error, detail = message.parse_error()
             self._handleErrorMessage(error, detail, message.src)
diff --git a/tests/common.py b/tests/common.py
index 4191dcd..e4ae89f 100644
--- a/tests/common.py
+++ b/tests/common.py
@@ -34,6 +34,8 @@ class TestCase(unittest.TestCase):
         objs = gc.get_objects()
         for c in self._tracked_types:
             new.extend([o for o in objs if isinstance(o, c) and not o in self._tracked[c]])
+        del objs
+        gc.collect()
 
         self.failIf(new, new)
         del self._tracked
diff --git a/tests/test_action.py b/tests/test_action.py
index 591fbe3..f3cd6ca 100644
--- a/tests/test_action.py
+++ b/tests/test_action.py
@@ -226,6 +226,7 @@ class TestRenderAction(common.TestCase):
         a.unsetPipeline()
         p.release()
 
+
     def testSimpleStreams(self):
         """Test a RenderSettings with exact stream settings"""
         # let's force the video to some unusual size
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index a4a5bd2..2a8d5a9 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -19,6 +19,8 @@
 # Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 # Boston, MA 02111-1307, USA.
 
+import gobject
+gobject.threads_init()
 import gst
 from unittest import main
 from pitivi.pipeline import Pipeline, STATE_NULL, STATE_READY, STATE_PAUSED, STATE_PLAYING, PipelineError
@@ -115,13 +117,13 @@ class TestPipeline(TestCase):
         # we can't remove active actions while in PAUSED/PLAYING
         self.pipeline.setState(STATE_PAUSED)
         ac1.state = STATE_ACTIVE
-        self.assertEquals(self.pipeline.state, STATE_PAUSED)
+        self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
         self.failUnlessRaises(PipelineError, self.pipeline.removeAction, ac1)
 
         # but we can remove deactivated actions while in PAUSED/PLAYING
         self.pipeline.setState(STATE_PAUSED)
         ac1.state = STATE_NOT_ACTIVE
-        self.assertEquals(self.pipeline.state, STATE_PAUSED)
+        self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
         self.pipeline.removeAction(ac1)
 
         # we can add actions while in PAUSED/PLAYING
@@ -133,64 +135,52 @@ class TestPipeline(TestCase):
         p2.release()
 
     def testStateChange(self):
-        """ State Changes """
+        loop = gobject.MainLoop()
+
+        bag = {"last_state": None}
+        def state_changed_cb(pipeline, state, bag, loop):
+            bag["last_state"] = state
+            loop.quit()
+
+        self.pipeline.connect('state-changed', state_changed_cb, bag, loop)
+
+        # playing
         self.pipeline.setState(STATE_PLAYING)
-        # change should have happened instantly... except not, because
-        # the bus is asynchronous. We are therefore not guaranteed when
-        # the message will be received on the mainloop bus.
-        # Not sure how to check that efficiently.
-        self.assertEquals(self.pipeline.getState(), STATE_PLAYING)
-        self.assertEquals(self.pipeline.state, STATE_PLAYING)
-
-        # the 'state-changed' signal should have been emitted with the
-        # correct state
+        loop.run()
+        self.failUnlessEqual(bag["last_state"], STATE_PLAYING)
+        self.failUnlessEqual(self.pipeline.getState(), STATE_PLAYING)
         self.assertEquals(self.monitor.state_changed_count, 1)
-        self.assertEquals(self.monitor.state_changed_collect, [(STATE_PLAYING, )])
 
-        # Setting to the same state again shouldn't change anything
+        # playing again
         self.pipeline.setState(STATE_PLAYING)
-        self.assertEquals(self.pipeline.getState(), STATE_PLAYING)
-        self.assertEquals(self.pipeline.state, STATE_PLAYING)
         self.assertEquals(self.monitor.state_changed_count, 1)
-        self.assertEquals(self.monitor.state_changed_collect, [(STATE_PLAYING, )])
 
-        # back to NULL
-        self.pipeline.setState(STATE_NULL)
-        self.assertEquals(self.pipeline.getState(), STATE_NULL)
-        self.assertEquals(self.pipeline.state, STATE_NULL)
+        # ready
+        self.pipeline.setState(STATE_READY)
+        loop.run()
+        self.failUnlessEqual(bag["last_state"], STATE_READY)
+        self.failUnlessEqual(self.pipeline.getState(), STATE_READY)
         self.assertEquals(self.monitor.state_changed_count, 2)
-        self.assertEquals(self.monitor.state_changed_collect, [(STATE_PLAYING, ),
-                                                               (STATE_NULL, )])
 
-        # .play()
+        # PLAYING
         self.pipeline.play()
-        self.assertEquals(self.pipeline.getState(), STATE_PLAYING)
-        self.assertEquals(self.pipeline.state, STATE_PLAYING)
+        loop.run()
+        self.failUnlessEqual(bag["last_state"], STATE_PLAYING)
+        self.failUnlessEqual(self.pipeline.getState(), STATE_PLAYING)
         self.assertEquals(self.monitor.state_changed_count, 3)
-        self.assertEquals(self.monitor.state_changed_collect, [(STATE_PLAYING, ),
-                                                               (STATE_NULL, ),
-                                                               (STATE_PLAYING, )])
 
-        # .pause()
+        # PAUSE
         self.pipeline.pause()
-        self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
-        self.assertEquals(self.pipeline.state, STATE_PAUSED)
+        loop.run()
+        self.failUnlessEqual(bag["last_state"], STATE_PAUSED)
+        self.failUnlessEqual(self.pipeline.getState(), STATE_PAUSED)
         self.assertEquals(self.monitor.state_changed_count, 4)
-        self.assertEquals(self.monitor.state_changed_collect, [(STATE_PLAYING, ),
-                                                               (STATE_NULL, ),
-                                                               (STATE_PLAYING, ),
-                                                               (STATE_PAUSED, )])
 
-        # .stop()
         self.pipeline.stop()
-        self.assertEquals(self.pipeline.getState(), STATE_READY)
-        self.assertEquals(self.pipeline.state, STATE_READY)
+        loop.run()
+        self.failUnlessEqual(bag["last_state"], STATE_READY)
+        self.failUnlessEqual(self.pipeline.getState(), STATE_READY)
         self.assertEquals(self.monitor.state_changed_count, 5)
-        self.assertEquals(self.monitor.state_changed_collect, [(STATE_PLAYING, ),
-                                                               (STATE_NULL, ),
-                                                               (STATE_PLAYING, ),
-                                                               (STATE_PAUSED, ),
-                                                               (STATE_READY, )])
 
     def testGetReleaseBinForFactoryStream(self):
         factory = FakeSourceFactory()
diff --git a/tests/test_pipeline_action.py b/tests/test_pipeline_action.py
index 5f4b076..a452f0a 100644
--- a/tests/test_pipeline_action.py
+++ b/tests/test_pipeline_action.py
@@ -138,7 +138,6 @@ class TestPipelineAction(TestCase):
 
         p.setState(STATE_PLAYING)
         time.sleep(1)
-        p.getState()
         # and make sure that all other elements were created (4)
         # FIXME  if it's failing here, run the test a few times trying to raise
         # the time.sleep() above, it may just be racy...



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]