[pitivi/ges] pipeline: Add a Pipeline helper class, a subclass of ges.TimelinePipeline



commit badb65574fa1e036ee237ad9379f648d2dd58eb5
Author: Thibault Saunier <thibault saunier collabora com>
Date:   Mon Apr 30 10:19:27 2012 -0400

    pipeline: Add a Pipeline helper class, a subclass of ges.TimelinePipeline
    
    This class provides a high-level API over ges.TimelinePipeline.

 pitivi/mainwindow.py     |   43 +++----
 pitivi/project.py        |    5 +-
 pitivi/utils/Makefile.am |    1 +
 pitivi/utils/pipeline.py |  341 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 364 insertions(+), 26 deletions(-)
---
diff --git a/pitivi/mainwindow.py b/pitivi/mainwindow.py
index ddfe0f5..af4b817 100644
--- a/pitivi/mainwindow.py
+++ b/pitivi/mainwindow.py
@@ -36,6 +36,7 @@ from gettext import gettext as _
 from gtk import RecentManager
 from hashlib import md5
 
+from pitivi.utils.pipeline import PipelineError
 from pitivi.utils.loggable import Loggable
 from pitivi.utils.misc import in_devel
 from pitivi.settings import GlobalSettings
@@ -1304,24 +1305,22 @@ class PitiviMainWindow(gtk.Window, Loggable):
 
     def _timelineSeekRelativeCb(self, unused_seeker, time):
         try:
-            position = self.app.current.pipeline.query_position(gst.FORMAT_TIME)[0]
+            position = self.app.current.pipeline.getPosition()
             position += time
 
-            self.app.current.pipeline.seek(1.0, gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH,
-                    gst.SEEK_TYPE_SET, position, gst.SEEK_TYPE_NONE, -1)
+            self.app.current.pipeline.simple_seek(position)
             self._seeker.setPosition(position)
 
-        except Exception, e:
-            self.error("seek failed %s %s %s", gst.TIME_ARGS(position), format, e)
+        except PipelineError, e:  # position may be unbound here, so log only time and e
+            self.error("relative seek by %s failed: %s", time, e)
 
     def _timelineSeekFlushCb(self, unused_seeker):
         try:
-            position = self.app.current.pipeline.query_position(gst.FORMAT_TIME)[0]
-            self.app.current.pipeline.seek(1.0, gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH,
-                    gst.SEEK_TYPE_SET, position, gst.SEEK_TYPE_NONE, -1)
+            position = self.app.current.pipeline.getPosition()
+            self.app.current.pipeline.simple_seek(position)
             self._seeker.setPosition(position)
 
-        except Exception, e:
-            self.error("seek failed %s %s %s", gst.TIME_ARGS(position), format, e)
+        except PipelineError, e:  # position may be unbound here, so log only e
+            self.error("flushing seek failed: %s", e)
 
     def _timelineSeekCb(self, ruler, position, format):
@@ -1331,23 +1330,19 @@ class PitiviMainWindow(gtk.Window, Loggable):
         We clamp the seeker position so that it cannot go past 0 or the
         end of the timeline.
         """
-        try:
-            # FIXME: ideally gstreamer should allow seeking to the exact end...
-            # but since it doesn't, we seek one nanosecond before the end.
-            end = self.app.current.timeline.props.duration - 1
-            # CLAMP (0, position, self.app.current.timeline.props.duration)
-            position = sorted((0, position, end))[1]
-
-            if not self.app.current.pipeline.seek(1.0, format, gst.SEEK_FLAG_FLUSH,
-                    gst.SEEK_TYPE_SET, position, gst.SEEK_TYPE_NONE, -1):
-                self.warning("Could not seek to %s", gst.TIME_ARGS(position))
-            else:
-                self._seeker.setPosition(position)
-            # Ensure that the viewer UI is updated when seeking while paused
-            self.viewer.positionCheck()
+        # FIXME: ideally gstreamer should allow seeking to the exact end...
+        # but since it doesn't, we seek one nanosecond before the end.
+        end = self.app.current.timeline.props.duration - 1
+        # CLAMP (0, position, self.app.current.timeline.props.duration)
+        position = sorted((0, position, end))[1]
 
-        except Exception, e:
-            self.error("seek failed %s %s %s", gst.TIME_ARGS(position), format, e)
+        try:
+            self.app.current.pipeline.simple_seek(position)
+            self._seeker.setPosition(position)
+        except PipelineError:
+            self.warning("Could not seek to %s", gst.TIME_ARGS(position))
+        # Ensure that the viewer UI is updated when seeking while paused
+        self.viewer.positionCheck()
 
     def updateTitle(self):
         name = touched = ""
diff --git a/pitivi/project.py b/pitivi/project.py
index 582da37..49109dd 100644
--- a/pitivi/project.py
+++ b/pitivi/project.py
@@ -45,6 +45,7 @@ from pitivi.utils.misc import quote_uri, path_from_uri
 from pitivi.utils.playback import Seeker
 from pitivi.utils.loggable import Loggable
 from pitivi.utils.signal import Signallable
+from pitivi.utils.pipeline import Pipeline
 from pitivi.utils.timeline import Selection
 from pitivi.utils.widgets import FractionWidget
 from pitivi.utils.ripple_update_group import RippleUpdateGroup
@@ -448,7 +449,7 @@ class Project(Signallable, Loggable):
     @ivar timeline: The timeline
     @type timeline: L{ges.Timeline}
     @ivar pipeline: The timeline's pipeline
-    @type pipeline: L{ges.Pipeline}
+    @type pipeline: L{Pipeline}
     @ivar format: The format under which the project is currently stored.
     @type format: L{FormatterClass}
     @ivar loaded: Whether the project is fully loaded or not.
@@ -489,7 +490,7 @@ class Project(Signallable, Loggable):
         # no such feature in GES
         self.timeline.selection = Selection()
 
-        self.pipeline = ges.TimelinePipeline()
+        self.pipeline = Pipeline()
         self.pipeline.add_timeline(self.timeline)
         self.seeker = Seeker()
 
diff --git a/pitivi/utils/Makefile.am b/pitivi/utils/Makefile.am
index 11c2d40..32e7c8f 100644
--- a/pitivi/utils/Makefile.am
+++ b/pitivi/utils/Makefile.am
@@ -6,6 +6,7 @@ utils_PYTHON = 	\
 	timeline.py     \
 	loggable.py     \
 	playback.py     \
+	pipeline.py     \
 	signal.py       \
 	ui.py           \
 	system.py       \
diff --git a/pitivi/utils/pipeline.py b/pitivi/utils/pipeline.py
new file mode 100644
index 0000000..d7c4922
--- /dev/null
+++ b/pitivi/utils/pipeline.py
@@ -0,0 +1,341 @@
+#!/usr/bin/env python
+#
+#       pipeline.py
+#
+# Copyright (C) 2012 Thibault Saunier <thibault saunier collabora com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+
+"""
+High-level pipelines
+"""
+from pitivi.utils.loggable import Loggable
+import gobject
+import gst
+import ges
+
+
+# FIXME : define/document a proper hierarchy
+class PipelineError(Exception):
+    pass
+
+
+class Pipeline(ges.TimelinePipeline, Loggable):
+    """
+
+    The Pipeline is only responsible for:
+     - State changes
+     - Position seeking
+     - Position Querying
+       - Along with an periodic callback (optional)
+
+    Signals:
+     - C{state-changed} : The state of the pipeline changed.
+     - C{position} : The current position of the pipeline changed.
+     - C{eos} : The Pipeline has finished playing.
+     - C{error} : An error happened.
+    """
+
+    __gsignals__ = {
+        "state-changed": (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
+                        (gobject.TYPE_INT,)),
+        "position": (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
+                        (gobject.TYPE_UINT64,)),
+        "duration-changed": (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
+                        (gobject.TYPE_UINT64,)),
+        "eos": (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
+                        ()),
+        "error": (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
+                        (gobject.TYPE_POINTER, gobject.TYPE_POINTER)),
+        "element-message": (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
+                        (gobject.TYPE_POINTER,))}
+
+    def __init__(self):
+        Loggable.__init__(self)
+        ges.TimelinePipeline.__init__(self)
+        self._bus = self.get_bus()
+        self._bus.add_signal_watch()
+        self._bus.connect("message", self._busMessageCb)
+        self._bus.set_sync_handler(self._busSyncMessageHandler)
+        self._listening = False  # for the position handler
+        self._listeningInterval = 300  # default 300ms
+        self._listeningSigId = 0
+
+    def release(self):
+        """
+        Release the L{Pipeline}: stop position polling, detach the bus
+        handlers and set the pipeline to C{gst.STATE_NULL}.
+
+        Call this method when the L{Pipeline} is no longer used. Forgetting to do
+        so will result in memory leaks.
+
+        @postcondition: The L{Pipeline} will no longer be usable.
+        """
+        self._listenToPosition(False)
+        self._bus.disconnect_by_func(self._busMessageCb)
+        self._bus.remove_signal_watch()
+        self._bus.set_sync_handler(None)
+        self.setState(gst.STATE_NULL)
+        self._bus = None
+
+    def flushSeekVideo(self):
+        self.pause()
+        try:
+            self.seekRelative(0)
+        except PipelineError:
+            pass
+
+    def setState(self, state):
+        """
+        Set the L{Pipeline} to the given state.
+
+        @raises PipelineError: If the C{gst.Pipeline} could not be changed to
+        the requested state.
+        """
+        self.debug("state:%r" % state)
+        res = self.set_state(state)
+        if res == gst.STATE_CHANGE_FAILURE:
+            # reset to NULL
+            self.set_state(gst.STATE_NULL)
+            raise PipelineError("Failure changing state of the gst.Pipeline to %r, currently reset to NULL" % state)
+
+    def getState(self):
+        """
+        Query the L{Pipeline} for the current state.
+
+        @see: L{setState}
+
+        This will do an actual query to the underlying GStreamer Pipeline.
+        @return: The current state.
+        @rtype: C{State}
+        """
+        change, state, pending = self.get_state(0)
+        self.debug("change:%r, state:%r, pending:%r" % (change, state, pending))
+        return state
+
+    def play(self):
+        """
+        Sets the L{Pipeline} to PLAYING
+        """
+        self.setState(gst.STATE_PLAYING)
+
+    def pause(self):
+        """
+        Sets the L{Pipeline} to PAUSED
+        """
+        self.setState(gst.STATE_PAUSED)
+
+        # When the pipeline has been paused we need to update the
+        # timeline/playhead position, as the 'position' signal
+        # is only emitted every 300ms and the playhead jumps
+        # during the playback.
+        try:
+            self.emit("position", self.getPosition())
+        except PipelineError:
+            # Getting the position failed
+            pass
+
+    def stop(self):
+        """
+        Sets the L{Pipeline} to READY
+        """
+        self.setState(gst.STATE_READY)
+
+    def togglePlayback(self):
+        if self.getState() == gst.STATE_PLAYING:
+            self.pause()
+        else:
+            self.play()
+
+    #{ Position and Seeking methods
+
+    def getPosition(self, format=gst.FORMAT_TIME):
+        """
+        Get the current position of the L{Pipeline}.
+
+        @param format: The format to return the current position in
+        @type format: C{gst.Format}
+        @return: The current position or gst.CLOCK_TIME_NONE
+        @rtype: L{long}
+        @raise PipelineError: If the position couldn't be obtained.
+        """
+        self.log("format %r" % format)
+        try:
+            cur, format = self.query_position(format)
+        except Exception, e:
+            self.handleException(e)
+            raise PipelineError("Couldn't get position")
+
+        self.log("Got position %s" % gst.TIME_ARGS(cur))
+        return cur
+
+    def getDuration(self, format=gst.FORMAT_TIME):
+        """
+        Return the C{Pipeline} duration, emitting 'duration-changed' with it.
+        """
+        self.log("format %r" % format)
+        try:
+            dur, format = self.query_duration(format)
+        except Exception, e:
+            self.handleException(e)
+            raise PipelineError("Couldn't get duration")
+
+        self.log("Got duration %s" % gst.TIME_ARGS(dur))
+        self.emit("duration-changed", dur)
+        return dur
+
+    def activatePositionListener(self, interval=300):
+        """
+        Activate the position listener.
+
+        When activated, the Pipeline emits the 'position' signal at the
+        specified interval while it is in the PLAYING state.
+
+        @see: L{deactivatePositionListener}
+        @param interval: Interval between position queries in milliseconds
+        @type interval: L{int} milliseconds
+        @return: C{True} (also when the listener was already active)
+        @rtype: L{bool}
+        """
+        if self._listening:
+            return True
+        self._listening = True
+        self._listeningInterval = interval
+        # start polling right away only if the pipeline is already PLAYING
+        self._listenToPosition(self.getState() == gst.STATE_PLAYING)
+        return True
+
+    def deactivatePositionListener(self):
+        """
+        De-activates the position listener.
+
+        @see: L{activatePositionListener}
+        """
+        self._listenToPosition(False)
+        self._listening = False
+
+    def _positionListenerCb(self):
+        try:
+            cur = self.getPosition()
+            if cur != gst.CLOCK_TIME_NONE:
+                self.emit('position', cur)
+        finally:
+            return True
+
+    def _listenToPosition(self, listen=True):
+        # Deliberately simple: installs or removes the polling timeout without
+        # checking the pipeline state; callers decide when to toggle it.
+        if listen:
+            if self._listening and self._listeningSigId == 0:
+                self._listeningSigId = gobject.timeout_add(self._listeningInterval,
+                    self._positionListenerCb)
+        elif self._listeningSigId != 0:
+            gobject.source_remove(self._listeningSigId)
+            self._listeningSigId = 0
+
+    def simple_seek(self, position, format=gst.FORMAT_TIME):
+        """
+        Seeks in the L{Pipeline} to the given position.
+
+        @param position: Position to seek to
+        @type position: L{long}
+        @param format: The C{Format} of the seek position
+        @type format: C{gst.Format}
+        @raise PipelineError: If seek failed
+        """
+        if format == gst.FORMAT_TIME:
+            self.debug("position : %s" % gst.TIME_ARGS(position))
+        else:
+            self.debug("position : %d , format:%d" % (position, format))
+
+        # clamp between [0, duration]
+        if format == gst.FORMAT_TIME:
+            position = max(0, min(position, self.getDuration()))
+
+        res = self.seek(1.0, format, gst.SEEK_FLAG_FLUSH,
+                                  gst.SEEK_TYPE_SET, position,
+                                  gst.SEEK_TYPE_NONE, -1)
+        if not res:
+            self.debug("seeking failed")
+            raise PipelineError("seek failed")
+        self.debug("seeking succesfull")
+        self.emit('position', position)
+
+    def seekRelative(self, time):
+        """Seek by C{time} relative to the current position."""
+        # simple_seek() clamps a FORMAT_TIME target to [0, duration] itself.
+        self.simple_seek(self.getPosition() + time)
+
+    #}
+    ## Private methods
+
+    def _busMessageCb(self, unused_bus, message):
+        if message.type == gst.MESSAGE_EOS:
+            self.pause()
+            self.emit('eos')
+        elif message.type == gst.MESSAGE_STATE_CHANGED:
+            prev, new, pending = message.parse_state_changed()
+            self.debug("element %s state change %s" % (message.src,
+                    (prev, new, pending)))
+
+            if message.src == self:
+                self.debug("Pipeline change state prev:%r, new:%r, pending:%r" % (prev, new, pending))
+
+                emit_state_change = pending == gst.STATE_VOID_PENDING
+                if prev == gst.STATE_READY and new == gst.STATE_PAUSED:
+                    # trigger duration-changed
+                    try:
+                        self.getDuration()
+                    except PipelineError:
+                        # no sinks??
+                        pass
+                elif prev == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
+                    self._listenToPosition(True)
+                elif prev == gst.STATE_PLAYING and new == gst.STATE_PAUSED:
+                    self._listenToPosition(False)
+
+                if emit_state_change:
+                    self.emit('state-changed', new)
+
+        elif message.type == gst.MESSAGE_ERROR:
+            error, detail = message.parse_error()
+            self._handleErrorMessage(error, detail, message.src)
+        elif message.type == gst.MESSAGE_DURATION:
+            self.debug("Duration might have changed, querying it")
+            gobject.idle_add(self._queryDurationAsync)
+        else:
+            self.info("%s [%r]" % (message.type, message.src))
+
+    def _queryDurationAsync(self, *args, **kwargs):
+        try:
+            self.getDuration()
+        except Exception:  # best effort; let KeyboardInterrupt/SystemExit through
+            self.log("Duration failed... but we don't care")
+        return False  # one-shot: do not reschedule this idle callback
+
+    def _handleErrorMessage(self, error, detail, source):
+        self.error("error from %s: %s (%s)" % (source, error, detail))
+        self.emit('error', error, detail)
+
+    def _busSyncMessageHandler(self, unused_bus, message):
+        if message.type == gst.MESSAGE_ELEMENT:
+            # handle element message synchronously
+            self.emit('element-message', message)
+        return gst.BUS_PASS



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]