[pitivi] test_integration.py: factor integration test logic into IntanceRunner
- From: Edward Hervey <edwardrv src gnome org>
- To: svn-commits-list gnome org
- Cc:
- Subject: [pitivi] test_integration.py: factor integration test logic into IntanceRunner
- Date: Fri, 20 Nov 2009 07:48:03 +0000 (UTC)
commit e0f9b7a8031cf7c99a030ce634d80feb8d1c7da5
Author: Brandon Lewis <brandon_lewis alum berkeley edu>
Date: Wed Nov 4 14:14:14 2009 -0800
test_integration.py: factor integration test logic into IntanceRunner
Reduces test case boilerplate code and enables most tests to run as
self-contained methods of TestCase objects.
tests/test_integration.py | 427 ++++++++++++++++++++++++++++-----------------
1 files changed, 271 insertions(+), 156 deletions(-)
---
diff --git a/tests/test_integration.py b/tests/test_integration.py
index a3fc7d6..c1abdd0 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -28,6 +28,7 @@ from pitivi.application import InteractivePitivi
from pitivi.timeline.timeline import MoveContext, TrimStartContext,\
TrimEndContext
from pitivi.signalinterface import Signallable
+from pitivi.stream import AudioStream, VideoStream
import pitivi.instance
import gobject
import os.path
@@ -111,15 +112,183 @@ class TestWatchdog(TestCase):
self.timeout_called = True
return False
+class Configuration(object):
+
+ def __init__(self, *sources):
+ self.sources = []
+ for source in sources:
+ self.addSource(*source)
+
+ def addSource(self, name, uri, props=None, error=False):
+ self.sources.append((name, uri, props))
+
+ def addBadSource(self, name, uri):
+ self.sources.append((name, uri))
+
+ def getUris(self):
+ return set((source[1] for source in self.sources))
+
+ def getGoodUris(self):
+ return set((source[1] for source in self.sources if
+ len(source) > 2))
+
+ def matches(self, instance_runner):
+ for name, uri, props in self.sources:
+ timelineObject = getattr(instance_runner, name)
+ if timelineObject.factory.uri != uri:
+ raise Exception("%s has wrong factory type!" % name)
+ if timelineObject:
+ for prop, value in props.iteritems():
+ if not getattr(timelineObject, prop) == value:
+ raise Exception("'%s'.%s != %r" % (uri, prop, value))
+
+ def __iter__(self):
+ return (source for source in self.sources if len(source) > 2)
+
+class InstanceRunner(Signallable):
+
+ no_ui = True
+
+ class container(object):
+
+ def __init__(self):
+ pass
+
+ __signals__ = {
+ "sources-loaded" : [],
+ "timeline-configured" : [],
+ "scrub-done" : [],
+ }
+
+ def __init__(self, instance):
+ self.instance = instance
+ self.watchdog = WatchDog(instance.mainloop, 10000)
+ self.factories = set()
+ self.errors = set()
+ self.project = None
+ self.timeline = None
+ self.tracks = {}
+ self.pending_configuration = None
+ self.scrubContext = None
+ self.scrubTime = 0
+ self.scrubPriority = 0
+ self.scrubMaxPriority = 0
+ self.scrubMaxTime = 0
+ self.scrubCount = 0
+ self.scrubSteps = 0
+ self.audioTracks = 0
+ self.videoTracks = 0
+ instance.connect("new-project-loaded", self._newProjectLoadedCb)
+
+ def loadConfiguration(self, configuration):
+ self.pending_configuration = configuration
+
+ def _newProjectLoadedCb(self, instance, project):
+ self.project = instance.current
+ self.timeline = self.project.timeline
+ for track in self.timeline.tracks:
+ self._trackAddedCb(self.timeline, track)
+ self.project.sources.connect("source-added", self._sourceAdded)
+ self.project.sources.connect("discovery-error", self._discoveryError)
+ self.project.sources.connect("ready", self._readyCb)
+ self.timeline.connect("track-added", self._trackAddedCb)
+
+ if self.pending_configuration:
+ self._loadSources(self.pending_configuration)
+
+ def _sourceAdded(self, sourcelist, factory):
+ self.factories.add(factory.uri)
+
+ def _discoveryError(self, sourcelist, uri, reason, unused):
+ self.errors.add(uri)
+
+ def _readyCb(self, soucelist):
+ assert self.factories == self.pending_configuration.getGoodUris()
+ if self.pending_configuration:
+ self._setupTimeline(self.pending_configuration)
+ self.emit("sources-loaded")
+
+ def _loadSources(self, configuration):
+ for uri in configuration.getUris():
+ self.project.sources.addUri(uri)
+
+ def _trackAddedCb(self, timeline, track):
+ if type(track.stream) is AudioStream:
+ self.audioTracks += 1
+ attrname = "audio%d" % self.audioTracks
+ elif type(track.stream) is VideoStream:
+ self.videoTracks += 1
+ attrname = "video%d" % self.videoTracks
+ container = self.container()
+ setattr(self, attrname, container)
+ self.tracks[track] = container
+
+ def _setupTimeline(self, configuration):
+ for name, uri, props in configuration:
+ factory = self.project.sources.getUri(uri)
+ if not factory:
+ raise Exception("Could not find '%s' in sourcelist" %
+ source)
+
+ if not props:
+ continue
+
+ timelineObject = self.timeline.addSourceFactory(factory)
+ setattr(self, name, timelineObject)
+ for trackObject in timelineObject.track_objects:
+ track = self.tracks[trackObject.track]
+ setattr(track, name, trackObject)
+
+ if not timelineObject:
+ raise Exception("Could not add source '%s' to timeline" %
+ source)
+ for prop, value in props.iteritems():
+ setattr(timelineObject, prop, value)
+ self.emit("timeline-configured")
+
+ def scrub(self, context, finalTime, finalPriority, delay=100, maxtime = 7200 * gst.SECOND, maxpriority =10, steps = 10):
+ """ Scrubs an editing context as if a user were frantically dragging a
+ clips with the mouse """
+
+ self.scrubContext = context
+ self.scrubTime = finalTime
+ self.scrubPriority = finalPriority
+ self.scrubMaxPriority = maxpriority
+ self.scrubMaxTime = maxtime
+ self.scrubCount = 0
+ self.scrubSteps = steps
+
+ self.watchdog.keepAlive()
+ gobject.timeout_add(delay, self._scrubTimeoutCb)
+
+ def _scrubTimeoutCb(self):
+ time_ = random.randint(0, self.scrubMaxTime)
+ priority = random.randint(0, self.scrubMaxPriority)
+ self.scrubContext.editTo(time_, priority)
+ self.scrubCount += 1
+ self.watchdog.keepAlive()
+
+ if self.scrubCount < self.scrubSteps:
+ return True
+ else:
+ self.scrubContext.editTo(self.scrubTime, self.scrubPriority)
+ self.scrubContext.finish()
+ self.emit("scrub-done")
+ return False
+
+ def run(self):
+ self.watchdog.start()
+ if self.no_ui:
+ self.instance.run(["--no-ui"])
+ else:
+ self.instance.run([])
+
class Base(TestCase):
"""
Creates and runs an InteractivePitivi object, then starts the mainloop.
Uses a WatchDog to ensure that test cases will eventually terminate with an
assertion failure if runtime errors occur inside the mainloop."""
- watchdog_timeout = 1000
- no_ui = True
-
def setUp(self):
TestCase.setUp(self)
ptv = InteractivePitivi()
@@ -133,185 +302,131 @@ class Base(TestCase):
self.assertEquals(pitivi.instance.PiTiVi, ptv)
self.ptv = ptv
- # setup a watchdog timer
- self.watchdog = WatchDog(ptv.mainloop, self.watchdog_timeout)
-
- # connect to the new-project-loaded signal
- self.ptv.connect("new-project-loaded", self._projectLoadedCb)
-
- def testPiTiVi(self):
- self.runPitivi()
-
- def runPitivi(self):
- self.watchdog.start()
- if self.no_ui:
- self.ptv.run(["--no-ui"])
- else:
- self.ptv.run([])
-
- def _projectLoadedCb(self, pitivi, project):
- gobject.idle_add(self.ptv.shutdown)
+ # create an instance runner
+ self.runner = InstanceRunner(ptv)
def tearDown(self):
# make sure we aren't exiting because our watchdog activated
- self.assertFalse(self.watchdog.activated)
+ self.assertFalse(self.runner.watchdog.activated)
# make sure the instance has been unset
self.assertEquals(pitivi.instance.PiTiVi, None)
del self.ptv
+ del self.runner
TestCase.tearDown(self)
-class TestImport(Base):
+ def testWatchdog(self):
+ self.runner.run()
+ self.assertTrue(self.runner.watchdog.activated)
+ self.runner.watchdog.activated = False
- """Test discoverer, sourcelist, and project integration. When the new
- project loads, attempt to add both existing and non-existing sources. Make
- sure that an error is emitted when a source fails to load, and that
- source-added is emitted when sources are loaded successfully."""
+ def testBasic(self):
- def setUp(self):
- self.factories = set()
- self.errors = set()
- Base.setUp(self)
+ def newProjectLoaded(pitivi, project):
+ gobject.idle_add(self.ptv.shutdown)
- def _sourceAdded(self, sourcelist, factory):
- self.factories.add(factory.uri)
+ self.ptv.connect("new-project-loaded", newProjectLoaded)
+ self.runner.run()
- def _discoveryError(self, sourcelist, uri, reason, unused):
- self.errors.add(uri)
+ def testImport(self):
- def _readyCb(self, soucelist):
- self.failUnlessEqual(self.factories, set((test1, test2)))
- self.failUnlessEqual(self.errors, set((test3,)))
- self.ptv.current._dirty = False
- self.ptv.shutdown()
-
- def _projectLoadedCb(self, pitivi, project):
- self.ptv.current.sources.connect("source-added", self._sourceAdded)
- self.ptv.current.sources.connect("discovery-error", self._discoveryError)
- self.ptv.current.sources.connect("ready", self._readyCb)
- self.ptv.current.sources.addUri(test1)
- self.ptv.current.sources.addUri(test2)
- self.ptv.current.sources.addUri(test3)
-
-class TestTimeline(Base):
-
- """Base test case for tests involving the timeline. Imports several
- sources to the project and adds them to the timeline, optionally setting
- their start, media-start, and duration properties."""
-
- sources = (
- (
- "timelineObject1",
+ def sourcesLoaded(runner):
+ gobject.idle_add(self.ptv.shutdown)
+
+ config = Configuration()
+ config.addSource("test1", test1)
+ config.addSource("test2", test2)
+ config.addBadSource("test3", test3)
+
+ self.runner.connect("sources-loaded", sourcesLoaded)
+ self.runner.loadConfiguration(config)
+ self.runner.run()
+
+ self.assertFalse(hasattr(self.runner,test1))
+ self.assertFalse(hasattr(self.runner,test2))
+ self.failUnlessEqual(self.runner.factories, set((test1, test2)))
+ self.failUnlessEqual(self.runner.errors, set((test3,)))
+
+ def testConfigureTimeline(self):
+
+ config = Configuration()
+ config.addSource(
+ "object1",
test1,
{
"start" : 0,
"duration" : gst.SECOND,
"media-start" : gst.SECOND,
- }
- ),
- (
- "timelineObject2",
+ })
+ config.addSource(
+ "object2",
test2,
{
"start" : gst.SECOND,
"duration" : gst.SECOND,
- }
- ),
- )
-
- def setUp(self):
- self.timelineObjects = set()
- Base.setUp(self)
-
- def _projectLoadedCb(self, pitivi, project):
- self.ptv.current.sources.connect("ready", self._readyCb)
- for name, uri, props in self.sources:
- self.ptv.current.sources.addUri(uri)
-
- def _timelineSetup(self):
- self.ptv.current.timeline.connect("timeline-object-added",
- self._timelineObjectAddedCb)
-
- for name, uri, props in self.sources:
- factory = self.ptv.current.sources.getUri(uri)
- if not factory:
- raise Exception("Could not find '%s' in sourcelist" %
- source)
- timelineObject = self.ptv.current.timeline.addSourceFactory(factory)
- setattr(self, name, timelineObject)
- if not timelineObject:
- raise Exception("Could not add source '%s' to timeline" %
- source)
- if props:
- for prop, value in props.iteritems():
- setattr(timelineObject, prop, value)
-
- def _timelineObjectAddedCb(self, timeline, object):
- self.timelineObjects.add(object)
-
- def _readyCb(self, unused_sourcelist):
- self._timelineSetup()
- self._verifyTimeline()
- self._interactiveTest()
-
- def _interactiveTest(self):
- self.ptv.current._dirty = False
- self.ptv.shutdown()
-
- def _verifyTimeline(self):
- for name, uri, props in self.sources:
- timelineObject = getattr(self, name)
- if timelineObject:
- for prop, value in props.iteritems():
- if not getattr(timelineObject, prop) == value:
- raise Exception("'%s'.%s != %r" % (uri, prop, value))
-
- def scrubContext(self, context, finalTime, finalPriority, callback=None,
- delay=100, maxtime = 7200 * gst.SECOND, maxpriority =10, steps = 10):
- """ Scrubs an editing context as if a user were frantically dragging a
- clips with the mouse """
-
- self._scrubContext = context
- self._scrubTime = finalTime
- self._scrubPriority = finalPriority
- self._scrubMaxPriority = maxpriority
- self._scrubMaxTime = maxtime
- self._scrubCount = 0
- self._scrubSteps = steps
- self._scrubCallback = callback
+ })
+
+ def timelineConfigured(runner):
+ config.matches(self.runner)
+ gobject.idle_add(self.ptv.shutdown)
+
+ self.runner.loadConfiguration(config)
+ self.runner.connect("timeline-configured", timelineConfigured)
+ self.runner.run()
+
+ self.assertTrue(self.runner.object1)
+ self.assertTrue(self.runner.object2)
+ self.assertTrue(self.runner.video1.object1)
+ self.assertTrue(self.runner.audio1.object2)
+
+ def testMoveSources(self):
+ initial = Configuration()
+ initial.addSource(
+ "object1",
+ test1,
+ {
+ "start" : 0,
+ "duration" : gst.SECOND,
+ "media-start" : gst.SECOND,
+ "priority" : 0
+ })
+ initial.addSource(
+ "object2",
+ test2,
+ {
+ "start" : gst.SECOND,
+ "duration" : gst.SECOND,
+ "priority" : 1,
+ })
+ final = Configuration()
+ final.addSource(
+ "object1",
+ test1,
+ {
+ "start" : 10 * gst.SECOND,
+ })
+ final.addSource(
+ "object2",
+ test2,
+ {
+ "start" : 11 * gst.SECOND,
+ "priority" : 2,
+ })
- self.watchdog.keepAlive()
- gobject.timeout_add(delay, self._scrubTimeoutCb)
+ def timelineConfigured(runner):
+ context = MoveContext(self.runner.timeline,
+ self.runner.video1.object1,
+ set((self.runner.audio1.object2,)))
+ self.runner.scrub(context, 10 * gst.SECOND, 1, steps=10)
- def _scrubTimeoutCb(self):
- time_ = random.randint(0, self._scrubMaxTime)
- priority = random.randint(0, self._scrubMaxPriority)
- self._scrubContext.editTo(time_, priority)
- self._scrubCount += 1
- self.watchdog.keepAlive()
+ def scrubDone(runner):
+ final.matches(runner)
+ gobject.idle_add(self.ptv.shutdown)
- if self._scrubCount < self._scrubSteps:
- return True
- else:
- self._scrubContext.editTo(self._scrubTime, self._scrubPriority)
- self._scrubCallback()
- return False
+ self.runner.loadConfiguration(initial)
+ self.runner.connect("timeline-configured", timelineConfigured)
+ self.runner.connect("scrub-done", scrubDone)
-class TestMoveSources(TestTimeline):
-
- def _finish(self):
- self.context.finish()
- self.assertEquals(self.timelineObject1.start, 10 * gst.SECOND)
- self.assertEquals(self.timelineObject1.priority, 2)
- self.assertEquals(self.timelineObject2.start, 11 * gst.SECOND)
- self.assertEquals(self.timelineObject2.priority, 2)
- self.ptv.current.dirty = False
- self.ptv.shutdown()
-
- def _interactiveTest(self):
- timeline = self.ptv.current.timeline
- self.context = MoveContext(timeline, self.timelineObject1,
- set((self.timelineObject2,)))
- self.scrubContext(self.context, 10 * gst.SECOND, 2, self._finish)
+ self.runner.run()
if __name__ == "__main__":
unittest.main()
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]