r7371 - in dumbhippo/trunk/server/src/com/dumbhippo: live server/impl



Author: otaylor
Date: 2008-03-19 12:47:40 -0500 (Wed, 19 Mar 2008)
New Revision: 7371

Modified:
   dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java
   dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java
Log:
Remove usage of MessageDriven beans:
 - Works around a problem between JBoss 4.0.5 and MDBs that we don't understand
 - Allows us to do transaction retries within our framework


Modified: dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java	2008-03-18 22:45:19 UTC (rev 7370)
+++ dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java	2008-03-19 17:47:40 UTC (rev 7371)
@@ -1,38 +1,53 @@
 package com.dumbhippo.live;
 
 import javax.annotation.Resource;
-import javax.ejb.ActivationConfigProperty;
-import javax.ejb.MessageDriven;
 import javax.ejb.SessionContext;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
 
+import org.jboss.annotation.ejb.Service;
 import org.slf4j.Logger;
 
 import com.dumbhippo.GlobalSetup;
+import com.dumbhippo.Site;
 import com.dumbhippo.dm.ChangeNotificationSet;
+import com.dumbhippo.jms.JmsConnectionType;
+import com.dumbhippo.jms.JmsConsumer;
+import com.dumbhippo.jms.JmsShutdownException;
+import com.dumbhippo.server.SimpleServiceMBean;
 import com.dumbhippo.server.dm.DataService;
+import com.dumbhippo.server.views.AnonymousViewpoint;
+import com.dumbhippo.tx.RetryException;
+import com.dumbhippo.tx.TxRunnable;
+import com.dumbhippo.tx.TxUtils;
 
 //
 // Handles taking events queued via LiveState.queueUpdate and dispatching
 // them to the appropriate "processor bean"
 //
 
-@MessageDriven(activationConfig =
-{
-  @ActivationConfigProperty(propertyName="destinationType",
-    propertyValue="javax.jms.Topic"),
-  @ActivationConfigProperty(propertyName="destination",
-    propertyValue=LiveEvent.TOPIC_NAME)
-})
-public class LiveQueueConsumerBean implements MessageListener {
+@Service
+public class LiveQueueConsumerBean implements SimpleServiceMBean {
 	
 	@Resource SessionContext context;
 
 	static private final Logger logger = GlobalSetup.getLogger(LiveQueueConsumerBean.class);
 	
+	private JmsConsumer consumer;
+	private Thread consumerThread;
+
+	public void start() {
+		consumer = new JmsConsumer(LiveEvent.TOPIC_NAME, JmsConnectionType.NONTRANSACTED_IN_SERVER);
+		consumerThread = new Thread(new LiveTopicConsumer(), "LiveTopicConsumer");
+		consumerThread.start(); 
+	}
+
+	public void stop() {
+		consumer.close(); // Will stop consumer thread as a side effect
+		consumer = null;
+	}
+
 	private void process(LiveEvent event, boolean isLocal) {
 		// To find the right "processor bean" for this event, we have the 
 		// processor beans register themselves in JDNI under their class name
@@ -52,34 +67,56 @@
 		LiveState.getInstance().invokeEventListeners(event);
 	}
 
-	public void onMessage(Message message) {
+	private void handleMessage(ObjectMessage message) {
+		Object obj;
+		String sourceAddress;
 		try {
-			// Message.toString() is kind of crap, so this isn't useful debug most of the time
-			//logger.debug("Got message from {}: {}", LiveEvent.QUEUE, message);
-			if (message instanceof ObjectMessage) {
-				ObjectMessage objectMessage = (ObjectMessage) message;
-				Object obj = objectMessage.getObject();
-				String sourceAddress = message.getStringProperty("sourceAddress");
-				String localAddress = System.getProperty("jboss.bind.address");
-				boolean isLocal = localAddress.equals(sourceAddress);
-				
-				logger.debug("Got object in " + LiveEvent.TOPIC_NAME + ": " + obj + " (isLocal=" + isLocal + ")");
-				
-				if (obj instanceof LiveEvent) {
-					process((LiveEvent) obj, isLocal);
-				} else if (obj instanceof ChangeNotificationSet) {
-					if (!isLocal)
-						DataService.getModel().notifyRemoteChange((ChangeNotificationSet)obj);
-				} else {
-					logger.warn("Got unknown object: " + obj);
+			obj = ((message).getObject());
+			sourceAddress = message.getStringProperty("sourceAddress");
+		} catch (JMSException e) {
+			logger.warn("Error retrieving object from queue.", e);
+			return;
+		}
+		
+		String localAddress = System.getProperty("jboss.bind.address");
+		boolean isLocal = localAddress.equals(sourceAddress);
+		
+		logger.debug("Got object in " + LiveEvent.TOPIC_NAME + ": " + obj + " (isLocal=" + isLocal + ")");
+		
+		if (obj instanceof LiveEvent) {
+			process((LiveEvent) obj, isLocal);
+		} else if (obj instanceof ChangeNotificationSet) {
+			if (!isLocal)
+				DataService.getModel().notifyRemoteChange((ChangeNotificationSet)obj);
+		} else {
+			logger.warn("Got unknown object: " + obj);
+		}
+	}
+	
+	private class LiveTopicConsumer implements Runnable {
+		public void run() {
+			while (true) {
+				try {
+					final Message message = consumer.receive();
+					if (!(message instanceof ObjectMessage)) {
+						logger.warn("Got unexpected type of message in queue.");
+						continue;
+					}
+					TxUtils.runInTransaction(new TxRunnable() {
+						public void run() throws RetryException {
+							// Any database work should have been done on the sending side before sending the
+							// message. Here we are just updating transient state and notifying.
+							DataService.getModel().initializeReadOnlySession(AnonymousViewpoint.getInstance(Site.NONE));
+							handleMessage((ObjectMessage)message);
+						}
+					});
+				} catch (JmsShutdownException e) {
+					logger.debug("Queue was shut down, exiting thread");
+					break;
+				} catch (RuntimeException e) {
+					logger.error("Unexpected error receiving live topic messages", e);
 				}
-			} else {
-				logger.warn("Got unknown JMS message: " + message);
 			}
-		} catch (JMSException e) {
-			logger.warn("JMS exception in live event queue onMessage", e);
-		} catch (Exception e) {
-			logger.warn("Exception processing JMS message", e);
 		}
 	}
 }

Modified: dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java	2008-03-18 22:45:19 UTC (rev 7370)
+++ dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java	2008-03-19 17:47:40 UTC (rev 7371)
@@ -1,13 +1,11 @@
 package com.dumbhippo.server.impl;
 
-import javax.ejb.ActivationConfigProperty;
 import javax.ejb.EJB;
-import javax.ejb.MessageDriven;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
 
+import org.jboss.annotation.ejb.Service;
 import org.slf4j.Logger;
 
 import com.dumbhippo.GlobalSetup;
@@ -16,7 +14,11 @@
 import com.dumbhippo.botcom.BotEvent;
 import com.dumbhippo.botcom.BotEventLogin;
 import com.dumbhippo.botcom.BotEventToken;
+import com.dumbhippo.botcom.BotTask;
 import com.dumbhippo.botcom.BotTaskMessage;
+import com.dumbhippo.jms.JmsConnectionType;
+import com.dumbhippo.jms.JmsConsumer;
+import com.dumbhippo.jms.JmsShutdownException;
 import com.dumbhippo.persistence.AimResource;
 import com.dumbhippo.persistence.ResourceClaimToken;
 import com.dumbhippo.persistence.Token;
@@ -26,6 +28,7 @@
 import com.dumbhippo.server.HumanVisibleException;
 import com.dumbhippo.server.IdentitySpider;
 import com.dumbhippo.server.SigninSystem;
+import com.dumbhippo.server.SimpleServiceMBean;
 import com.dumbhippo.server.TokenExpiredException;
 import com.dumbhippo.server.TokenSystem;
 import com.dumbhippo.server.TokenUnknownException;
@@ -33,16 +36,14 @@
 import com.dumbhippo.server.views.AnonymousViewpoint;
 import com.dumbhippo.server.views.SystemViewpoint;
 import com.dumbhippo.tx.RetryException;
+import com.dumbhippo.tx.TxRunnable;
+import com.dumbhippo.tx.TxUtils;
 
-@MessageDriven(activationConfig =
- {
-   @ActivationConfigProperty(propertyName="destinationType",
-     propertyValue="javax.jms.Queue"),
-   @ActivationConfigProperty(propertyName="destination",
-     propertyValue=BotEvent.QUEUE_NAME)
-})
-public class AimQueueConsumerBean implements MessageListener {
+@Service
+public class AimQueueConsumerBean implements SimpleServiceMBean {
 	static private final Logger logger = GlobalSetup.getLogger(AimQueueConsumerBean.class);
+	private JmsConsumer consumer;
+	private Thread consumerThread;
 	
 	@EJB
 	private TokenSystem tokenSystem;
@@ -59,6 +60,18 @@
 	@EJB
 	private AimQueueSender aimQueueSender;
 	
+	public void start() {
+		consumer = new JmsConsumer(BotTask.QUEUE_NAME, JmsConnectionType.NONTRANSACTED_IN_SERVER);
+		consumerThread = new Thread(new AimQueueConsumer(), "AimQueueConsumer");
+		consumerThread.start(); 
+		
+	}
+
+	public void stop() {
+		consumer.close(); // Will stop consumer thread as a side effect
+		consumer = null;
+	}
+
 	private void sendHtmlReplyMessage(BotEvent event, String aimName, String htmlMessage) {
 		BotTaskMessage message = new BotTaskMessage(event.getBotName(), aimName, htmlMessage);
 		aimQueueSender.sendMessage(message);
@@ -120,35 +133,50 @@
 		}
 	}
 
-	public void onMessage(Message message) {
-		DataService.getModel().initializeReadWriteSession(AnonymousViewpoint.getInstance(Site.NONE));
+	private void handleMessage(ObjectMessage message) throws RetryException {
+		Object obj;
+		try {
+			obj = ((message).getObject());
+		} catch (JMSException e) {
+			logger.warn("Error retrieving object from queue.", e);
+			return;
+		}
 		
-		try {
-			if (message instanceof ObjectMessage) {
-				ObjectMessage objectMessage = (ObjectMessage) message;
-				Object obj = objectMessage.getObject();
-				
-				logger.debug("Got object in {}: {}", BotEvent.QUEUE_NAME, obj);
-					
-				if (obj instanceof BotEventToken) {
-					BotEventToken event = (BotEventToken) obj;
-					processTokenEvent(event);
-				} else if (obj instanceof BotEventLogin) {
-					BotEventLogin event = (BotEventLogin) obj;
-					processLoginEvent(event);
-				} else {
-					logger.warn("Got unknown object: " + obj);
+		logger.debug("Got object in {}: {}", BotEvent.QUEUE_NAME, obj);
+			
+		if (obj instanceof BotEventToken) {
+			BotEventToken event = (BotEventToken) obj;
+			processTokenEvent(event);
+		} else if (obj instanceof BotEventLogin) {
+			BotEventLogin event = (BotEventLogin) obj;
+			processLoginEvent(event);
+		} else {
+			logger.warn("Got unknown object: " + obj);
+		}
+	}
+	
+	private class AimQueueConsumer implements Runnable {
+		public void run() {
+			while (true) {
+				try {
+					final Message message = consumer.receive();
+					if (!(message instanceof ObjectMessage)) {
+						logger.warn("Got unexpected type of message in queue.");
+						continue;
+					}
+					TxUtils.runInTransaction(new TxRunnable() {
+						public void run() throws RetryException {
+							DataService.getModel().initializeReadWriteSession(AnonymousViewpoint.getInstance(Site.NONE));
+							handleMessage((ObjectMessage)message);
+						}
+					});
+				} catch (JmsShutdownException e) {
+					logger.debug("Queue was shut down, exiting thread");
+					break;
+				} catch (RuntimeException e) {
+					logger.error("Unexpected error receiving AIM queue messages", e);
 				}
-			} else {
-				logger.warn("Got unknown jms message: {}", message);
 			}
-		} catch (RetryException e) {
-			// We hope that the messaging system will redeliver the message and provide
-			// the retry. We can't do the retry ourself since we are already in a 
-			// transaction
-			throw new RuntimeException(e);
-		} catch (JMSException e) {
-			logger.warn("JMS exception in bot event queue", e);
 		}
 	}
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]