r7299 - in dumbhippo/trunk: openfire/src/plugins/hippo/src/java/com/dumbhippo/jive server/src/com/dumbhippo/dm



Author: otaylor
Date: 2008-02-08 14:11:00 -0600 (Fri, 08 Feb 2008)
New Revision: 7299

Added:
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/VoidIQMethod.java
Removed:
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/UpdateIQMethod.java
Modified:
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/AnnotatedIQMethod.java
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/GenericIQMethod.java
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/MultiQueryIQMethod.java
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/QueryIQMethod.java
   dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/SingleQueryIQMethod.java
   dumbhippo/trunk/server/src/com/dumbhippo/dm/ChangeNotificationSet.java
   dumbhippo/trunk/server/src/com/dumbhippo/dm/DataModel.java
   dumbhippo/trunk/server/src/com/dumbhippo/dm/ReadWriteSession.java
Log:
QueryIQMethod SingleQueryIQMethod GenericIQMethod AnnotatedIQMethod MultiIQMethod:
 - Allow update IQ methods to have results
 - Order things so that change notifications resulting from an update are sent *before*
   the response to the IQ method

VoidIQMethod: Rename UpdateIQMethod since updates can now return results.

DataModel ChangeNotificationSet ReadWriteSession: Make provisions for controlling
  when change notifications are generated.


Modified: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/AnnotatedIQMethod.java
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/AnnotatedIQMethod.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/AnnotatedIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -11,9 +11,11 @@
 import org.xmpp.packet.JID;
 
 import com.dumbhippo.Site;
+import com.dumbhippo.dm.ChangeNotificationSet;
 import com.dumbhippo.dm.DMObject;
 import com.dumbhippo.dm.DMSession;
 import com.dumbhippo.dm.DataModel;
+import com.dumbhippo.dm.ReadWriteSession;
 import com.dumbhippo.identity20.Guid;
 import com.dumbhippo.identity20.Guid.ParseException;
 import com.dumbhippo.jive.annotations.IQMethod;
@@ -40,8 +42,27 @@
 	public String getName() {
 		return annotation.name();
 	}
+
+	// We split IQ processing into two phases
+	//
+	//  - Phase1 makes updates and computes what data model objects to return
+	//  - Phase2 fetches data to return to the user
+	//
+	// In the case of an update (set) method, we run the phases in separate
+	// transactions, and we generate any change notifications from the update
+	// before phase2.
+	//
+	// The downside of generating the change notifications synchronously
+	// is that we are generating them for *all* users on the local server, 
+	// not just the notification we need (for this user). With a bit more
+	// coding we could split things so that we do the resolution of the
+	// change set for all users synchronously, but only do the notification
+	// fetch synchronously for *this* user and do the notifications for other
+	// users asynchronously. The speed of returning from the IQ may determine 
+	// the quality of the user interaction on the client.
 	
-	public abstract void doIQ(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException;
+	public abstract Object doIQPhase1(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException;
+	public abstract void doIQPhase2(UserViewpoint viewpoint, IQ request, IQ reply, Object resultObject) throws IQException, RetryException;
 	
 	protected Object invokeMethod(Object... params) throws IQException, RetryException {
 		try {
@@ -75,46 +96,111 @@
 		Log.debug("handling IQ packet " + request);
 
 		try {
+			final DataModel model = DataService.getModel();
 			final XmppClient client = XmppClientManager.getInstance().getClient(request.getFrom());
-			long serial = client.getStoreClient().allocateSerial();
+			boolean readWrite = (annotation.type() == IQ.Type.set || annotation.forceReadWrite());
+			final IQ reply = IQ.createResultIQ(request);
 			boolean success = false;
 			
-			try {
-				final IQ reply = IQ.createResultIQ(request);
+			if (readWrite && annotation.needsTransaction()) {
+				final ChangeNotificationSet[] notifications = new ChangeNotificationSet[1];
 				
-				if (annotation.needsTransaction()) {
-					TxUtils.runInTransaction(new Callable<Boolean>() {
-						public Boolean call() throws IQException, RetryException {
-							DataModel model = DataService.getModel();
-							DMSession session;
+				final Object resultObject = TxUtils.runInTransaction(new Callable<Object>() {
+					public Object call() throws IQException, RetryException {
+						ReadWriteSession session = model.initializeReadWriteSession(client);
+						
+						// We want to order things so that the notifications get sent
+						// back *before* our response to the update IQ
+						notifications[0] = session.getNotifications();
+						notifications[0].setAutoNotify(false);
+
+						// Passing in the request to "phase 1" is for GenericIQMethod
+						// wherethe IQ might return "plain old data", not fetched
+						// from the data model
+						return doIQPhase1((UserViewpoint)session.getViewpoint(), request, reply);
+					}
+				});
+				
+				model.sendNotifications(notifications[0]);
+				
+				long serial = client.getStoreClient().allocateSerial();
+				
+				try {
+					if (needsFetchTransaction()) {
+						TxUtils.runInTransaction(new Callable<Boolean>() {
+							public Boolean call() throws IQException, RetryException {
+								DataModel model = DataService.getModel();
+								DMSession session = model.initializeReadOnlySession(client);
+								doIQPhase2((UserViewpoint)session.getViewpoint(), request, reply, resultObject);
+								return true;
+							}
+						});
+					}
+					
+					success = true;
+				} finally {
+					if (success)
+						client.queuePacket(reply, serial);
+					else
+						client.nullNotification(serial);
+				}
+				
+			} else {
+				long serial = client.getStoreClient().allocateSerial();
+				
+				try {
+					if (annotation.needsTransaction()) {
+						TxUtils.runInTransaction(new Callable<Boolean>() {
+							public Boolean call() throws IQException, RetryException {
+								DMSession session;
+									
+								session = model.initializeReadOnlySession(client);
 								
-							if (annotation.type() == IQ.Type.get && !annotation.forceReadWrite())
-								session = model.initializeReadOnlySession(client);
-							else
-								session = model.initializeReadWriteSession(client);
-							
-							doIQ((UserViewpoint)session.getViewpoint(), request, reply);
-							return true;
-						}
-					});
-				} else {
-					doIQ(new UserViewpoint(getUserId(request), Site.XMPP), request, reply);
+								Object resultObject = doIQPhase1((UserViewpoint)session.getViewpoint(), request, reply);
+								doIQPhase2((UserViewpoint)session.getViewpoint(), request, reply, resultObject);
+								
+								return true;
+							}
+						});
+					} else {
+						UserViewpoint viewpoint = new UserViewpoint(getUserId(request), Site.XMPP);
+						Object resultObject = doIQPhase1(viewpoint, request, reply);
+						doIQPhase2(viewpoint, request, reply, resultObject);
+					}
+					
+					success = true;
+				} finally {
+					if (success)
+						client.queuePacket(reply, serial);
+					else
+						client.nullNotification(serial);
 				}
-	
-				client.queuePacket(reply, serial);
-				success = true;
-			} finally {
-				if (!success)
-					client.nullNotification(serial);
 			}
-				
 		} catch (IQException e) {
 			throw e;
 		} catch (Exception e) {
 			throw new RuntimeException("Unexpected exception running IQ method in transaction", e);
 		}
 	}
-	
+
+	/**
+	 * We don't want to fetch data from the data model within a ReadWrite transaction
+	 * because:
+	 * 
+	 *  - Caching is disabled within a ReadWrite transaction
+	 *  - If the transaction is rolled back on commit, then we'll have an incorrect
+	 *    view of what data the client has received
+	 *    
+	 * So in order to return a result from an IQ method, we first do the update in
+	 * a read-write transaction, and then use a separate read-only transaction to
+	 * fetch the requested data.
+	 * 
+	 * @return whether there should be separate transactions for update and fetch 
+	 */
+	protected boolean needsFetchTransaction() {
+		return false;
+	}
+
 	public static AnnotatedIQMethod getForMethod(AnnotatedIQHandler handler, Method method) {
 		IQMethod annotation = method.getAnnotation(IQMethod.class);
 		if (annotation == null)
@@ -134,18 +220,12 @@
 			       parameterTypes[2].equals(IQ.class)) {
 			return new GenericIQMethod(handler, method, annotation, true);
 		} else if (DMObject.class.isAssignableFrom(method.getReturnType())) {
-			if (annotation.type() != IQ.Type.get)
-				throw new RuntimeException("Update IQ methods cannot have a return");
-			
 			return new SingleQueryIQMethod(handler, method, annotation);
 		} else if (Collection.class.isAssignableFrom(method.getReturnType())) {
-			if (annotation.type() != IQ.Type.get)
-				throw new RuntimeException("Update IQ methods cannot have a return");
-			
 			return MultiQueryIQMethod.getForMethod(handler, method, annotation);
 		} else if (method.getReturnType().equals(void.class) &&
 				   annotation.type() == IQ.Type.set) {
-			return new UpdateIQMethod(handler, method, annotation);
+			return new VoidIQMethod(handler, method, annotation);
 		} else {
 			throw new RuntimeException(method + ": Unexpected signature for IQ handler method");
 		}

Modified: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/GenericIQMethod.java
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/GenericIQMethod.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/GenericIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -18,10 +18,16 @@
 	}
 
 	@Override
-	public void doIQ(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException {
+	public Object doIQPhase1(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException {
 		if (needsViewpoint)
 			invokeMethod(viewpoint, request, reply);
 		else
 			invokeMethod(request, reply);
+		
+		return null;
 	}
+	
+	@Override
+	public void doIQPhase2(UserViewpoint viewpoint, IQ request, IQ reply, Object resultObject) throws IQException, RetryException {
+	}
 }

Modified: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/MultiQueryIQMethod.java
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/MultiQueryIQMethod.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/MultiQueryIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -34,10 +34,16 @@
 		this.classHolder = classHolder;
 	}
 
+	@SuppressWarnings("unchecked")
+	private T reattachInSession(T object, DMSession session) {
+		return (T)session.findUnchecked(object.getStoreKey());
+		
+	}
+	
 	@Override
-	public void doIQ(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException {
+	public void doIQPhase2(UserViewpoint viewpoint, IQ request, IQ reply, Object resultObject) throws IQException, RetryException {
 		@SuppressWarnings("unchecked")
-		Collection<? extends T> resultObjects = (Collection)invokeMethod(getParams(viewpoint, request));
+		Collection<? extends T> resultObjects = (Collection)resultObject;
 		
 		DMSession session = DataService.currentSessionRO();
 		Element root = reply.setChildElement(annotation.name(), handler.getInfo().getNamespace());
@@ -57,6 +63,9 @@
 		XmppFetchVisitor visitor = new XmppFetchVisitor(root, session.getModel());
 		
 		for (T object : resultObjects) {
+			if (annotation.type() == IQ.Type.set)
+				object = reattachInSession(object, session);
+			
 			session.visitFetch(object, fetch, visitor);
 		}
 		

Modified: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/QueryIQMethod.java
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/QueryIQMethod.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/QueryIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -22,6 +22,7 @@
 import com.dumbhippo.server.dm.DataService;
 import com.dumbhippo.server.views.UserViewpoint;
 import com.dumbhippo.server.views.Viewpoint;
+import com.dumbhippo.tx.RetryException;
 
 public abstract class QueryIQMethod extends AnnotatedIQMethod {
 	static final QName FETCH_QNAME = QName.get("fetch", Namespace.get("http://mugshot.org/p/system";));
@@ -89,6 +90,15 @@
 		}
 	}
 	
+	/* An update method requires one transaction to update the data in the database,
+	 * and a second transaction to fetch the data. (We'll override this in a 
+	 * VoidIQMethod to avoid the unnecessary second transaction.)  
+	 */
+	@Override
+	public boolean needsFetchTransaction() {
+		return annotation.type() == IQ.Type.set;
+	}
+	
 	public FetchNode getFetchNode(IQ request) throws IQException {
 		String fetchString = request.getChildElement().attributeValue(FETCH_QNAME, "+");
 
@@ -118,6 +128,11 @@
 		return params;
 	}
 	
+	@Override
+	public Object doIQPhase1(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException {
+		return invokeMethod(getParams(viewpoint, request));
+	}
+	
 	private static abstract class ParamInfo {
 		protected String name;
 		protected boolean optional; 

Modified: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/SingleQueryIQMethod.java
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/SingleQueryIQMethod.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/SingleQueryIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -29,18 +29,20 @@
 	}
 	
 	@Override
-	public void doIQ(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException {
+	public void doIQPhase2(UserViewpoint viewpoint, IQ request, IQ reply, Object resultObject) throws IQException, RetryException {
 		DMSession session = DataService.currentSessionRO();
 		Element root = reply.setChildElement(annotation.name(), handler.getInfo().getNamespace());
 		
 		FetchNode fetchNode = getFetchNode(request);
 		
-		DMObject<?> resultObject = (DMObject<?>)invokeMethod(getParams(viewpoint, request));
+		DMObject<?> resultDMObject;
 		
-		resultObject.getClassHolder();
+		resultDMObject = (DMObject<?>)resultObject;
+		if (annotation.type() == IQ.Type.set)
+			resultDMObject = session.findUnchecked(resultDMObject.getStoreKey());
 		
 		XmppFetchVisitor visitor = new XmppFetchVisitor(root, session.getModel());
-		fetchAndVisit(session, resultObject, fetchNode, visitor);
+		fetchAndVisit(session, resultDMObject, fetchNode, visitor);
 		visitor.finish();
 	}
 }

Deleted: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/UpdateIQMethod.java
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/UpdateIQMethod.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/UpdateIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -1,23 +0,0 @@
-package com.dumbhippo.jive;
-
-import java.lang.reflect.Method;
-
-import org.xmpp.packet.IQ;
-
-import com.dumbhippo.jive.annotations.IQMethod;
-import com.dumbhippo.server.views.UserViewpoint;
-import com.dumbhippo.tx.RetryException;
-
-/**
- * Used for data-model update IQ methods (which always have no return).
- */
-public class UpdateIQMethod extends QueryIQMethod {
-	public UpdateIQMethod(AnnotatedIQHandler handler, Method method, IQMethod annotation) {
-		super(handler, method, annotation);
-	}
-	
-	@Override
-	public void doIQ(UserViewpoint viewpoint, IQ request, IQ reply) throws IQException, RetryException {
-		invokeMethod(getParams(viewpoint, request));
-	}
-}

Copied: dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/VoidIQMethod.java (from rev 7286, dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/UpdateIQMethod.java)
===================================================================
--- dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/UpdateIQMethod.java	2008-02-05 20:29:29 UTC (rev 7286)
+++ dumbhippo/trunk/openfire/src/plugins/hippo/src/java/com/dumbhippo/jive/VoidIQMethod.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -0,0 +1,28 @@
+package com.dumbhippo.jive;
+
+import java.lang.reflect.Method;
+
+import org.xmpp.packet.IQ;
+
+import com.dumbhippo.jive.annotations.IQMethod;
+import com.dumbhippo.server.views.UserViewpoint;
+import com.dumbhippo.tx.RetryException;
+
+/**
+ * Used for data-model update IQ methods with no return. (Query methods always have
+ * a return.)
+ */
+public class VoidIQMethod extends QueryIQMethod {
+	public VoidIQMethod(AnnotatedIQHandler handler, Method method, IQMethod annotation) {
+		super(handler, method, annotation);
+	}
+
+	@Override
+	public boolean needsFetchTransaction() {
+		return false;
+	}
+	
+	@Override
+	public void doIQPhase2(UserViewpoint viewpoint, IQ request, IQ reply, Object resultObject) throws IQException, RetryException {
+	}
+}

Modified: dumbhippo/trunk/server/src/com/dumbhippo/dm/ChangeNotificationSet.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/dm/ChangeNotificationSet.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/server/src/com/dumbhippo/dm/ChangeNotificationSet.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -28,6 +28,7 @@
 	private Map<ChangeNotification<?,?>, ChangeNotification<?,?>> notifications;
 	private List<ChangeNotification<?,?>> matchedNotifications;
 	private long timestamp;
+	private boolean autoNotify = true;
 	
 	public ChangeNotificationSet(DataModel model) {
 	}
@@ -116,6 +117,25 @@
 		return notifications == null && matchedNotifications == null;
 	}
 
+	public boolean getAutoNotify() {
+		return autoNotify;
+	}
+
+	/**
+	 * Normally notifications are sent out to local clients asynchronously
+	 * at some point after the transaction commits. If you set autoNotify
+	 * to false, then this doesn't happen and you must call DataModel.sendNotifications()
+	 * manually to trigger the local notifications. (invalidations and notifications 
+	 * on other cluster nodes will happen as per normal.) This is useful in the
+	 * case where you want to control the order that notifications are sent out
+	 * with respect to some other event. 
+	 * 
+	 * @param autoNotify whether to automatically send out local notifications
+	 */
+	public void setAutoNotify(boolean autoNotify) {
+		this.autoNotify = autoNotify;
+	}
+	
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();

Modified: dumbhippo/trunk/server/src/com/dumbhippo/dm/DataModel.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/dm/DataModel.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/server/src/com/dumbhippo/dm/DataModel.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -255,7 +255,15 @@
 		return (ChangeNotification<K, T>) classHolder.makeChangeNotification(key, matcher);
 	}
 
-	private void sendNotifications(ChangeNotificationSet notifications) {
+	/**
+	 * Send out local notifications resulting from a ReadWriteSession. Normally this 
+	 * happens automatically and you must not call this function automatically, but
+	 * the automatic sending can be disabled with { link ChangeNotificationSet#setAutoNotify(boolean)}
+	 * if you want explicit control over ordering.	
+	 * 
+	 * @param notifications the change notification set created in a ReadWriteSession
+	 */
+	public void sendNotifications(ChangeNotificationSet notifications) {
 		logger.debug("Sending notifications for {}", notifications);
 		ClientNotificationSet clientNotifications = notifications.resolveNotifications(this);
 		
@@ -281,11 +289,13 @@
 			
 			notifications.doInvalidations(this);
 			
-			notificationExecutor.execute(new Runnable() {
-				public void run() {
-					sendNotifications(notifications);
-				}
-			});
+			if (notifications.getAutoNotify()) {
+				notificationExecutor.execute(new Runnable() {
+					public void run() {
+						sendNotifications(notifications);
+					}
+				});
+			}
 		} catch (Exception e) {
 			// Since we are running afterCompletion, exceptions get swallowed
 			// so trap and log here

Modified: dumbhippo/trunk/server/src/com/dumbhippo/dm/ReadWriteSession.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/dm/ReadWriteSession.java	2008-02-07 23:45:05 UTC (rev 7298)
+++ dumbhippo/trunk/server/src/com/dumbhippo/dm/ReadWriteSession.java	2008-02-08 20:11:00 UTC (rev 7299)
@@ -119,6 +119,18 @@
 		notificationSet.removed(model, clazz, key);
 	}
 
+	/**
+	 * Get the change notification set that holds information about notifications
+	 * that need to be sent out after this transaction commits. See the docs for
+	 * { link ChangeNotificationSet#setAutoNotify(boolean)} for more information
+	 * about manually controlling when notifications are sent out.
+	 * 
+	 * @return the change notification set for the session.
+	 */
+	public ChangeNotificationSet getNotifications() {
+		return notificationSet;
+	}
+
 	@Override
 	public void afterCompletion(int status) {
 		if (status == Status.STATUS_COMMITTED)



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]