[pitivi/cpu_preservation: 2/2] previewers: Make sure only 1 audio and 1 video pipeline run concurrently



commit c198a239d262920fb4b09d8ec3d5b5eb1fd12454
Author: Thibault Saunier <thibault saunier collabora com>
Date:   Sat Jul 27 07:25:39 2013 +0200

    previewers: Make sure only 1 audio and 1 video pipeline run concurrently
    
    In order to be able to perform well in case we are trying to generate
    thumbnails and waves for many clips at the same time we need to manage
    the ressource in a concervative way.
    
    The idea in this commit is that we have an object that implements a
    queue of preview generator and control their execution. For that we also
    introduce a PreviewGenerator interface that must be implented by any
    class that generates previews.
    
    + We are now able to recover from errors in the waveform previewer, if
    we get an error on the bus, we stop the pipeline, and add the
    PreviewGenerator again to the PreviewGeneratorManager so it is tryed again latter,
    the second time there will be no rate modulation making it less error
    prone, we abandone after 2 times (1 with rate modulation, the other
    without)
    
    + The thumbnail generation stategy has been sensibly modified so that
    the thumbnails are generated *only* for the visible area, not the whole
    clip. This permit us to be able to get thumbnails for visible parts of
    all clips even though we generate thumbnails for 1 clip after the other.

 pitivi/timeline/previewers.py |  176 +++++++++++++++++++++++++++++++++++-----
 1 files changed, 154 insertions(+), 22 deletions(-)
---
diff --git a/pitivi/timeline/previewers.py b/pitivi/timeline/previewers.py
index 536999e..4615a9c 100644
--- a/pitivi/timeline/previewers.py
+++ b/pitivi/timeline/previewers.py
@@ -21,7 +21,7 @@
 # Boston, MA 02110-1301, USA.
 
 from datetime import datetime, timedelta
-from gi.repository import Clutter, Gst, GLib, GdkPixbuf, Cogl
+from gi.repository import Clutter, Gst, GLib, GdkPixbuf, Cogl, GObject, GES
 from math import log1p, log10
 from random import randrange
 from renderer import *
@@ -36,10 +36,11 @@ import sqlite3
 import sys
 import xdg.BaseDirectory as xdg_dirs
 
+from pitivi.utils.signal import Signallable
 from pitivi.utils.loggable import Loggable
 from pitivi.utils.timeline import Zoomable
 from pitivi.utils.ui import EXPANDED_SIZE, SPACING
-from pitivi.utils.misc import filename_from_uri, quote_uri
+from pitivi.utils.misc import filename_from_uri, quote_uri, print_ns
 from pitivi.utils.ui import EXPANDED_SIZE, SPACING, CONTROL_WIDTH
 
 
@@ -59,7 +60,82 @@ is prefixed with a little b, example : bTimeline
 """
 
 
-class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
+class PreviewGeneratorManager():
+    """
+    Manage the execution of PreviewGenerators
+    """
+    def __init__(self):
+        self._cpipeline = {
+            GES.TrackType.AUDIO: None,
+            GES.TrackType.VIDEO: None
+        }
+        self._pipelines = {
+            GES.TrackType.AUDIO: [],
+            GES.TrackType.VIDEO: []
+        }
+
+    def addPipeline(self, pipeline):
+        track_type = pipeline.track_type
+
+        if pipeline in self._pipelines[track_type] or \
+                pipeline is self._cpipeline[track_type]:
+            return
+
+        if not self._pipelines[track_type] and self._cpipeline[track_type] is None:
+            self._setPipeline(pipeline)
+        else:
+            self._pipelines[track_type].insert(0, pipeline)
+
+    def _setPipeline(self, pipeline):
+        self._cpipeline[pipeline.track_type] = pipeline
+        PreviewGenerator.connect(pipeline, "done", self._nextPipeline)
+        pipeline.startGeneration()
+
+    def _nextPipeline(self, controlled):
+        track_type = controlled.track_type
+        if self._cpipeline[track_type]:
+            PreviewGenerator.disconnect_by_function(self._cpipeline[track_type],
+                                                    self._nextPipeline)
+            self._cpipeline[track_type] = None
+
+        if self._pipelines[track_type]:
+            self._setPipeline(self._pipelines[track_type].pop())
+
+
+class PreviewGenerator(Signallable):
+    """
+    Interface to be implemented by classes that generate previews
+    It is need to implement it so PreviewGeneratorManager can manage
+    those classes
+    """
+
+    # We only wan 1 instance of PipelineQueue to be used for all the
+    # generators
+    manager = PreviewGeneratorManager()
+
+    __signals__ = {
+        "done": [],
+        "error": [],
+    }
+
+    def __init__(self, track_type):
+        Signallable.__init__(self)
+        self.track_type = track_type
+
+    def startGeneration(self):
+        raise NotImplemented
+
+    def stopGeneration(self):
+        raise NotImplemented
+
+    def becomeControlled(self):
+        """
+        Let the PreviewGeneratorManager control our execution
+        """
+        self.manager.addPipeline(self)
+
+
+class VideoPreviewer(Clutter.ScrollActor, PreviewGenerator, Zoomable, Loggable):
     def __init__(self, bElement, timeline):
         """
         @param bElement : the backend GES.TrackElement
@@ -69,6 +145,7 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
         Zoomable.__init__(self)
         Clutter.ScrollActor.__init__(self)
         Loggable.__init__(self)
+        PreviewGenerator.__init__(self, GES.TrackType.VIDEO)
 
         # Variables related to the timeline objects
         self.timeline = timeline
@@ -79,11 +156,13 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
         # Variables related to thumbnailing
         self.wishlist = []
         self._callback_id = None
+        self._thumb_cb_id = None
         self._allAnimated = False
+        self._running = False
         self.thumb_period = long(0.5 * Gst.SECOND)  # TODO: get this from user settings
         self.thumb_margin = BORDER_WIDTH
         self.thumb_height = EXPANDED_SIZE - 2 * self.thumb_margin
-        # self.thumb_width will be set by self._setupPipeline()
+        self.thumb_width = None  # will be set by self._setupPipeline()
 
         # Maps (quantized) times to Thumbnail objects
         self.thumbs = {}
@@ -99,15 +178,20 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
         self.bElement.connect("notify::duration", self._durationChangedCb)
         self.bElement.connect("notify::in-point", self._inpointChangedCb)
         self.bElement.connect("notify::start", self._startChangedCb)
-        self._setupPipeline()
-        self._startThumbnailingWhenIdle()
+
+        self.pipeline = None
+        self.becomeControlled()
 
     # Internal API
 
     def _update(self, unused_msg_source=None):
         if self._callback_id:
             GLib.source_remove(self._callback_id)
-        self._callback_id = GLib.idle_add(self._addVisibleThumbnails, priority=GLib.PRIORITY_LOW)
+
+        if self.thumb_width:
+            self._addVisibleThumbnails()
+            if self.wishlist:
+                self.becomeControlled()
 
     def _setupPipeline(self):
         """
@@ -173,7 +257,8 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
 
         self.lastMoment = datetime.now()
         self.lastUsage = resource.getrusage(resource.RUSAGE_SELF)
-        GLib.timeout_add(self.interval, self._create_next_thumb_when_idle)
+        self._thumb_cb_id = GLib.timeout_add(self.interval, self._create_next_thumb)
+
         return False
 
     def _startThumbnailingWhenIdle(self):
@@ -197,22 +282,26 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
 
         self._checkCPU()
 
+        self._addVisibleThumbnails()
         # Save periodically to avoid the common situation where the user exits
         # the app before a long clip has been fully thumbnailed.
         # Spread timeouts between 30-80 secs to avoid concurrent disk writes.
         random_time = randrange(30, 80)
         GLib.timeout_add_seconds(random_time, self._autosave)
 
-    def _create_next_thumb_when_idle(self):
-        self.log('Requesting next thumb when idle for "%s"' % filename_from_uri(self.uri))
-        GLib.idle_add(self._create_next_thumb, priority=GLib.PRIORITY_LOW)
+        # Remove the GSource
+        return False
 
     def _create_next_thumb(self):
-        if not self.queue:
+        if not self.wishlist:
             # nothing left to do
             self.debug("Thumbnails generation complete")
+            self.stopGeneration()
             self.thumb_cache.commit()
             return
+        else:
+            self.debug("Missing %d thumbs", len(self.wishlist))
+
         wish = self._get_wish()
         if wish:
             time = wish
@@ -228,8 +317,11 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
             Gst.SeekType.SET, time,
             Gst.SeekType.NONE, -1)
 
+        # Remove the GSource
+        return False
+
     def _autosave(self):
-        if self.queue:
+        if self.wishlist:
             self.log("Periodic thumbnail autosave")
             self.thumb_cache.commit()
             return True
@@ -373,7 +465,8 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
             struct_name = struct.get_name()
             if struct_name == "preroll-pixbuf":
                 self._setThumbnail(struct.get_value("stream-time"), struct.get_value("pixbuf"))
-        elif message.type == Gst.MessageType.ASYNC_DONE:
+        elif message.type == Gst.MessageType.ASYNC_DONE and \
+                message.src == self.pipeline:
             self._checkCPU()
         return Gst.BusSyncReply.PASS
 
@@ -395,6 +488,21 @@ class VideoPreviewer(Clutter.ScrollActor, Zoomable, Loggable):
             self.duration = new_duration
             self._update()
 
+    def startGeneration(self):
+        self._setupPipeline()
+        self._startThumbnailingWhenIdle()
+
+    def stopGeneration(self):
+        if self._thumb_cb_id:
+            GLib.source_remove(self._thumb_cb_id)
+            self._thumb_cb_id = None
+
+        if self.pipeline:
+            self.pipeline.set_state(Gst.State.NULL)
+            self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
+            self.pipeline = None
+        PreviewGenerator.emit(self, "done")
+
 
 class Thumbnail(Clutter.Actor):
     def __init__(self, width, height):
@@ -531,16 +639,23 @@ class PipelineCpuAdapter(Loggable):
         self.pipeline = pipeline
         self.bus = self.pipeline.get_bus()
 
-        self.bus.connect("message", self._messageCb)
         self.lastMoment = datetime.now()
         self.lastUsage = resource.getrusage(resource.RUSAGE_SELF)
         self.rate = 1.0
         self.done = False
         self.ready = False
         self.lastPos = 0
+        self._bus_cb_id = None
+
+    def start(self):
         GLib.timeout_add(200, self._modulateRate)
+        self._bus_cb_id = self.bus.connect("message", self._messageCb)
+        self.done = False
 
     def stop(self):
+        if self._bus_cb_id is not None:
+            self.bus.disconnect(self._bus_cb_id)
+            self._bus_cb_id = None
         self.pipeline = None
         self.done = True
 
@@ -577,7 +692,7 @@ class PipelineCpuAdapter(Loggable):
         else:
             if self.rate > 0.5:  # This to avoid going back and forth from READY to PAUSED
                 self.pipeline.set_state(Gst.State.PAUSED)  # The message handler will unset ready and seek 
correctly.
-            return
+            return True
 
         self.pipeline.set_state(Gst.State.PAUSED)
         self.pipeline.seek(self.rate,
@@ -609,7 +724,7 @@ class PipelineCpuAdapter(Loggable):
                     self.ready = False
 
 
-class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
+class AudioPreviewer(Clutter.Actor, PreviewGenerator, Zoomable, Loggable):
     """
     Audio previewer based on the results from the "level" gstreamer element.
     """
@@ -617,6 +732,7 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
         Clutter.Actor.__init__(self)
         Zoomable.__init__(self)
         Loggable.__init__(self)
+        PreviewGenerator.__init__(self, GES.TrackType.AUDIO)
         self.discovered = False
         self.bElement = bElement
         self._uri = quote_uri(bElement.props.uri)  # Guard against malformed URIs
@@ -627,12 +743,14 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
         self.canvas = Clutter.Canvas()
         self.set_content(self.canvas)
         self.width = 0
+        self._num_failures = 0
         self.lastUpdate = datetime.now()
 
         self.interval = timedelta(microseconds=INTERVAL)
 
         self.current_geometry = (-1, -1)
 
+        self.adapter = None
         self.surface = None
         self.timeline.connect("scrolled", self._scrolledCb)
         self.canvas.connect("draw", self._drawContentCb)
@@ -650,8 +768,6 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
         cache_dir = get_dir(os.path.join(xdg_dirs.xdg_cache_home, os.path.join("pitivi/waves")), autocreate)
         filename = cache_dir + "/" + filename
 
-        self.adapter = None
-
         if os.path.exists(filename):
             self.samples = pickle.load(open(filename, "rb"))
             self._startRendering()
@@ -669,7 +785,7 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
 
         self.nSamples = self.bElement.get_parent().get_asset().get_duration() / 10000000
         bus.connect("message", self._messageCb)
-        self.pipeline.set_state(Gst.State.PLAYING)
+        self.becomeControlled()
 
     def set_size(self, width, height):
         if self.discovered:
@@ -779,7 +895,7 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
                              self._num_failures)
                 bus.disconnect_by_func(self._messageCb)
                 self._launchPipeline()
-                pipeline_queue.addPipeline(self)  # let it try again
+                self.becomeControlled()
             else:
                 self.error("Issue during waveforms generation: %s"
                            "Abandonning", message.parse_error())
@@ -796,8 +912,11 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
                                        Gst.SeekType.NONE,
                                        -1)
 
-                elif not self.adapter and prev == Gst.State.PAUSED and new == Gst.State.PLAYING:
+                # In case we failed previously, we won't modulate next time
+                elif not self.adapter and prev == Gst.State.PAUSED and \
+                        new == Gst.State.PLAYING and self._num_failures == 0:
                     self.adapter = PipelineCpuAdapter(self.pipeline)
+                    self.adapter.start()
 
     def _drawContentCb(self, canvas, cr, surf_w, surf_h):
         cr.set_operator(cairo.OPERATOR_CLEAR)
@@ -816,3 +935,16 @@ class AudioPreviewer(Clutter.Actor, Zoomable, Loggable):
 
     def _scrolledCb(self, unused):
         self._maybeUpdate()
+
+    def startGeneration(self):
+        self.pipeline.set_state(Gst.State.PLAYING)
+        if self.adapter is not None:
+            self.adapter.start()
+
+    def stopGeneration(self):
+        if self.adapter is not None:
+            self.adapter.stop()
+            self.adapter = None
+        self.pipeline.set_state(Gst.State.NULL)
+        self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
+        PreviewGenerator.emit(self, "done")


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]