conduit r1211 - in trunk: . conduit/modules/NetworkModule test/python-tests
- From: jstowers svn gnome org
- To: svn-commits-list gnome org
- Subject: conduit r1211 - in trunk: . conduit/modules/NetworkModule test/python-tests
- Date: Mon, 14 Jan 2008 06:13:06 +0000 (GMT)
Author: jstowers
Date: Mon Jan 14 06:13:06 2008
New Revision: 1211
URL: http://svn.gnome.org/viewvc/conduit?rev=1211&view=rev
Log:
"2008-01-14 John Stowers <john stowers gmail com>
* conduit/modules/NetworkModule/Server.py: Return the server from
share_dataprovider so the tests can use it
* test/python-tests/common.py: Add get_dataprovider_factory function and
update networked_dataprovider to the new API
* test/python-tests/TestDataProviderNetwork.py: Add simple test"
Added:
trunk/test/python-tests/TestDataProviderNetwork.py
Modified:
trunk/ChangeLog
trunk/conduit/modules/NetworkModule/Server.py
trunk/test/python-tests/common.py
Modified: trunk/conduit/modules/NetworkModule/Server.py
==============================================================================
--- trunk/conduit/modules/NetworkModule/Server.py (original)
+++ trunk/conduit/modules/NetworkModule/Server.py Mon Jan 14 06:13:06 2008
@@ -118,6 +118,7 @@
server.start()
self.shared[dpw.get_UID()] = server
self.DP_PORT += 1
+ return server
def unshare_dataprovider(self, dpw):
"""
Added: trunk/test/python-tests/TestDataProviderNetwork.py
==============================================================================
--- (empty file)
+++ trunk/test/python-tests/TestDataProviderNetwork.py Mon Jan 14 06:13:06 2008
@@ -0,0 +1,25 @@
+#common sets up the conduit environment
+from common import *
+
+test = SimpleSyncTest()
+
+#Source
+source = test.get_dataprovider("TestTwoWay")
+netsource = test.networked_dataprovider(source)
+
+#Sink
+sink = test.get_dataprovider("TestTwoWay")
+netsink = test.networked_dataprovider(sink)
+
+#Sync
+test.prepare(
+ netsource,
+ sink
+ )
+test.set_two_way_sync(True)
+test.sync(debug=False)
+aborted = test.sync_aborted()
+
+ok("Sync completed", aborted == False)
+
+finished()
Modified: trunk/test/python-tests/common.py
==============================================================================
--- trunk/test/python-tests/common.py (original)
+++ trunk/test/python-tests/common.py Mon Jan 14 06:13:06 2008
@@ -262,9 +262,17 @@
if dp.classname == name:
wrapper = self.model.get_new_module_instance(dp.get_key())
- ok("Find wrapper '%s'" % name, wrapper != None, die)
+ ok("Find DataProviderWrapper '%s'" % name, wrapper != None, die)
return wrapper
+ def get_dataprovider_factory(self, className, die=True):
+ factory = None
+ for f in self.model.dataproviderFactories:
+ if f.__class__.__name__ == className:
+ factory = f
+ ok("Find DataProviderFactory '%s'" % className, factory != None, die)
+ return factory
+
def wrap_dataprovider(self, dp):
wrapper = ModuleWrapper.ModuleWrapper (
getattr(dp, "_name_", ""),
@@ -286,29 +294,24 @@
"""
Dirty evil cludge so we can test networked sync...
"""
- found = False
+ factory = self.get_dataprovider_factory("NetworkServerFactory")
+ try:
+ server = factory.share_dataprovider(dp)
+ ok("Created new DataProviderServer", server != None)
+ except:
+ ok("Created new DataProviderServer", False)
+
conduit = Conduit.Conduit(self.sync_manager)
-
- #fixme: need cleaner way to get networked factory..
- for i in range(0, len(self.model.dataproviderFactories)):
- factory = self.model.dataproviderFactories[i]
- if str(factory).find("NetworkServerFactory") != -1:
- found = True
- factory.share_dataprovider(dp)
- break
-
- if found == False:
- return None
-
time.sleep(1)
- for i in range(0, len(self.model.dataproviderFactories)):
- factory = self.model.dataproviderFactories[i]
- if str(factory).find("NetworkClientFactory") != -1:
- newdp = factory.dataprovider_create("http://localhost:3400/", conduit.uid, None)
- ok("Created new DataProviderClient", newdp != None)
- return self.wrap_dataprovider( newdp() )
+ factory = self.get_dataprovider_factory("NetworkClientFactory")
+ try:
+ newdp = factory.dataprovider_create("http://localhost", conduit.uid, server.get_info())
+ ok("Created new DataProviderClient", newdp != None)
+ return self.wrap_dataprovider( newdp() )
+ except:
+ ok("Created new DataProviderClient", False)
def configure(self, source={}, sink={}):
if len(source) > 0:
[Date Prev] [Date Next] [Thread Prev] [Thread Next] [Thread Index] [Date Index] [Author Index]