收到消息后关闭 JMS 会话和连接
Close JMS session and connection after message received
我有一个有状态会话 bean,我可以在其中发送和接收 JMS 消息。所有连接设置都是手动处理的,因此 bean 持有 javax.jms.Connection 和 javax.jms.Session 的实例。该 bean 还实现了 MessageListener 以便能够接收消息。
现在,当我发送消息时,我使用 session.createTemporaryQueue() 创建了一个临时队列。我将 message.setJMSReplyTo() 设置为同一个临时队列,最后创建该队列的使用者并将 MessageListener 设置为实现所有这些的同一个有状态会话 bean。
消息能够顺利到达 onMessage() 方法。但是,我想在收到消息后立即关闭会话和连接,而这在 onMessage() 方法中显然是不允许的。
所以问题是:
收到消息后如何关闭会话和连接?我必须手动处理连接设置,不能使用 MDB。
注意:
这是在 Java EE 环境 (GlassFish 4.0) 中执行的。
编辑:
import javax.ejb.LocalBean;
import javax.ejb.Stateful;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
/**
 * Stateful bean that sends a text request to an OpenMQ queue and registers
 * itself as the {@link MessageListener} for the reply, which is expected on a
 * temporary queue created for that request.
 *
 * <p>The bean holds at most one connection/session pair at a time: each call
 * to {@link #sendMessage} releases whatever the previous call left open.
 */
@LocalBean
@Stateful
public class OpenMqClient implements MessageListener{
    private Connection connection;
    private Session session;
    private MessageConsumer responseConsumer;

    public OpenMqClient(){}

    /**
     * Sends {@code messageContent} to {@code queueName} and listens for the
     * reply on a freshly created temporary queue.
     *
     * @param messageContent text body of the request
     * @param jmsBrokerUri   broker address list, used unless the system
     *                       property {@code foo} overrides it
     * @param queueName      name of the destination queue
     */
    public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
        // The fields are about to be overwritten; close what they still hold
        // instead of merely dropping the references. A reply still pending
        // from an earlier call is abandoned by this.
        closeResources();
        try{
            String host = System.getProperty("foo", jmsBrokerUri);
            QueueConnectionFactory cf = new QueueConnectionFactory();
            cf.setProperty(ConnectionConfiguration.imqAddressList, host);

            //Setup connection (left stopped until the request has been sent)
            connection = cf.createConnection();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //Setup queue and producer
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);

            //Reply destination
            Queue responseQueue = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(responseQueue);
            responseConsumer.setMessageListener(this);

            //Create message
            TextMessage textMessage = session.createTextMessage();
            textMessage.setJMSReplyTo(responseQueue);
            textMessage.setJMSCorrelationID("test0101");
            textMessage.setText(messageContent);
            producer.send(textMessage);

            // Start delivery only now: once a started connection has a
            // listener registered, the session belongs to the delivery thread
            // and must no longer be used from this one. Sending on a stopped
            // connection is allowed.
            connection.start();
            System.out.println("Message sent");
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("JMSException in Sender");
            // Do not leave a half-initialised connection open.
            closeResources();
        }
    }

    @Override
    public void onMessage(Message arg0) {
        //On this event I want to close the session and connection, but it's not permitted:
        //a listener must not close its own session or connection, so the cleanup has to be
        //triggered from another thread or from a later call on this bean.
    }

    /**
     * Closes the current connection, if any, and clears all JMS fields.
     * Closing the connection also closes its sessions, producers and
     * consumers, so they are not closed individually.
     */
    private void closeResources() {
        Connection stale = connection;
        connection = null;
        session = null;
        responseConsumer = null;
        if (stale != null) {
            try {
                stale.close();
            } catch (JMSException e) {
                // Best effort: nothing more can be done with a connection that fails to close.
                e.printStackTrace();
            }
        }
    }
}
就我个人而言,我会这样做(请注意,我没有测试这段代码,也没有添加太多错误处理)。
- 将连接设置为静态 - 您可以(可能应该)为所有 bean 重用相同的连接,除非您有特殊原因不这样做
- 在新线程中关闭会话
/**
 * JMS request/reply client that shares one connection between all instances
 * and opens a new session per request. When the reply arrives, the session
 * that was current at that moment is closed from a separate thread, because a
 * listener may not close its own session.
 */
public class OpenMqClient implements MessageListener {
    // Dedicated lock object; a String literal is interned and could be
    // locked by unrelated code.
    private static final Object mutex = new Object();
    // volatile so the unsynchronised first check sees a fully published connection.
    private static volatile Connection connection;
    // Written by the sender thread and read by the JMS delivery thread.
    private volatile Session session;
    private MessageConsumer responseConsumer;

    public OpenMqClient() {
        // The shared connection is created lazily in sendMessage(), where the
        // broker address is known and JMSException can be handled.
    }

    /**
     * Returns the shared connection, creating and starting it on first use.
     *
     * @param jmsBrokerUri broker address list, used unless the system
     *                     property {@code foo} overrides it
     * @throws JMSException if the connection cannot be created or started
     */
    private static Connection sharedConnection(String jmsBrokerUri) throws JMSException {
        Connection current = connection;
        if (current == null) {
            synchronized (mutex) {
                current = connection;
                if (current == null) {
                    String host = System.getProperty("foo", jmsBrokerUri);
                    QueueConnectionFactory cf = new QueueConnectionFactory();
                    cf.setProperty(ConnectionConfiguration.imqAddressList, host);
                    // Setup connection
                    current = cf.createConnection();
                    try {
                        current.start();
                    } catch (JMSException e) {
                        // Do not leak a connection that could not be started.
                        try {
                            current.close();
                        } catch (JMSException closeFailure) {
                            closeFailure.printStackTrace();
                        }
                        throw e;
                    }
                    connection = current;
                }
            }
        }
        return current;
    }

    /**
     * Sends {@code messageContent} to {@code queueName} and listens for the
     * reply on a temporary queue.
     *
     * @param messageContent text body of the request
     * @param jmsBrokerUri   broker address list; only used when the shared
     *                       connection has not been created yet
     * @param queueName      name of the destination queue
     */
    public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
        Session opened = null;
        try {
            opened = sharedConnection(jmsBrokerUri).createSession(false, Session.AUTO_ACKNOWLEDGE);
            session = opened;
            // Setup queue and producer
            Queue queue = opened.createQueue(queueName);
            MessageProducer producer = opened.createProducer(queue);
            // Reply destination
            Queue responseQueue = opened.createTemporaryQueue();
            responseConsumer = opened.createConsumer(responseQueue);
            responseConsumer.setMessageListener(this);
            // Create message
            TextMessage textMessage = opened.createTextMessage();
            textMessage.setJMSReplyTo(responseQueue);
            textMessage.setJMSCorrelationID("test0101");
            textMessage.setText(messageContent);
            producer.send(textMessage);
            System.out.println("Message sent");
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("JMSException in Sender");
            // No reply will arrive to trigger the cleanup, so release the session here.
            if (session == opened) {
                session = null;
            }
            closeQuietly(opened);
        }
    }

    @Override
    public void onMessage(Message arg0) {
        // do stuff

        // Capture the session now so a later sendMessage() cannot swap the
        // field before the closer thread runs.
        final Session toClose = session;
        if (toClose == null) {
            return;
        }
        // Close from another thread: a listener must not close its own session.
        new Thread(
            new Runnable() {
                @Override
                public void run() {
                    closeQuietly(toClose);
                }
            }
        ).start();
    }

    /** Closes {@code target} if it is non-null, printing any failure. */
    private static void closeQuietly(Session target) {
        if (target != null) {
            try {
                target.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
我有一个有状态会话 bean,我可以在其中发送和接收 JMS 消息。所有连接设置都是手动处理的,因此 bean 持有 javax.jms.Connection 和 javax.jms.Session 的实例。该 bean 还实现了 MessageListener 以便能够接收消息。
现在,当我发送消息时,我使用 session.createTemporaryQueue() 创建了一个临时队列。我将 message.setJMSReplyTo() 设置为同一个临时队列,最后创建该队列的使用者并将 MessageListener 设置为实现所有这些的同一个有状态会话 bean。
消息能够顺利到达 onMessage() 方法。但是,我想在收到消息后立即关闭会话和连接,而这在 onMessage() 方法中显然是不允许的。
所以问题是: 收到消息后如何关闭会话和连接?我必须手动处理连接设置,不能使用 MDB。
注意: 这是在 Java EE 环境 (GlassFish 4.0) 中执行的。
编辑:
import javax.ejb.LocalBean;
import javax.ejb.Stateful;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
/**
 * Stateful bean that sends a text request to an OpenMQ queue and registers
 * itself as the {@link MessageListener} for the reply, which is expected on a
 * temporary queue created for that request.
 *
 * <p>The bean holds at most one connection/session pair at a time: each call
 * to {@link #sendMessage} releases whatever the previous call left open.
 */
@LocalBean
@Stateful
public class OpenMqClient implements MessageListener{
    private Connection connection;
    private Session session;
    private MessageConsumer responseConsumer;

    public OpenMqClient(){}

    /**
     * Sends {@code messageContent} to {@code queueName} and listens for the
     * reply on a freshly created temporary queue.
     *
     * @param messageContent text body of the request
     * @param jmsBrokerUri   broker address list, used unless the system
     *                       property {@code foo} overrides it
     * @param queueName      name of the destination queue
     */
    public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
        // The fields are about to be overwritten; close what they still hold
        // instead of merely dropping the references. A reply still pending
        // from an earlier call is abandoned by this.
        closeResources();
        try{
            String host = System.getProperty("foo", jmsBrokerUri);
            QueueConnectionFactory cf = new QueueConnectionFactory();
            cf.setProperty(ConnectionConfiguration.imqAddressList, host);

            //Setup connection (left stopped until the request has been sent)
            connection = cf.createConnection();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //Setup queue and producer
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);

            //Reply destination
            Queue responseQueue = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(responseQueue);
            responseConsumer.setMessageListener(this);

            //Create message
            TextMessage textMessage = session.createTextMessage();
            textMessage.setJMSReplyTo(responseQueue);
            textMessage.setJMSCorrelationID("test0101");
            textMessage.setText(messageContent);
            producer.send(textMessage);

            // Start delivery only now: once a started connection has a
            // listener registered, the session belongs to the delivery thread
            // and must no longer be used from this one. Sending on a stopped
            // connection is allowed.
            connection.start();
            System.out.println("Message sent");
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("JMSException in Sender");
            // Do not leave a half-initialised connection open.
            closeResources();
        }
    }

    @Override
    public void onMessage(Message arg0) {
        //On this event I want to close the session and connection, but it's not permitted:
        //a listener must not close its own session or connection, so the cleanup has to be
        //triggered from another thread or from a later call on this bean.
    }

    /**
     * Closes the current connection, if any, and clears all JMS fields.
     * Closing the connection also closes its sessions, producers and
     * consumers, so they are not closed individually.
     */
    private void closeResources() {
        Connection stale = connection;
        connection = null;
        session = null;
        responseConsumer = null;
        if (stale != null) {
            try {
                stale.close();
            } catch (JMSException e) {
                // Best effort: nothing more can be done with a connection that fails to close.
                e.printStackTrace();
            }
        }
    }
}
就我个人而言,我会这样做(请注意,我没有测试这段代码,也没有添加太多错误处理)。
- 将连接设置为静态 - 您可以(可能应该)为所有 bean 重用相同的连接,除非您有特殊原因不这样做
- 在新线程中关闭会话
public class OpenMqClient implements MessageListener { private static Connection connection; private static final String mutex = "mutex"; private Session session; private MessageConsumer responseConsumer; public OpenMqClient() { if(connection == null) { synchronized(mutex) { if(connection == null) { String host = System.getProperty("foo", jmsBrokerUri); QueueConnectionFactory cf = new QueueConnectionFactory(); cf.setProperty(ConnectionConfiguration.imqAddressList, host); // Setup connection connection = cf.createConnection(); connection.start(); } } } } public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) { try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Setup queue and producer Queue queue = session.createQueue(queueName); MessageProducer producer = session.createProducer(queue); // Reply destination Queue responseQueue = session.createTemporaryQueue(); responseConsumer = session.createConsumer(responseQueue); responseConsumer.setMessageListener(this); // Create message TextMessage textMessage = session.createTextMessage(); textMessage.setJMSReplyTo(responseQueue); textMessage.setJMSCorrelationID("test0101"); textMessage.setText(messageContent); producer.send(textMessage); System.out.println("Message sent"); } catch (JMSException e) { e.printStackTrace(); System.out.println("JMSException in Sender"); } } @Override public void onMessage(Message arg0) { // do stuff new Thread( new Runnable() { @Override public void run() { if(session != null) try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } ).start(); } }