[pitivi] Add simple undo/redo system.



commit 0fb582ed5cf845cba7c6a6f02c33493acc47c099
Author: Alessandro Decina <alessandro d gmail com>
Date:   Tue Jun 9 18:10:02 2009 +0200

    Add simple undo/redo system.

 pitivi/undo.py     |  216 +++++++++++++++++++++++++
 tests/test_undo.py |  454 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 670 insertions(+), 0 deletions(-)
---
diff --git a/pitivi/undo.py b/pitivi/undo.py
new file mode 100644
index 0000000..dfd3aa4
--- /dev/null
+++ b/pitivi/undo.py
@@ -0,0 +1,216 @@
+# PiTiVi , Non-linear video editor
+#
+#       pitivi/undo.py
+#
+# Copyright (c) 2009, Alessandro Decina <alessandro d gmail com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+from pitivi.signalinterface import Signallable
+
+class UndoError(Exception):
+    pass
+
+class UndoWrongStateError(UndoError):
+    pass
+
+class UndoableAction(Signallable):  # base class for actions; subclasses implement do() and undo()
+    __signals__ = {
+        "done": [],
+        "undone": [],
+        # "error" carries the exception that made do() or undo() fail.
+        "error": ["exception"]
+    }
+
+    def do(self):  # apply the action; must be overridden
+        raise NotImplementedError()
+
+    def undo(self):  # revert the action; must be overridden
+        raise NotImplementedError()
+
+    def _done(self):  # helper for subclasses: emit "done"
+        self.emit("done")
+
+    def _undone(self):  # helper for subclasses: emit "undone"
+        self.emit("undone")
+
+    def _error(self, exception):  # helper for subclasses: emit "error" with the exception
+        self.emit("error", exception)
+
+class UndoableActionStack(UndoableAction):  # a group of actions that is done/undone as one action
+    __signals__ = {
+        "done": [],
+        "undone": [],
+        "error": ["exception"],
+    }
+
+    def __init__(self):
+        self.done_actions = []  # actions in the "done" state; push() appends here
+        self.undone_actions = []  # actions in the "undone" state, consumed by do()
+        self.actions = []  # NOTE(review): not used anywhere else in this class
+
+    def push(self, action):  # register an action as done; the action itself is not run here
+        self.done_actions.append(action)
+
+    def _runAction(self, action_list, methodName, signalName,
+            continueCallback, finishCallback):  # call methodName on the last action of action_list
+        try:
+            action = action_list.pop(-1)
+        except IndexError:
+            finishCallback()  # list is empty: report completion of the whole stack
+            return
+
+        if action_list is self.done_actions:  # file the action under its new state before running it
+            self.undone_actions.append(action)
+        else:
+            self.done_actions.append(action)
+
+        self._connectToAction(action, action_list,
+                signalName, continueCallback, finishCallback)
+
+        method = getattr(action, methodName)
+        try:
+            method()
+        except Exception, e:  # a raising action goes through the same handler as an "error" signal
+            self._actionErrorCb(action, e, finishCallback)
+
+    def do(self):  # run do() on the undone actions, last undone first, then emit "done"
+        self._runAction(self.undone_actions, "do", "done",
+                continueCallback=self.do, finishCallback=self._done)
+
+    def undo(self):  # run undo() on the done actions, last done first, then emit "undone"
+        self._runAction(self.done_actions, "undo", "undone",
+                continueCallback=self.undo, finishCallback=self._undone)
+
+    def _connectToAction(self, action, action_list, signalName,
+            continueCallback, finishCallback):  # watch the action for completion and for failure
+        action.connect(signalName, self._actionDoneOrUndoneCb,
+                action_list, continueCallback, finishCallback)
+        action.connect("error", self._actionErrorCb, finishCallback)
+
+    def _disconnectFromAction(self, action):  # drop both handlers installed by _connectToAction()
+        action.disconnect_by_func(self._actionDoneOrUndoneCb)
+        action.disconnect_by_func(self._actionErrorCb)
+
+    def _actionDoneOrUndoneCb(self, action, action_list,
+            continueCallback, finishCallback):  # one action completed: finish the stack or run the next one
+        self._disconnectFromAction(action)
+
+        if not action_list:
+            finishCallback()
+            return
+
+        continueCallback()
+
+    def _actionErrorCb(self, action, exception, finishCallback):  # finishCallback is unused: a failure ends the run
+        self._disconnectFromAction(action)
+
+        self._error(exception)  # forward the failure as this stack's own "error" signal
+
+    def _done(self):  # same body as UndoableAction._done
+        self.emit("done")
+
+    def _undone(self):  # same body as UndoableAction._undone
+        self.emit("undone")
+
+    def _error(self, exception):  # same body as UndoableAction._error
+        self.emit("error", exception)
+
+class UndoableActionLog(Signallable):  # keeps committed action stacks and drives undo/redo over them
+    __signals__ = {
+        "begin": ["stack", "nested"],
+        "push": ["stack", "action"],
+        "rollback": ["stack", "nested"],
+        "commit": ["stack", "nested"],
+        "undo": ["stack"],
+        "redo": ["stack"],
+        "error": ["exception"]
+    }
+    def __init__(self):
+        self.undo_stacks = []  # committed top-level stacks that undo() can revert
+        self.redo_stacks = []  # stacks reverted by undo() that redo() can re-apply
+        self.stacks = []  # stacks opened by begin() and not yet committed or rolled back
+
+    def begin(self):  # open a new stack; it becomes the innermost open stack
+        stack = UndoableActionStack()
+        nested = self._stackIsNested(stack)  # evaluated before the append, so it reflects already open stacks
+        self.stacks.append(stack)
+        self.emit("begin", stack, nested)
+
+    def push(self, action):  # add an action to the innermost open stack
+        stack = self._getTopmostStack()
+        if stack is None:  # no open stack: _getTopmostStack() already emitted "error"
+            return
+        stack.push(action)
+        self.emit("push", stack, action)
+
+    def rollback(self):  # close the innermost open stack and undo its actions
+        stack = self._getTopmostStack(pop=True)
+        if stack is None:
+            return
+        nested = self._stackIsNested(stack)  # true while an enclosing stack is still open
+        self.emit("rollback", stack, nested)
+        stack.undo()
+
+    def commit(self):  # close the innermost open stack and keep its actions
+        stack = self._getTopmostStack(pop=True)
+        if stack is None:
+            return
+        nested = self._stackIsNested(stack)
+        if not self.stacks:
+            self.undo_stacks.append(stack)  # outermost stack: becomes available to undo()
+        else:
+            self.stacks[-1].push(stack)  # nested stack: becomes one action of the enclosing stack
+
+        self.emit("commit", stack, nested)
+
+    def undo(self):  # revert the most recently committed stack
+        if self.stacks or not self.undo_stacks:  # refused while a stack is open or when nothing is undoable
+            self._error(UndoWrongStateError())
+            return
+
+        stack = self.undo_stacks.pop(-1)
+        stack.undo()
+
+        self.redo_stacks.append(stack)
+        self.emit("undo", stack)
+
+    def redo(self):  # re-apply the most recently undone stack
+        if self.stacks or not self.redo_stacks:  # same guard as undo(), applied to redo_stacks
+            return self._error(UndoWrongStateError())
+
+        stack = self.redo_stacks.pop(-1)
+        stack.do()
+        self.undo_stacks.append(stack)
+        self.emit("redo", stack)
+
+    def _getTopmostStack(self, pop=False):  # innermost open stack; removed from self.stacks when pop is true
+        stack = None
+        try:
+            if pop:
+                stack = self.stacks.pop(-1)
+            else:
+                stack = self.stacks[-1]
+        except IndexError:
+            return self._error(UndoWrongStateError())  # _error() has no return statement, so callers get None
+
+        return stack
+
+    def _stackIsNested(self, stack):  # the stack argument is ignored; only self.stacks is inspected
+        return bool(len(self.stacks))
+
+    def _error(self, exception):  # emit "error" with the given exception
+        self.emit("error", exception)
diff --git a/tests/test_undo.py b/tests/test_undo.py
new file mode 100644
index 0000000..e36977c
--- /dev/null
+++ b/tests/test_undo.py
@@ -0,0 +1,454 @@
+# PiTiVi , Non-linear video editor
+#
+#       tests/test_undo.py
+#
+# Copyright (c) 2009, Alessandro Decina <alessandro d gmail com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+from unittest import TestCase
+
+from pitivi.undo import UndoError, UndoWrongStateError, UndoableAction, \
+        UndoableActionStack, UndoableActionLog
+
+class DummyUndoableAction(UndoableAction):  # minimal action that only records its done/undone state
+    done_ = True  # class-level default: a fresh instance reads as done
+
+    def do(self):
+        self.done_ = True
+        self._done()  # emits "done" before do() returns
+
+    def undo(self):
+        self.done_ = False
+        self._undone()  # emits "undone" before undo() returns
+
+class TestUndoableAction(TestCase):  # signal emission of a single action
+    def testSimpleSignals(self):
+        """
+        Test signal emission from _done() and _undone().
+        """
+        state = {"done": False}
+        def doneCb(action, val):  # val is the extra argument given to connect()
+            state["done"] = val
+
+        action = DummyUndoableAction()
+        action.connect("done", doneCb, True)
+        action.connect("undone", doneCb, False)
+
+        action.undo()
+        self.failIf(state["done"])
+
+        action.do()
+        self.failUnless(state["done"])
+
+
+class TestUndoableActionStack(TestCase):  # do/undo of a stack, including failing actions
+    def testDoUndoEmpty(self):
+        """
+        Undo an empty stack.
+        """
+        state = {"done": True}
+        def doneCb(action, value):
+            state["done"] = value
+
+        stack = UndoableActionStack()
+        stack.connect("done", doneCb, True)
+        stack.connect("undone", doneCb, False)
+
+        stack.undo()
+        self.failIf(state["done"])
+
+        stack.do()
+        self.failUnless(state["done"])
+
+    def testUndoDo(self):
+        """
+        Test an undo() do() sequence.
+        """
+        state = {"done": True, "actions": 2}
+        def doneCb(action, value):
+            state["done"] = value
+
+        state["done"] = 2  # NOTE(review): sibling tests set state["actions"] here; harmless, confirm intent
+        class Action(UndoableAction):
+            def do(self):
+                state["actions"] += 1
+                self._done()
+
+            def undo(self):
+                state["actions"] -= 1
+                self._undone()
+
+        stack = UndoableActionStack()
+        stack.connect("done", doneCb, True)
+        stack.connect("undone", doneCb, False)
+        action1 = Action()
+        action2 = Action()
+        stack.push(action1)
+        stack.push(action2)
+
+        stack.undo()
+        self.failUnlessEqual(state["actions"], 0)
+        self.failIf(state["done"])
+
+        stack.do()
+        self.failUnlessEqual(state["actions"], 2)
+        self.failUnless(state["done"])
+
+    def testUndoError(self):
+        """
+        Undo a stack containing a failing action.
+        """
+        state = {"done": True, "error": None}
+        def doneCb(action, value):
+            state["done"] = value
+
+        def errorCb(action, exception):
+            state["error"] = exception
+
+        state["actions"] = 2
+        class Action(UndoableAction):
+            def undo(self):
+                state["actions"] -= 1
+                if state["actions"] == 1:  # after the first undo, make every later undo fail
+                    self.__class__.undo = self.__class__.undo_fail
+
+                self._undone()
+
+            def undo_fail(self):
+                self._error(UndoError("boom"))
+
+        stack = UndoableActionStack()
+        stack.connect("undone", doneCb, False)  # must stay unfired: the stack may not report "undone" on failure
+        stack.connect("error", errorCb)
+        action1 = Action()
+        action2 = Action()
+        stack.push(action1)
+        stack.push(action2)
+
+        stack.undo()
+        self.failUnlessEqual(state["actions"], 1)
+        self.failUnless(state["done"])
+        self.failUnless(state["error"])
+
+    def testUndoBrokenAction(self):
+        """
+        Undo a stack containing a broken action (raises an exception whereas it
+        should be emitting the "error" signal).
+        """
+        state = {"done": True, "error": None}
+        def doneCb(action, value):
+            state["done"] = value
+
+        def errorCb(action, exception):
+            state["error"] = exception
+
+        state["actions"] = 2
+        class Action(UndoableAction):
+            def undo(self):
+                state["actions"] -= 1
+                if state["actions"] == 1:  # after the first undo, make every later undo raise
+                    self.__class__.undo = self.__class__.undo_fail
+
+                self._undone()
+
+            def undo_fail(self):
+                raise Exception("boom")
+
+        stack = UndoableActionStack()
+        stack.connect("undone", doneCb, False)
+        stack.connect("error", errorCb)
+        action1 = Action()
+        action2 = Action()
+        stack.push(action1)
+        stack.push(action2)
+
+        stack.undo()
+        self.failUnlessEqual(state["actions"], 1)
+        self.failUnless(state["done"])
+        self.failUnless(state["error"])
+
+
+class TestUndoableActionLog(TestCase):  # signals and stack bookkeeping of UndoableActionLog
+    def setUp(self):
+        self.log = UndoableActionLog()
+        self._connectToUndoableActionLog(self.log)
+        self.signals = []  # (signalName, [signal arguments]) tuples in emission order
+
+    def tearDown(self):
+        self._disconnectFromUndoableActionLog(self.log)
+
+    def _undoActionLogSignalCb(self, log, *args):  # the last argument is the name given to connect()
+        args = list(args)
+        signalName = args.pop(-1)
+        self.signals.append((signalName, args))
+
+    def _connectToUndoableActionLog(self, log):  # record every signal the log can emit
+        for signalName in ("begin", "push", "rollback", "commit",
+                    "undo", "redo", "error"):
+            log.connect(signalName, self._undoActionLogSignalCb, signalName)
+
+    def _disconnectFromUndoableActionLog(self, log):  # uses the log it is given, like the connect helper
+        log.disconnect_by_func(self._undoActionLogSignalCb)
+
+    def testRollbackWrongState(self):
+        self.log.rollback()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (exception,) = self.signals[0]
+        self.failUnlessEqual(name, "error")
+        self.failUnless(isinstance(exception, UndoWrongStateError))
+
+    def testCommitWrongState(self):
+        self.log.commit()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (exception,) = self.signals[0]
+        self.failUnlessEqual(name, "error")
+        self.failUnless(isinstance(exception, UndoWrongStateError))
+
+    def testPushWrongState(self):
+        self.log.push(None)
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (exception,) = self.signals[0]
+        self.failUnlessEqual(name, "error")
+        self.failUnless(isinstance(exception, UndoWrongStateError))
+
+    def testUndoWrongState(self):
+        self.log.undo()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (exception,) = self.signals[0]
+        self.failUnlessEqual(name, "error")
+        self.failUnless(isinstance(exception, UndoWrongStateError))
+
+    def testRedoWrongState(self):
+        self.log.redo()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (exception,) = self.signals[0]
+        self.failUnlessEqual(name, "error")
+        self.failUnless(isinstance(exception, UndoWrongStateError))
+
+    def testCommit(self):
+        """
+        Commit a stack.
+        """
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (stack, nested) = self.signals[0]
+        self.failUnlessEqual(name, "begin")
+        self.failIf(nested)
+
+        self.failUnlessEqual(self.log.undo_stacks, [])
+        self.log.commit()
+        self.failUnlessEqual(len(self.signals), 2)
+        name, (stack, nested) = self.signals[1]
+        self.failUnlessEqual(name, "commit")
+        self.failIf(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 1)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+    def testNestedCommit(self):
+        """
+        Do two nested commits.
+        """
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (stack, nested) = self.signals[0]
+        self.failUnlessEqual(name, "begin")
+        self.failIf(nested)
+
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 2)
+        name, (stack, nested) = self.signals[1]
+        self.failUnlessEqual(name, "begin")
+        self.failUnless(nested)
+
+        self.failUnlessEqual(self.log.undo_stacks, [])
+        self.log.commit()
+        self.failUnlessEqual(len(self.signals), 3)
+        name, (stack, nested) = self.signals[2]
+        self.failUnlessEqual(name, "commit")
+        self.failUnless(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+        self.failUnlessEqual(self.log.undo_stacks, [])
+        self.log.commit()
+        self.failUnlessEqual(len(self.signals), 4)
+        name, (stack, nested) = self.signals[3]
+        self.failUnlessEqual(name, "commit")
+        self.failIf(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 1)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+    def testRollback(self):
+        """
+        Test a rollback.
+        """
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (stack, nested) = self.signals[0]
+        self.failUnlessEqual(name, "begin")
+        self.failIf(nested)
+
+        self.log.rollback()
+        self.failUnlessEqual(len(self.signals), 2)
+        name, (stack, nested) = self.signals[1]
+        self.failUnlessEqual(name, "rollback")
+        self.failIf(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+    def testNestedRollback(self):
+        """
+        Test two nested rollbacks.
+        """
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (stack, nested) = self.signals[0]
+        self.failUnlessEqual(name, "begin")
+        self.failIf(nested)
+
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 2)
+        name, (stack, nested) = self.signals[1]
+        self.failUnlessEqual(name, "begin")
+        self.failUnless(nested)
+
+        self.log.rollback()
+        self.failUnlessEqual(len(self.signals), 3)
+        name, (stack, nested) = self.signals[2]
+        self.failUnlessEqual(name, "rollback")
+        self.failUnless(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+        self.log.rollback()
+        self.failUnlessEqual(len(self.signals), 4)
+        name, (stack, nested) = self.signals[3]
+        self.failUnlessEqual(name, "rollback")
+        self.failIf(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+    def testUndoRedo(self):
+        """
+        Try an undo() redo() sequence.
+        """
+        # begin
+        self.log.begin()
+        self.failUnlessEqual(len(self.signals), 1)
+        name, (stack, nested) = self.signals[0]
+        self.failUnlessEqual(name, "begin")
+        self.failIf(nested)
+
+        # push two actions
+        action1 = DummyUndoableAction()
+        self.log.push(action1)
+        self.failUnlessEqual(len(self.signals), 2)
+        name, (stack, signalAction) = self.signals[1]
+        self.failUnlessEqual(name, "push")
+        self.failUnless(action1 is signalAction)
+
+        action2 = DummyUndoableAction()
+        self.log.push(action2)
+        self.failUnlessEqual(len(self.signals), 3)
+        name, (stack, signalAction) = self.signals[2]
+        self.failUnlessEqual(name, "push")
+        self.failUnless(action2 is signalAction)
+
+        # commit
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+        self.log.commit()
+        self.failUnlessEqual(len(self.signals), 4)
+        name, (stack, nested) = self.signals[3]
+        self.failUnlessEqual(name, "commit")
+        self.failIf(nested)
+        self.failUnlessEqual(len(self.log.undo_stacks), 1)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+        self.failUnless(action1.done_)
+        self.failUnless(action2.done_)
+
+        # undo what we just committed
+        self.log.undo()
+        self.failUnlessEqual(len(self.signals), 5)
+        name, (stack,) = self.signals[4]
+        self.failUnlessEqual(name, "undo")
+        self.failUnlessEqual(len(self.log.undo_stacks), 0)
+        self.failUnlessEqual(len(self.log.redo_stacks), 1)
+
+        self.failIf(action1.done_)
+        self.failIf(action2.done_)
+
+        # redo
+        self.log.redo()
+        self.failUnlessEqual(len(self.signals), 6)
+        name, (stack,) = self.signals[5]
+        self.failUnlessEqual(name, "redo")
+        self.failUnlessEqual(len(self.log.undo_stacks), 1)
+        self.failUnlessEqual(len(self.log.redo_stacks), 0)
+
+        self.failUnless(action1.done_)
+        self.failUnless(action2.done_)
+
+    def testOrder(self):
+        """
+        Test that actions are undone and redone in the correct order.
+        """
+        call_sequence = []
+        class Action(UndoableAction):
+            def __init__(self, n):
+                self.n = n
+
+            def do(self):
+                call_sequence.append("do%s" % self.n)
+                self._done()
+
+            def undo(self):
+                call_sequence.append("undo%s" % self.n)
+                self._undone()
+
+        action1 = Action(1)
+        action2 = Action(2)
+        action3 = Action(3)
+
+        self.log.begin()
+        self.log.push(action1)
+        self.log.begin()
+        self.log.push(action2)
+        self.log.commit()
+        self.log.push(action3)
+        self.log.commit()
+
+        self.log.undo()
+        self.failUnlessEqual(call_sequence, ["undo3", "undo2", "undo1"])
+
+        call_sequence[:] = []
+        self.log.redo()
+        self.failUnlessEqual(call_sequence, ["do1", "do2", "do3"])
+



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]