r7371 - in dumbhippo/trunk/server/src/com/dumbhippo: live server/impl
- From: commits mugshot org
- To: online-desktop-list gnome org
- Subject: r7371 - in dumbhippo/trunk/server/src/com/dumbhippo: live server/impl
- Date: Wed, 19 Mar 2008 12:47:40 -0500 (CDT)
Author: otaylor
Date: 2008-03-19 12:47:40 -0500 (Wed, 19 Mar 2008)
New Revision: 7371
Modified:
dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java
dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java
Log:
Remove usage of MessageDriven beans:
- Deal with problem with JBoss 4.0.5 and MDB's we don't understand
- Allows us to do transaction retries within our framework
Modified: dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java 2008-03-18 22:45:19 UTC (rev 7370)
+++ dumbhippo/trunk/server/src/com/dumbhippo/live/LiveQueueConsumerBean.java 2008-03-19 17:47:40 UTC (rev 7371)
@@ -1,38 +1,53 @@
package com.dumbhippo.live;
import javax.annotation.Resource;
-import javax.ejb.ActivationConfigProperty;
-import javax.ejb.MessageDriven;
import javax.ejb.SessionContext;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
+import org.jboss.annotation.ejb.Service;
import org.slf4j.Logger;
import com.dumbhippo.GlobalSetup;
+import com.dumbhippo.Site;
import com.dumbhippo.dm.ChangeNotificationSet;
+import com.dumbhippo.jms.JmsConnectionType;
+import com.dumbhippo.jms.JmsConsumer;
+import com.dumbhippo.jms.JmsShutdownException;
+import com.dumbhippo.server.SimpleServiceMBean;
import com.dumbhippo.server.dm.DataService;
+import com.dumbhippo.server.views.AnonymousViewpoint;
+import com.dumbhippo.tx.RetryException;
+import com.dumbhippo.tx.TxRunnable;
+import com.dumbhippo.tx.TxUtils;
//
// Handles taking events queued via LiveState.queueUpdate and dispatching
// them to the appropriate "processor bean"
//
- MessageDriven(activationConfig =
-{
- @ActivationConfigProperty(propertyName="destinationType",
- propertyValue="javax.jms.Topic"),
- @ActivationConfigProperty(propertyName="destination",
- propertyValue=LiveEvent.TOPIC_NAME)
-})
-public class LiveQueueConsumerBean implements MessageListener {
+ Service
+public class LiveQueueConsumerBean implements SimpleServiceMBean {
@Resource SessionContext context;
static private final Logger logger = GlobalSetup.getLogger(LiveQueueConsumerBean.class);
+ private JmsConsumer consumer;
+ private Thread consumerThread;
+
+ public void start() {
+ consumer = new JmsConsumer(LiveEvent.TOPIC_NAME, JmsConnectionType.NONTRANSACTED_IN_SERVER);
+ consumerThread = new Thread(new LiveTopicConsumer(), "LiveTopicConsumer");
+ consumerThread.start();
+ }
+
+ public void stop() {
+ consumer.close(); // Will stop consumer thread as a side effect
+ consumer = null;
+ }
+
private void process(LiveEvent event, boolean isLocal) {
// To find the right "processor bean" for this event, we have the
// processor beans register themselves in JDNI under their class name
@@ -52,34 +67,56 @@
LiveState.getInstance().invokeEventListeners(event);
}
- public void onMessage(Message message) {
+ private void handleMessage(ObjectMessage message) {
+ Object obj;
+ String sourceAddress;
try {
- // Message.toString() is kind of crap, so this isn't useful debug most of the time
- //logger.debug("Got message from {}: {}", LiveEvent.QUEUE, message);
- if (message instanceof ObjectMessage) {
- ObjectMessage objectMessage = (ObjectMessage) message;
- Object obj = objectMessage.getObject();
- String sourceAddress = message.getStringProperty("sourceAddress");
- String localAddress = System.getProperty("jboss.bind.address");
- boolean isLocal = localAddress.equals(sourceAddress);
-
- logger.debug("Got object in " + LiveEvent.TOPIC_NAME + ": " + obj + " (isLocal=" + isLocal + ")");
-
- if (obj instanceof LiveEvent) {
- process((LiveEvent) obj, isLocal);
- } else if (obj instanceof ChangeNotificationSet) {
- if (!isLocal)
- DataService.getModel().notifyRemoteChange((ChangeNotificationSet)obj);
- } else {
- logger.warn("Got unknown object: " + obj);
+ obj = ((message).getObject());
+ sourceAddress = message.getStringProperty("sourceAddress");
+ } catch (JMSException e) {
+ logger.warn("Error retrieving object from queue.", e);
+ return;
+ }
+
+ String localAddress = System.getProperty("jboss.bind.address");
+ boolean isLocal = localAddress.equals(sourceAddress);
+
+ logger.debug("Got object in " + LiveEvent.TOPIC_NAME + ": " + obj + " (isLocal=" + isLocal + ")");
+
+ if (obj instanceof LiveEvent) {
+ process((LiveEvent) obj, isLocal);
+ } else if (obj instanceof ChangeNotificationSet) {
+ if (!isLocal)
+ DataService.getModel().notifyRemoteChange((ChangeNotificationSet)obj);
+ } else {
+ logger.warn("Got unknown object: " + obj);
+ }
+ }
+
+ private class LiveTopicConsumer implements Runnable {
+ public void run() {
+ while (true) {
+ try {
+ final Message message = consumer.receive();
+ if (!(message instanceof ObjectMessage)) {
+ logger.warn("Got unexpected type of message in queue.");
+ continue;
+ }
+ TxUtils.runInTransaction(new TxRunnable() {
+ public void run() throws RetryException {
+ // Any database work should have been done on the sending side before sending the
+ // message. Here we are just updating transient state and notifying.
+ DataService.getModel().initializeReadOnlySession(AnonymousViewpoint.getInstance(Site.NONE));
+ handleMessage((ObjectMessage)message);
+ }
+ });
+ } catch (JmsShutdownException e) {
+ logger.debug("Queue was shut down, exiting thread");
+ break;
+ } catch (RuntimeException e) {
+ logger.error("Unexpected error receiving live topic messages", e);
}
- } else {
- logger.warn("Got unknown JMS message: " + message);
}
- } catch (JMSException e) {
- logger.warn("JMS exception in live event queue onMessage", e);
- } catch (Exception e) {
- logger.warn("Exception processing JMS message", e);
}
}
}
Modified: dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java 2008-03-18 22:45:19 UTC (rev 7370)
+++ dumbhippo/trunk/server/src/com/dumbhippo/server/impl/AimQueueConsumerBean.java 2008-03-19 17:47:40 UTC (rev 7371)
@@ -1,13 +1,11 @@
package com.dumbhippo.server.impl;
-import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
-import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
+import org.jboss.annotation.ejb.Service;
import org.slf4j.Logger;
import com.dumbhippo.GlobalSetup;
@@ -16,7 +14,11 @@
import com.dumbhippo.botcom.BotEvent;
import com.dumbhippo.botcom.BotEventLogin;
import com.dumbhippo.botcom.BotEventToken;
+import com.dumbhippo.botcom.BotTask;
import com.dumbhippo.botcom.BotTaskMessage;
+import com.dumbhippo.jms.JmsConnectionType;
+import com.dumbhippo.jms.JmsConsumer;
+import com.dumbhippo.jms.JmsShutdownException;
import com.dumbhippo.persistence.AimResource;
import com.dumbhippo.persistence.ResourceClaimToken;
import com.dumbhippo.persistence.Token;
@@ -26,6 +28,7 @@
import com.dumbhippo.server.HumanVisibleException;
import com.dumbhippo.server.IdentitySpider;
import com.dumbhippo.server.SigninSystem;
+import com.dumbhippo.server.SimpleServiceMBean;
import com.dumbhippo.server.TokenExpiredException;
import com.dumbhippo.server.TokenSystem;
import com.dumbhippo.server.TokenUnknownException;
@@ -33,16 +36,14 @@
import com.dumbhippo.server.views.AnonymousViewpoint;
import com.dumbhippo.server.views.SystemViewpoint;
import com.dumbhippo.tx.RetryException;
+import com.dumbhippo.tx.TxRunnable;
+import com.dumbhippo.tx.TxUtils;
- MessageDriven(activationConfig =
- {
- @ActivationConfigProperty(propertyName="destinationType",
- propertyValue="javax.jms.Queue"),
- @ActivationConfigProperty(propertyName="destination",
- propertyValue=BotEvent.QUEUE_NAME)
-})
-public class AimQueueConsumerBean implements MessageListener {
+ Service
+public class AimQueueConsumerBean implements SimpleServiceMBean {
static private final Logger logger = GlobalSetup.getLogger(AimQueueConsumerBean.class);
+ private JmsConsumer consumer;
+ private Thread consumerThread;
@EJB
private TokenSystem tokenSystem;
@@ -59,6 +60,18 @@
@EJB
private AimQueueSender aimQueueSender;
+ public void start() {
+ consumer = new JmsConsumer(BotTask.QUEUE_NAME, JmsConnectionType.NONTRANSACTED_IN_SERVER);
+ consumerThread = new Thread(new AimQueueConsumer(), "AimQueueConsumer");
+ consumerThread.start();
+
+ }
+
+ public void stop() {
+ consumer.close(); // Will stop consumer thread as a side effect
+ consumer = null;
+ }
+
private void sendHtmlReplyMessage(BotEvent event, String aimName, String htmlMessage) {
BotTaskMessage message = new BotTaskMessage(event.getBotName(), aimName, htmlMessage);
aimQueueSender.sendMessage(message);
@@ -120,35 +133,50 @@
}
}
- public void onMessage(Message message) {
- DataService.getModel().initializeReadWriteSession(AnonymousViewpoint.getInstance(Site.NONE));
+ private void handleMessage(ObjectMessage message) throws RetryException {
+ Object obj;
+ try {
+ obj = ((message).getObject());
+ } catch (JMSException e) {
+ logger.warn("Error retrieving object from queue.", e);
+ return;
+ }
- try {
- if (message instanceof ObjectMessage) {
- ObjectMessage objectMessage = (ObjectMessage) message;
- Object obj = objectMessage.getObject();
-
- logger.debug("Got object in {}: {}", BotEvent.QUEUE_NAME, obj);
-
- if (obj instanceof BotEventToken) {
- BotEventToken event = (BotEventToken) obj;
- processTokenEvent(event);
- } else if (obj instanceof BotEventLogin) {
- BotEventLogin event = (BotEventLogin) obj;
- processLoginEvent(event);
- } else {
- logger.warn("Got unknown object: " + obj);
+ logger.debug("Got object in {}: {}", BotEvent.QUEUE_NAME, obj);
+
+ if (obj instanceof BotEventToken) {
+ BotEventToken event = (BotEventToken) obj;
+ processTokenEvent(event);
+ } else if (obj instanceof BotEventLogin) {
+ BotEventLogin event = (BotEventLogin) obj;
+ processLoginEvent(event);
+ } else {
+ logger.warn("Got unknown object: " + obj);
+ }
+ }
+
+ private class AimQueueConsumer implements Runnable {
+ public void run() {
+ while (true) {
+ try {
+ final Message message = consumer.receive();
+ if (!(message instanceof ObjectMessage)) {
+ logger.warn("Got unexpected type of message in queue.");
+ continue;
+ }
+ TxUtils.runInTransaction(new TxRunnable() {
+ public void run() throws RetryException {
+ DataService.getModel().initializeReadWriteSession(AnonymousViewpoint.getInstance(Site.NONE));
+ handleMessage((ObjectMessage)message);
+ }
+ });
+ } catch (JmsShutdownException e) {
+ logger.debug("Queue was shut down, exiting thread");
+ break;
+ } catch (RuntimeException e) {
+ logger.error("Unexpected error receiving AIM queue messages", e);
}
- } else {
- logger.warn("Got unknown jms message: {}", message);
}
- } catch (RetryException e) {
- // We hope that the messaging system will redeliver the message and provide
- // the retry. We can't do the retry ourself since we are already in a
- // transaction
- throw new RuntimeException(e);
- } catch (JMSException e) {
- logger.warn("JMS exception in bot event queue", e);
}
}
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]