[conduit: 137/138] Move conduit specific code back out to test_sync



commit 5f2e819dff30bad3d7ad3142db792354ecbe3372
Author: John Carr <john carr unrouted co uk>
Date:   Thu May 7 10:28:07 2009 -0700

    Move conduit specific code back out to test_sync
---
 test/soup/test_synchronization.py |   88 +++++++++++++++++++++++++++++++++++++
 test/soup/utils/test.py           |   86 ------------------------------------
 2 files changed, 88 insertions(+), 86 deletions(-)

diff --git a/test/soup/test_synchronization.py b/test/soup/test_synchronization.py
index 88952a5..28b2cb3 100644
--- a/test/soup/test_synchronization.py
+++ b/test/soup/test_synchronization.py
@@ -4,6 +4,18 @@ from soup.modules import ModuleLoader
 
 import conduit
 
+#FIXME: Work out which of these i can drop :)
+import os
+import conduit
+import conduit.utils as Utils
+import conduit.MappingDB as MappingDB
+import conduit.Module as Module
+import conduit.TypeConverter as TypeConverter
+import conduit.Synchronization as Synchronization
+import conduit.ModuleWrapper as ModuleWrapper
+import conduit.Conduit as Conduit
+import conduit.Settings as Settings
+
 def make_testcase(src, src_data, snk, snk_data):
     class TestSynchronization(soup.utils.test.TestCase):
         source_class = src
@@ -20,6 +32,31 @@ def make_testcase(src, src_data, snk, snk_data):
                 for thing in obj.requires:
                     yield thing
 
+        def setUpSync(self):
+            # FIXME: I'd put this in an EnvironmentWrapper, but i need priorities before i can do that :/
+            conduit.IS_INSTALLED =              False
+            conduit.IS_DEVELOPMENT_VERSION =    True
+            conduit.SHARED_DATA_DIR =           os.path.join(soup.get_root(),"data")
+            conduit.SHARED_MODULE_DIR =         os.path.join(soup.get_root(),"conduit","modules")
+            conduit.FILE_IMPL =                 os.environ.get("CONDUIT_FILE_IMPL","GIO")
+            conduit.BROWSER_IMPL =              os.environ.get("CONDUIT_BROWSER_IMPL","system")
+            conduit.SETTINGS_IMPL =             os.environ.get("CONDUIT_SETTINGS_IMPL","Python")
+            conduit.GLOBALS.settings =          Settings.Settings()
+
+            #Set up our own mapping DB so we dont pollute the global one
+            dbFile = os.path.join(os.environ['TEST_DIRECTORY'],Utils.random_string()+".db")
+            conduit.GLOBALS.mappingDB = MappingDB.MappingDB(dbFile)
+
+            self.modules = Module.ModuleManager([])
+            conduit.GLOBALS.moduleManager = self.modules
+            self.modules.load_all(whitelist=None, blacklist=None)
+
+            self.type_converter = conduit.TypeConverter.TypeConverter(self.modules)
+            conduit.GLOBALS.typeConverter = self.type_converter
+            self.sync_manager = conduit.Synchronization.SyncManager(self.type_converter)
+            conduit.GLOBALS.syncManager = self.sync_manager
+
+
         def setUp(self):
             super(TestSynchronization, self).setUp()
             self.setUpSync()
@@ -43,6 +80,57 @@ def make_testcase(src, src_data, snk, snk_data):
             self.source.destroy_dataprovider()
             self.sink.destroy_dataprovider()
 
+        def get_dataprovider(self, name):
+            wrapper = None
+            for dp in self.modules.get_all_modules():
+                if dp.classname == name:
+                    wrapper = self.modules.get_module_wrapper_with_instance(dp.get_key())
+            assert wrapper != None
+            return wrapper
+
+        def get_dataprovider_factory(self, className, die=True):
+            factory = None
+            for f in self.model.dataproviderFactories:
+                if f.__class__.__name__ == className:
+                    factory = f
+            assert factory != None
+            return factory
+
+        def wrap_dataprovider(self, dp):
+            wrapper = ModuleWrapper.ModuleWrapper(
+                             klass=dp.__class__,
+                             initargs=(),
+                             category=None
+                             )
+            wrapper.module = dp
+            return wrapper
+
+        def networked_dataprovider(self, dp):
+            """
+            Dirty evil cludge so we can test networked sync...
+            """
+            factory = self.get_dataprovider_factory("NetworkServerFactory")
+            server = factory.share_dataprovider(dp)
+            assert server != None
+
+            conduit = Conduit.Conduit(self.sync_manager)
+            time.sleep(1)
+
+            factory = self.get_dataprovider_factory("NetworkClientFactory")
+            newdp = factory.dataprovider_create("http://localhost";, conduit.uid, server.get_info())
+            assert newdp != None
+            return self.wrap_dataprovider( newdp() )
+
+        def create_conduit(self):
+            return Conduit.Conduit(self.sync_manager)
+
+        def create_syncset(self):
+            return SyncSet.SyncSet(
+                moduleManager=self.modules,
+                syncManager=self.sync_manager
+            )
+
+
         def add_testdata(self, target, target_data):
             count = 0
             for data in target_data.iter_samples():
diff --git a/test/soup/utils/test.py b/test/soup/utils/test.py
index 033fcbd..d939c43 100644
--- a/test/soup/utils/test.py
+++ b/test/soup/utils/test.py
@@ -8,18 +8,6 @@ import cgitb
 import traceback
 import time
 
-# FIXME: These only need to be here because i'm mixing unittest tweaks with conduit helpers
-import os
-import conduit
-import conduit.utils as Utils
-import conduit.MappingDB as MappingDB
-import conduit.Module as Module
-import conduit.TypeConverter as TypeConverter
-import conduit.Synchronization as Synchronization
-import conduit.ModuleWrapper as ModuleWrapper
-import conduit.Conduit as Conduit
-import conduit.Settings as Settings
-
 CHANGE_ADD = 1
 CHANGE_REPLACE = 2
 CHANGE_DELETE = 3
@@ -54,80 +42,6 @@ class TestCase(unittest.TestCase):
         for feature in self.requires():
             feature.require()
 
-    def setUpSync(self):
-        # FIXME: I'd put this in an EnvironmentWrapper, but i need priorities before i can do that :/
-        conduit.IS_INSTALLED =              False
-        conduit.IS_DEVELOPMENT_VERSION =    True
-        conduit.SHARED_DATA_DIR =           os.path.join(soup.get_root(),"data")
-        conduit.SHARED_MODULE_DIR =         os.path.join(soup.get_root(),"conduit","modules")
-        conduit.FILE_IMPL =                 os.environ.get("CONDUIT_FILE_IMPL","GIO")
-        conduit.BROWSER_IMPL =              os.environ.get("CONDUIT_BROWSER_IMPL","system")
-        conduit.SETTINGS_IMPL =             os.environ.get("CONDUIT_SETTINGS_IMPL","Python")
-        conduit.GLOBALS.settings =          Settings.Settings()
-
-        #Set up our own mapping DB so we dont pollute the global one
-        dbFile = os.path.join(os.environ['TEST_DIRECTORY'],Utils.random_string()+".db")
-        conduit.GLOBALS.mappingDB = MappingDB.MappingDB(dbFile)
-
-        self.modules = Module.ModuleManager([])
-        conduit.GLOBALS.moduleManager = self.modules
-        self.modules.load_all(whitelist=None, blacklist=None)
-
-        self.type_converter = conduit.TypeConverter.TypeConverter(self.modules)
-        conduit.GLOBALS.typeConverter = self.type_converter
-        self.sync_manager = conduit.Synchronization.SyncManager(self.type_converter)
-        conduit.GLOBALS.syncManager = self.sync_manager
-
-    def get_dataprovider(self, name):
-        wrapper = None
-        for dp in self.modules.get_all_modules():
-            if dp.classname == name:
-                wrapper = self.modules.get_module_wrapper_with_instance(dp.get_key())
-        assert wrapper != None
-        return wrapper
-
-    def get_dataprovider_factory(self, className, die=True):
-        factory = None
-        for f in self.model.dataproviderFactories:
-            if f.__class__.__name__ == className:
-                factory = f
-        assert factory != None
-        return factory
-
-    def wrap_dataprovider(self, dp):
-        wrapper = ModuleWrapper.ModuleWrapper(
-                         klass=dp.__class__,
-                         initargs=(),
-                         category=None
-                         )
-        wrapper.module = dp
-        return wrapper
-
-    def networked_dataprovider(self, dp):
-        """
-        Dirty evil cludge so we can test networked sync...
-        """
-        factory = self.get_dataprovider_factory("NetworkServerFactory")
-        server = factory.share_dataprovider(dp)
-        assert server != None
-
-        conduit = Conduit.Conduit(self.sync_manager)
-        time.sleep(1)
-
-        factory = self.get_dataprovider_factory("NetworkClientFactory")
-        newdp = factory.dataprovider_create("http://localhost";, conduit.uid, server.get_info())
-        assert newdp != None
-        return self.wrap_dataprovider( newdp() )
-
-    def create_conduit(self):
-        return Conduit.Conduit(self.sync_manager)
-
-    def create_syncset(self):
-        return SyncSet.SyncSet(
-            moduleManager=self.modules,
-            syncManager=self.sync_manager
-        )
-
 
 #
 # Custom exceptions



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]