[pitivi] Add simple undo/redo system.
- From: Edward Hervey <edwardrv src gnome org>
- To: svn-commits-list gnome org
- Subject: [pitivi] Add simple undo/redo system.
- Date: Thu, 11 Jun 2009 12:38:33 -0400 (EDT)
commit 0fb582ed5cf845cba7c6a6f02c33493acc47c099
Author: Alessandro Decina <alessandro d gmail com>
Date: Tue Jun 9 18:10:02 2009 +0200
Add simple undo/redo system.
pitivi/undo.py | 216 +++++++++++++++++++++++++
tests/test_undo.py | 454 ++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 670 insertions(+), 0 deletions(-)
---
diff --git a/pitivi/undo.py b/pitivi/undo.py
new file mode 100644
index 0000000..dfd3aa4
--- /dev/null
+++ b/pitivi/undo.py
@@ -0,0 +1,216 @@
+# PiTiVi , Non-linear video editor
+#
+# pitivi/undo.py
+#
+# Copyright (c) 2009, Alessandro Decina <alessandro d gmail com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+from pitivi.signalinterface import Signallable
+
+class UndoError(Exception):
+ """Base exception of the undo module; UndoWrongStateError derives from it."""
+ pass
+
+class UndoWrongStateError(UndoError):
+ """
+ Error passed along with the "error" signal of UndoableActionLog when an
+ operation is requested in a state that does not allow it, for instance
+ commit() with no open stack or undo() with nothing to undo.
+ """
+ pass
+
+class UndoableAction(Signallable):
+ __signals__ = {
+ "done": [],
+ "undone": [],
+ "undone": [],
+ "error": ["exception"]
+ }
+
+ def do(self):
+ raise NotImplementedError()
+
+ def undo(self):
+ raise NotImplementedError()
+
+ def _done(self):
+ self.emit("done")
+
+ def _undone(self):
+ self.emit("undone")
+
+ def _error(self, exception):
+ self.emit("error", exception)
+
+class UndoableActionStack(UndoableAction):
+ __signals__ = {
+ "done": [],
+ "undone": [],
+ "error": ["exception"],
+ }
+
+ def __init__(self):
+ self.done_actions = []
+ self.undone_actions = []
+ self.actions = []
+
+ def push(self, action):
+ self.done_actions.append(action)
+
+ def _runAction(self, action_list, methodName, signalName,
+ continueCallback, finishCallback):
+ try:
+ action = action_list.pop(-1)
+ except IndexError:
+ finishCallback()
+ return
+
+ if action_list is self.done_actions:
+ self.undone_actions.append(action)
+ else:
+ self.done_actions.append(action)
+
+ self._connectToAction(action, action_list,
+ signalName, continueCallback, finishCallback)
+
+ method = getattr(action, methodName)
+ try:
+ method()
+ except Exception, e:
+ self._actionErrorCb(action, e, finishCallback)
+
+ def do(self):
+ self._runAction(self.undone_actions, "do", "done",
+ continueCallback=self.do, finishCallback=self._done)
+
+ def undo(self):
+ self._runAction(self.done_actions, "undo", "undone",
+ continueCallback=self.undo, finishCallback=self._undone)
+
+ def _connectToAction(self, action, action_list, signalName,
+ continueCallback, finishCallback):
+ action.connect(signalName, self._actionDoneOrUndoneCb,
+ action_list, continueCallback, finishCallback)
+ action.connect("error", self._actionErrorCb, finishCallback)
+
+ def _disconnectFromAction(self, action):
+ action.disconnect_by_func(self._actionDoneOrUndoneCb)
+ action.disconnect_by_func(self._actionErrorCb)
+
+ def _actionDoneOrUndoneCb(self, action, action_list,
+ continueCallback, finishCallback):
+ self._disconnectFromAction(action)
+
+ if not action_list:
+ finishCallback()
+ return
+
+ continueCallback()
+
+ def _actionErrorCb(self, action, exception, finishCallback):
+ self._disconnectFromAction(action)
+
+ self._error(exception)
+
+ def _done(self):
+ self.emit("done")
+
+ def _undone(self):
+ self.emit("undone")
+
+ def _error(self, exception):
+ self.emit("error", exception)
+
+class UndoableActionLog(Signallable):
+ __signals__ = {
+ "begin": ["stack", "nested"],
+ "push": ["stack", "action"],
+ "rollback": ["stack", "nested"],
+ "commit": ["stack", "nested"],
+ "undo": ["stack"],
+ "redo": ["stack"],
+ "error": ["exception"]
+ }
+ def __init__(self):
+ self.undo_stacks = []
+ self.redo_stacks = []
+ self.stacks = []
+
+ def begin(self):
+ stack = UndoableActionStack()
+ nested = self._stackIsNested(stack)
+ self.stacks.append(stack)
+ self.emit("begin", stack, nested)
+
+ def push(self, action):
+ stack = self._getTopmostStack()
+ if stack is None:
+ return
+ stack.push(action)
+ self.emit("push", stack, action)
+
+ def rollback(self):
+ stack = self._getTopmostStack(pop=True)
+ if stack is None:
+ return
+ nested = self._stackIsNested(stack)
+ self.emit("rollback", stack, nested)
+ stack.undo()
+
+ def commit(self):
+ stack = self._getTopmostStack(pop=True)
+ if stack is None:
+ return
+ nested = self._stackIsNested(stack)
+ if not self.stacks:
+ self.undo_stacks.append(stack)
+ else:
+ self.stacks[-1].push(stack)
+
+ self.emit("commit", stack, nested)
+
+ def undo(self):
+ if self.stacks or not self.undo_stacks:
+ self._error(UndoWrongStateError())
+ return
+
+ stack = self.undo_stacks.pop(-1)
+ stack.undo()
+
+ self.redo_stacks.append(stack)
+ self.emit("undo", stack)
+
+ def redo(self):
+ if self.stacks or not self.redo_stacks:
+ return self._error(UndoWrongStateError())
+
+ stack = self.redo_stacks.pop(-1)
+ stack.do()
+ self.undo_stacks.append(stack)
+ self.emit("redo", stack)
+
+ def _getTopmostStack(self, pop=False):
+ stack = None
+ try:
+ if pop:
+ stack = self.stacks.pop(-1)
+ else:
+ stack = self.stacks[-1]
+ except IndexError:
+ return self._error(UndoWrongStateError())
+
+ return stack
+
+ def _stackIsNested(self, stack):
+ return bool(len(self.stacks))
+
+ def _error(self, exception):
+ self.emit("error", exception)
diff --git a/tests/test_undo.py b/tests/test_undo.py
new file mode 100644
index 0000000..e36977c
--- /dev/null
+++ b/tests/test_undo.py
@@ -0,0 +1,454 @@
+# PiTiVi , Non-linear video editor
+#
+# tests/test_undo.py
+#
+# Copyright (c) 2009, Alessandro Decina <alessandro d gmail com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+from unittest import TestCase
+
+from pitivi.undo import UndoError, UndoWrongStateError, UndoableAction, \
+ UndoableActionStack, UndoableActionLog
+
+class DummyUndoableAction(UndoableAction):
+ """
+ Minimal action used by the tests: it only records in done_ whether it is
+ currently done and reports completion right away.
+ """
+ # a freshly created action counts as done until undo() is called
+ done_ = True
+
+ def do(self):
+ # mark the action as done, then emit "done" through _done()
+ self.done_ = True
+ self._done()
+
+ def undo(self):
+ # mark the action as undone, then emit "undone" through _undone()
+ self.done_ = False
+ self._undone()
+
+class TestUndoableAction(TestCase):
+ def testSimpleSignals(self):
+ """
+ Test signal emission from _done() and _undone().
+ """
+ state = {"done": False}
+ def doneCb(action, val):
+ state["done"] = val
+
+ action = DummyUndoableAction()
+ action.connect("done", doneCb, True)
+ action.connect("undone", doneCb, False)
+
+ action.undo()
+ self.failIf(state["done"])
+
+ action.do()
+ self.failUnless(state["done"])
+
+
+class TestUndoableActionStack(TestCase):
+ def testDoUndoEmpty(self):
+ """
+ Undo an empty stack.
+ """
+ state = {"done": True}
+ def doneCb(action, value):
+ state["done"] = value
+
+ stack = UndoableActionStack()
+ stack.connect("done", doneCb, True)
+ stack.connect("undone", doneCb, False)
+
+ stack.undo()
+ self.failIf(state["done"])
+
+ stack.do()
+ self.failUnless(state["done"])
+
+ def testUndoDo(self):
+ """
+ Test an undo() do() sequence.
+ """
+ state = {"done": True, "actions": 2}
+ def doneCb(action, value):
+ state["done"] = value
+
+ state["done"] = 2
+ class Action(UndoableAction):
+ def do(self):
+ state["actions"] += 1
+ self._done()
+
+ def undo(self):
+ state["actions"] -= 1
+ self._undone()
+
+ stack = UndoableActionStack()
+ stack.connect("done", doneCb, True)
+ stack.connect("undone", doneCb, False)
+ action1 = Action()
+ action2 = Action()
+ stack.push(action1)
+ stack.push(action2)
+
+ stack.undo()
+ self.failUnlessEqual(state["actions"], 0)
+ self.failIf(state["done"])
+
+ stack.do()
+ self.failUnlessEqual(state["actions"], 2)
+ self.failUnless(state["done"])
+
+ def testUndoError(self):
+ """
+ Undo a stack containing a failing action.
+ """
+ state = {"done": True, "error": None}
+ def doneCb(action, value):
+ state["done"] = value
+
+ def errorCb(action, exception):
+ state["error"] = exception
+
+ state["actions"] = 2
+ class Action(UndoableAction):
+ def undo(self):
+ state["actions"] -= 1
+ if state["actions"] == 1:
+ self.__class__.undo = self.__class__.undo_fail
+
+ self._undone()
+
+ def undo_fail(self):
+ self._error(UndoError("boom"))
+
+ stack = UndoableActionStack()
+ stack.connect("done", doneCb)
+ stack.connect("error", errorCb)
+ action1 = Action()
+ action2 = Action()
+ stack.push(action1)
+ stack.push(action2)
+
+ stack.undo()
+ self.failUnlessEqual(state["actions"], 1)
+ self.failUnless(state["done"])
+ self.failUnless(state["error"])
+
+ def testUndoBrokenAction(self):
+ """
+ Undo a stack containing a broken action (raises an exception whereas it
+ should be emitting the "error" signal.
+ """
+ state = {"done": True, "error": None}
+ def doneCb(action, value):
+ state["done"] = value
+
+ def errorCb(action, exception):
+ state["error"] = exception
+
+ state["actions"] = 2
+ class Action(UndoableAction):
+ def undo(self):
+ state["actions"] -= 1
+ if state["actions"] == 1:
+ self.__class__.undo = self.__class__.undo_fail
+
+ self._undone()
+
+ def undo_fail(self):
+ raise Exception("boom")
+
+ stack = UndoableActionStack()
+ stack.connect("undone", doneCb, False)
+ stack.connect("error", errorCb)
+ action1 = Action()
+ action2 = Action()
+ stack.push(action1)
+ stack.push(action2)
+
+ stack.undo()
+ self.failUnlessEqual(state["actions"], 1)
+ self.failUnless(state["done"])
+ self.failUnless(state["error"])
+
+
+class TestUndoableActionLog(TestCase):
+ def setUp(self):
+ self.log = UndoableActionLog()
+ self._connectToUndoableActionLog(self.log)
+ self.signals = []
+
+ def tearDown(self):
+ self._disconnectFromUndoableActionLog(self.log)
+
+ def _undoActionLogSignalCb(self, log, *args):
+ args = list(args)
+ signalName = args.pop(-1)
+ self.signals.append((signalName, args))
+
+ def _connectToUndoableActionLog(self, log):
+ for signalName in ("begin", "push", "rollback", "commit",
+ "undo", "redo", "error"):
+ log.connect(signalName, self._undoActionLogSignalCb, signalName)
+
+ def _disconnectFromUndoableActionLog(self, log):
+ self.log.disconnect_by_func(self._undoActionLogSignalCb)
+
+ def testRollbackWrongState(self):
+ self.log.rollback()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (exception,) = self.signals[0]
+ self.failUnlessEqual(name, "error")
+ self.failUnless(isinstance(exception, UndoWrongStateError))
+
+ def testCommitWrongState(self):
+ self.log.commit()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (exception,) = self.signals[0]
+ self.failUnlessEqual(name, "error")
+ self.failUnless(isinstance(exception, UndoWrongStateError))
+
+ def testPushWrongState(self):
+ self.log.push(None)
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (exception,) = self.signals[0]
+ self.failUnlessEqual(name, "error")
+ self.failUnless(isinstance(exception, UndoWrongStateError))
+
+ def testUndoWrongState(self):
+ self.log.undo()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (exception,) = self.signals[0]
+ self.failUnlessEqual(name, "error")
+ self.failUnless(isinstance(exception, UndoWrongStateError))
+
+ def testRedoWrongState(self):
+ self.log.redo()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (exception,) = self.signals[0]
+ self.failUnlessEqual(name, "error")
+ self.failUnless(isinstance(exception, UndoWrongStateError))
+
+ def testCommit(self):
+ """
+ Commit a stack.
+ """
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (stack, nested) = self.signals[0]
+ self.failUnlessEqual(name, "begin")
+ self.failIf(nested)
+
+ self.failUnlessEqual(self.log.undo_stacks, [])
+ self.log.commit()
+ self.failUnlessEqual(len(self.signals), 2)
+ name, (stack, nested) = self.signals[1]
+ self.failUnlessEqual(name, "commit")
+ self.failIf(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 1)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ def testNestedCommit(self):
+ """
+ Do two nested commits.
+ """
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (stack, nested) = self.signals[0]
+ self.failUnlessEqual(name, "begin")
+ self.failIf(nested)
+
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 2)
+ name, (stack, nested) = self.signals[1]
+ self.failUnlessEqual(name, "begin")
+ self.failUnless(nested)
+
+ self.failUnlessEqual(self.log.undo_stacks, [])
+ self.log.commit()
+ self.failUnlessEqual(len(self.signals), 3)
+ name, (stack, nested) = self.signals[2]
+ self.failUnlessEqual(name, "commit")
+ self.failUnless(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ self.failUnlessEqual(self.log.undo_stacks, [])
+ self.log.commit()
+ self.failUnlessEqual(len(self.signals), 4)
+ name, (stack, nested) = self.signals[3]
+ self.failUnlessEqual(name, "commit")
+ self.failIf(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 1)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ def testRollback(self):
+ """
+ Test a rollback.
+ """
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (stack, nested) = self.signals[0]
+ self.failUnlessEqual(name, "begin")
+ self.failIf(nested)
+
+ self.log.rollback()
+ self.failUnlessEqual(len(self.signals), 2)
+ name, (stack, nested) = self.signals[1]
+ self.failUnlessEqual(name, "rollback")
+ self.failIf(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ def testNestedRollback(self):
+ """
+ Test two nested rollbacks.
+ """
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (stack, nested) = self.signals[0]
+ self.failUnlessEqual(name, "begin")
+ self.failIf(nested)
+
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 2)
+ name, (stack, nested) = self.signals[1]
+ self.failUnlessEqual(name, "begin")
+ self.failUnless(nested)
+
+ self.log.rollback()
+ self.failUnlessEqual(len(self.signals), 3)
+ name, (stack, nested) = self.signals[2]
+ self.failUnlessEqual(name, "rollback")
+ self.failUnless(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ self.log.rollback()
+ self.failUnlessEqual(len(self.signals), 4)
+ name, (stack, nested) = self.signals[3]
+ self.failUnlessEqual(name, "rollback")
+ self.failIf(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ def testUndoRedo(self):
+ """
+ Try an undo() redo() sequence.
+ """
+ # begin
+ self.log.begin()
+ self.failUnlessEqual(len(self.signals), 1)
+ name, (stack, nested) = self.signals[0]
+ self.failUnlessEqual(name, "begin")
+ self.failIf(nested)
+
+ # push two actions
+ action1 = DummyUndoableAction()
+ self.log.push(action1)
+ self.failUnlessEqual(len(self.signals), 2)
+ name, (stack, signalAction) = self.signals[1]
+ self.failUnlessEqual(name, "push")
+ self.failUnless(action1 is signalAction)
+
+ action2 = DummyUndoableAction()
+ self.log.push(action2)
+ self.failUnlessEqual(len(self.signals), 3)
+ name, (stack, signalAction) = self.signals[2]
+ self.failUnlessEqual(name, "push")
+ self.failUnless(action2 is signalAction)
+
+ # commit
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+ self.log.commit()
+ self.failUnlessEqual(len(self.signals), 4)
+ name, (stack, nested) = self.signals[3]
+ self.failUnlessEqual(name, "commit")
+ self.failIf(nested)
+ self.failUnlessEqual(len(self.log.undo_stacks), 1)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ self.failUnless(action1.done_)
+ self.failUnless(action2.done_)
+
+ # undo what we just committed
+ self.log.undo()
+ self.failUnlessEqual(len(self.signals), 5)
+ name, stack = self.signals[4]
+ self.failUnlessEqual(name, "undo")
+ self.failUnlessEqual(len(self.log.undo_stacks), 0)
+ self.failUnlessEqual(len(self.log.redo_stacks), 1)
+
+ self.failIf(action1.done_)
+ self.failIf(action2.done_)
+
+ # redo
+ self.log.redo()
+ self.failUnlessEqual(len(self.signals), 6)
+ name, stack = self.signals[5]
+ self.failUnlessEqual(name, "redo")
+ self.failUnlessEqual(len(self.log.undo_stacks), 1)
+ self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+ self.failUnless(action1.done_)
+ self.failUnless(action2.done_)
+
+ def testOrder(self):
+ """
+ Test that actions are undone and redone in the correct order.
+ """
+ call_sequence = []
+ class Action(UndoableAction):
+ def __init__(self, n):
+ self.n = n
+
+ def do(self):
+ call_sequence.append("do%s" % self.n)
+ self._done()
+
+ def undo(self):
+ call_sequence.append("undo%s" % self.n)
+ self._undone()
+
+ action1 = Action(1)
+ action2 = Action(2)
+ action3 = Action(3)
+
+ self.log.begin()
+ self.log.push(action1)
+ self.log.begin()
+ self.log.push(action2)
+ self.log.commit()
+ self.log.push(action3)
+ self.log.commit()
+
+ self.log.undo()
+ self.failUnlessEqual(call_sequence, ["undo3", "undo2", "undo1"])
+
+ call_sequence[:] = []
+ self.log.redo()
+ self.failUnlessEqual(call_sequence, ["do1", "do2", "do3"])
+
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]