[pitivi] test_integration.py: factor integration test logic into InstanceRunner



commit e0f9b7a8031cf7c99a030ce634d80feb8d1c7da5
Author: Brandon Lewis <brandon_lewis alum berkeley edu>
Date:   Wed Nov 4 14:14:14 2009 -0800

    test_integration.py: factor integration test logic into InstanceRunner
    
    Reduces test case boilerplate code and enables most tests to run as
    self-contained methods of TestCase objects.

 tests/test_integration.py |  427 ++++++++++++++++++++++++++++-----------------
 1 files changed, 271 insertions(+), 156 deletions(-)
---
diff --git a/tests/test_integration.py b/tests/test_integration.py
index a3fc7d6..c1abdd0 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -28,6 +28,7 @@ from pitivi.application import InteractivePitivi
 from pitivi.timeline.timeline import MoveContext, TrimStartContext,\
     TrimEndContext
 from pitivi.signalinterface import Signallable
+from pitivi.stream import AudioStream, VideoStream
 import pitivi.instance
 import gobject
 import os.path
@@ -111,15 +112,183 @@ class TestWatchdog(TestCase):
         self.timeout_called = True
         return False
 
+class Configuration(object):
+
+    def __init__(self, *sources):
+        self.sources = []
+        for source in sources:
+            self.addSource(*source)
+
+    def addSource(self, name, uri, props=None, error=False):
+        self.sources.append((name, uri, props))
+
+    def addBadSource(self, name, uri):
+        self.sources.append((name, uri))
+
+    def getUris(self):
+        return set((source[1] for source in self.sources))
+
+    def getGoodUris(self):
+        return set((source[1] for source in self.sources if
+            len(source) > 2))
+
+    def matches(self, instance_runner):
+        for name, uri, props in self.sources:
+            timelineObject = getattr(instance_runner, name)
+            if timelineObject.factory.uri != uri:
+                raise Exception("%s has wrong factory type!" % name)
+            if timelineObject:
+                for prop, value in props.iteritems():
+                    if not getattr(timelineObject, prop) == value:
+                        raise Exception("'%s'.%s != %r" % (uri, prop, value))
+
+    def __iter__(self):
+        return (source for source in self.sources if len(source) > 2)
+
+class InstanceRunner(Signallable):
+
+    no_ui = True
+
+    class container(object):
+
+        def __init__(self):
+            pass
+
+    __signals__ = {
+        "sources-loaded" : [],
+        "timeline-configured" : [],
+        "scrub-done" : [],
+    }
+
+    def __init__(self, instance):
+        self.instance = instance
+        self.watchdog = WatchDog(instance.mainloop, 10000)
+        self.factories = set()
+        self.errors = set()
+        self.project = None
+        self.timeline = None
+        self.tracks = {}
+        self.pending_configuration = None
+        self.scrubContext = None
+        self.scrubTime = 0
+        self.scrubPriority = 0
+        self.scrubMaxPriority = 0
+        self.scrubMaxTime = 0
+        self.scrubCount = 0
+        self.scrubSteps = 0
+        self.audioTracks = 0
+        self.videoTracks = 0
+        instance.connect("new-project-loaded", self._newProjectLoadedCb)
+
+    def loadConfiguration(self, configuration):
+        self.pending_configuration = configuration
+
+    def _newProjectLoadedCb(self, instance, project):
+        self.project = instance.current
+        self.timeline = self.project.timeline
+        for track in self.timeline.tracks:
+            self._trackAddedCb(self.timeline, track)
+        self.project.sources.connect("source-added", self._sourceAdded)
+        self.project.sources.connect("discovery-error", self._discoveryError)
+        self.project.sources.connect("ready", self._readyCb)
+        self.timeline.connect("track-added", self._trackAddedCb)
+
+        if self.pending_configuration:
+            self._loadSources(self.pending_configuration)
+
+    def _sourceAdded(self, sourcelist, factory):
+        self.factories.add(factory.uri)
+
+    def _discoveryError(self, sourcelist, uri, reason, unused):
+        self.errors.add(uri)
+
+    def _readyCb(self, soucelist):
+        assert self.factories == self.pending_configuration.getGoodUris()
+        if self.pending_configuration:
+            self._setupTimeline(self.pending_configuration)
+        self.emit("sources-loaded")
+
+    def _loadSources(self, configuration):
+        for uri in configuration.getUris():
+            self.project.sources.addUri(uri)
+
+    def _trackAddedCb(self, timeline, track):
+        if type(track.stream) is AudioStream:
+            self.audioTracks += 1
+            attrname = "audio%d" % self.audioTracks
+        elif type(track.stream) is VideoStream:
+            self.videoTracks += 1
+            attrname = "video%d" % self.videoTracks
+        container = self.container()
+        setattr(self, attrname, container)
+        self.tracks[track] = container 
+
+    def _setupTimeline(self, configuration):
+        for name, uri, props in configuration:
+            factory = self.project.sources.getUri(uri)
+            if not factory:
+                raise Exception("Could not find '%s' in sourcelist" %
+                    source)
+
+            if not props:
+                continue
+
+            timelineObject = self.timeline.addSourceFactory(factory)
+            setattr(self, name, timelineObject)
+            for trackObject in timelineObject.track_objects:
+                track = self.tracks[trackObject.track]
+                setattr(track, name, trackObject)
+
+            if not timelineObject:
+                raise Exception("Could not add source '%s' to timeline" %
+                    source)
+            for prop, value in props.iteritems():
+                setattr(timelineObject, prop, value)
+        self.emit("timeline-configured")
+
+    def scrub(self, context, finalTime, finalPriority, delay=100, maxtime = 7200 * gst.SECOND, maxpriority =10, steps = 10):
+        """ Scrubs an editing context as if a user were frantically dragging a
+        clips with the mouse """
+ 
+        self.scrubContext = context
+        self.scrubTime = finalTime
+        self.scrubPriority = finalPriority
+        self.scrubMaxPriority = maxpriority
+        self.scrubMaxTime = maxtime
+        self.scrubCount = 0
+        self.scrubSteps = steps
+ 
+        self.watchdog.keepAlive()
+        gobject.timeout_add(delay, self._scrubTimeoutCb)
+ 
+    def _scrubTimeoutCb(self):
+        time_ = random.randint(0, self.scrubMaxTime)
+        priority = random.randint(0, self.scrubMaxPriority)
+        self.scrubContext.editTo(time_, priority)
+        self.scrubCount += 1
+        self.watchdog.keepAlive()
+ 
+        if self.scrubCount < self.scrubSteps:
+            return True
+        else:
+            self.scrubContext.editTo(self.scrubTime, self.scrubPriority)
+            self.scrubContext.finish()
+            self.emit("scrub-done")
+            return False
+
+    def run(self):
+        self.watchdog.start()
+        if self.no_ui:
+            self.instance.run(["--no-ui"])
+        else:
+            self.instance.run([])
+
 class Base(TestCase):
     """
     Creates and runs an InteractivePitivi object, then starts the mainloop.
     Uses a WatchDog to ensure that test cases will eventually terminate with an
     assertion failure if runtime errors occur inside the mainloop."""
 
-    watchdog_timeout = 1000
-    no_ui = True
-
     def setUp(self):
         TestCase.setUp(self)
         ptv = InteractivePitivi()
@@ -133,185 +302,131 @@ class Base(TestCase):
         self.assertEquals(pitivi.instance.PiTiVi, ptv)
         self.ptv = ptv
 
-        # setup a watchdog timer
-        self.watchdog = WatchDog(ptv.mainloop, self.watchdog_timeout)
-
-        # connect to the new-project-loaded signal
-        self.ptv.connect("new-project-loaded", self._projectLoadedCb)
-
-    def testPiTiVi(self):
-        self.runPitivi()
-
-    def runPitivi(self):
-        self.watchdog.start()
-        if self.no_ui:
-            self.ptv.run(["--no-ui"])
-        else:
-            self.ptv.run([])
-
-    def _projectLoadedCb(self, pitivi, project):
-        gobject.idle_add(self.ptv.shutdown)
+        # create an instance runner
+        self.runner = InstanceRunner(ptv)
 
     def tearDown(self):
         # make sure we aren't exiting because our watchdog activated
-        self.assertFalse(self.watchdog.activated)
+        self.assertFalse(self.runner.watchdog.activated)
         # make sure the instance has been unset
         self.assertEquals(pitivi.instance.PiTiVi, None)
         del self.ptv
+        del self.runner
         TestCase.tearDown(self)
 
-class TestImport(Base):
+    def testWatchdog(self):
+        self.runner.run()
+        self.assertTrue(self.runner.watchdog.activated)
+        self.runner.watchdog.activated = False
 
-    """Test discoverer, sourcelist, and project integration. When the new
-    project loads, attempt to add both existing and non-existing sources. Make
-    sure that an error is emitted when a source fails to load, and that
-    source-added is emitted when sources are loaded successfully."""
+    def testBasic(self):
 
-    def setUp(self):
-        self.factories = set()
-        self.errors = set()
-        Base.setUp(self)
+        def newProjectLoaded(pitivi, project):
+            gobject.idle_add(self.ptv.shutdown)
 
-    def _sourceAdded(self, sourcelist, factory):
-        self.factories.add(factory.uri)
+        self.ptv.connect("new-project-loaded", newProjectLoaded)
+        self.runner.run()
 
-    def _discoveryError(self, sourcelist, uri, reason, unused):
-        self.errors.add(uri)
+    def testImport(self):
 
-    def _readyCb(self, soucelist):
-        self.failUnlessEqual(self.factories, set((test1, test2)))
-        self.failUnlessEqual(self.errors, set((test3,)))
-        self.ptv.current._dirty = False
-        self.ptv.shutdown()
-
-    def _projectLoadedCb(self, pitivi, project):
-        self.ptv.current.sources.connect("source-added", self._sourceAdded)
-        self.ptv.current.sources.connect("discovery-error", self._discoveryError)
-        self.ptv.current.sources.connect("ready", self._readyCb)
-        self.ptv.current.sources.addUri(test1)
-        self.ptv.current.sources.addUri(test2)
-        self.ptv.current.sources.addUri(test3)
-
-class TestTimeline(Base):
-
-    """Base test case for tests involving the timeline. Imports several
-    sources to the project and adds them to the timeline, optionally setting
-    their start, media-start, and duration properties."""
-
-    sources = (
-        (
-            "timelineObject1", 
+        def sourcesLoaded(runner):
+            gobject.idle_add(self.ptv.shutdown)
+
+        config = Configuration()
+        config.addSource("test1", test1)
+        config.addSource("test2", test2)
+        config.addBadSource("test3", test3)
+
+        self.runner.connect("sources-loaded", sourcesLoaded)
+        self.runner.loadConfiguration(config)
+        self.runner.run()
+
+        self.assertFalse(hasattr(self.runner,test1))
+        self.assertFalse(hasattr(self.runner,test2))
+        self.failUnlessEqual(self.runner.factories, set((test1, test2)))
+        self.failUnlessEqual(self.runner.errors, set((test3,)))
+
+    def testConfigureTimeline(self):
+ 
+        config = Configuration()
+        config.addSource(
+            "object1", 
             test1, 
             {
                 "start" : 0,
                 "duration" : gst.SECOND,
                 "media-start" : gst.SECOND,
-            }
-        ),
-        (
-            "timelineObject2", 
+            })
+        config.addSource(
+            "object2", 
             test2,
             {
                 "start" : gst.SECOND,
                 "duration" : gst.SECOND,
-            }
-        ),
-    )
-
-    def setUp(self):
-        self.timelineObjects = set()
-        Base.setUp(self)
-
-    def _projectLoadedCb(self, pitivi, project):
-        self.ptv.current.sources.connect("ready", self._readyCb)
-        for name, uri, props in self.sources:
-            self.ptv.current.sources.addUri(uri)
-
-    def _timelineSetup(self):
-        self.ptv.current.timeline.connect("timeline-object-added",
-            self._timelineObjectAddedCb)
-
-        for name, uri, props in self.sources:
-            factory = self.ptv.current.sources.getUri(uri)
-            if not factory:
-                raise Exception("Could not find '%s' in sourcelist" %
-                    source)
-            timelineObject = self.ptv.current.timeline.addSourceFactory(factory)
-            setattr(self, name, timelineObject)
-            if not timelineObject:
-                raise Exception("Could not add source '%s' to timeline" %
-                    source)
-            if props:
-                for prop, value in props.iteritems():
-                    setattr(timelineObject, prop, value)
-
-    def _timelineObjectAddedCb(self, timeline, object):
-        self.timelineObjects.add(object)
-
-    def _readyCb(self, unused_sourcelist):
-        self._timelineSetup()
-        self._verifyTimeline()
-        self._interactiveTest()
-
-    def _interactiveTest(self):
-        self.ptv.current._dirty = False
-        self.ptv.shutdown()
-
-    def _verifyTimeline(self):
-        for name, uri, props in self.sources:
-            timelineObject = getattr(self, name)
-            if timelineObject:
-                for prop, value in props.iteritems():
-                    if not getattr(timelineObject, prop) == value:
-                        raise Exception("'%s'.%s != %r" % (uri, prop, value))
-
-    def scrubContext(self, context, finalTime, finalPriority, callback=None,
-        delay=100, maxtime = 7200 * gst.SECOND, maxpriority =10, steps = 10):
-        """ Scrubs an editing context as if a user were frantically dragging a
-        clips with the mouse """
-
-        self._scrubContext = context
-        self._scrubTime = finalTime
-        self._scrubPriority = finalPriority
-        self._scrubMaxPriority = maxpriority
-        self._scrubMaxTime = maxtime
-        self._scrubCount = 0
-        self._scrubSteps = steps
-        self._scrubCallback = callback
+            })
+ 
+        def timelineConfigured(runner):
+            config.matches(self.runner)
+            gobject.idle_add(self.ptv.shutdown)
+ 
+        self.runner.loadConfiguration(config)
+        self.runner.connect("timeline-configured", timelineConfigured)
+        self.runner.run()
+
+        self.assertTrue(self.runner.object1)
+        self.assertTrue(self.runner.object2)
+        self.assertTrue(self.runner.video1.object1)
+        self.assertTrue(self.runner.audio1.object2)
+
+    def testMoveSources(self):
+        initial = Configuration()
+        initial.addSource(
+            "object1", 
+            test1, 
+            {
+                "start" : 0,
+                "duration" : gst.SECOND,
+                "media-start" : gst.SECOND,
+                "priority" : 0
+            })
+        initial.addSource(
+            "object2", 
+            test2,
+            {
+                "start" : gst.SECOND,
+                "duration" : gst.SECOND,
+                "priority" : 1,
+            })
+        final = Configuration()
+        final.addSource(
+            "object1",
+            test1,
+            {
+                "start" : 10 * gst.SECOND,
+            })
+        final.addSource(
+            "object2",
+            test2,
+            {
+                "start" : 11 * gst.SECOND,
+                "priority" : 2,
+            })
 
-        self.watchdog.keepAlive()
-        gobject.timeout_add(delay, self._scrubTimeoutCb)
+        def timelineConfigured(runner):
+            context = MoveContext(self.runner.timeline, 
+                self.runner.video1.object1,
+                set((self.runner.audio1.object2,)))
+            self.runner.scrub(context, 10 * gst.SECOND, 1, steps=10)
 
-    def _scrubTimeoutCb(self):
-        time_ = random.randint(0, self._scrubMaxTime)
-        priority = random.randint(0, self._scrubMaxPriority)
-        self._scrubContext.editTo(time_, priority)
-        self._scrubCount += 1
-        self.watchdog.keepAlive()
+        def scrubDone(runner):
+            final.matches(runner)
+            gobject.idle_add(self.ptv.shutdown)
 
-        if self._scrubCount < self._scrubSteps:
-            return True
-        else:
-            self._scrubContext.editTo(self._scrubTime, self._scrubPriority)
-            self._scrubCallback()
-            return False
+        self.runner.loadConfiguration(initial)
+        self.runner.connect("timeline-configured", timelineConfigured)
+        self.runner.connect("scrub-done", scrubDone)
 
-class TestMoveSources(TestTimeline):
-
-    def _finish(self):
-        self.context.finish()
-        self.assertEquals(self.timelineObject1.start, 10 * gst.SECOND)
-        self.assertEquals(self.timelineObject1.priority, 2)
-        self.assertEquals(self.timelineObject2.start, 11 * gst.SECOND)
-        self.assertEquals(self.timelineObject2.priority, 2)
-        self.ptv.current.dirty = False
-        self.ptv.shutdown()
-
-    def _interactiveTest(self):
-        timeline = self.ptv.current.timeline
-        self.context = MoveContext(timeline, self.timelineObject1,
-            set((self.timelineObject2,)))
-        self.scrubContext(self.context, 10 * gst.SECOND, 2, self._finish)
+        self.runner.run()
 
 if __name__ == "__main__":
     unittest.main()



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]