使用多线程侦听器处理 JMS 事务和重新传递
Handling JMS transactions and redelivery with multi-threaded listeners
我正在使用 JMS 在 Java 1.8 SE 环境中处理消息。消息源自 Oracle Advanced Queue。因为处理消息可能需要一段时间,所以我决定有一个包含 5 个工作线程(MessageHandler 对象)的池,这样多个线程可以同时处理消息。我想要有保证的传递,没有重复的消息传递。
我用
queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
创建 QueueSession。我使用下面的代码来处理传入的消息。基本上,onMessage
生成一个处理消息的线程。
public class JmsQueueListener implements MessageListener
{
/** A pool of worker threads for handling requests. */
private final ExecutorService pool;
OracleJmsQueue queue;
public void onMessage(Message msg)
{
pool.execute(new MessageHandler(msg));
// can't commit here - the thread may still be processing
}
/**
* This class provides a "worker thread" for processing a message
* from the queue.
*/
private class MessageHandler implements Runnable {
/**
* The message to process
*/
Message message;
/**
* The constructor stores the passed in message as a field
*/
MessageHandler(Message message) {
this.message = message;
}
/**
* Processes the message provided to the constructor by
* calling the appropriate business logic.
*/
public void run() {
QueueSession queueSession = queue.getQueueSession();
try {
String result = requestManager.processMessage(message);
if (result != null) {
queueSession.commit();
}
else {
queueSession.rollback();
}
}
catch (Exception ex) {
try {
queueSession.rollback();
}
catch (JMSException e) {
}
}
}
} // class MessageHandler
我的问题是我不知道如何向原始队列指示处理是否已成功完成。我无法在 onMessage
结束时提交,因为线程可能尚未完成处理。我认为我目前拥有 commit
s 和 rollback
s 的地方也没有什么好处。例如,如果 5 个工作线程处于各种完成状态,正在提交的队列会话的状态是什么?
我想我一定是遗漏了一些关于如何以多线程方式处理 JMS 的基本概念。任何帮助将不胜感激。
您正在使用异步消息处理,因此,除非您实施一种方法确保每条消息处理按时间顺序完成,否则您最终会遇到较旧的消息处理在最近的消息处理之后完成的情况。那么为什么要使用消息服务?
您的问题的一个简单解决方案是 commit
在 onMessage
方法的末尾,在 messageHandler
re-enqueue 的正文中错误。但是,如果 re-enqueue 本身失败,此解决方案可能会出现问题。
我正在使用 JMS 在 Java 1.8 SE 环境中处理消息。消息源自 Oracle Advanced Queue。因为处理消息可能需要一段时间,所以我决定有一个包含 5 个工作线程(MessageHandler 对象)的池,这样多个线程可以同时处理消息。我想要有保证的传递,没有重复的消息传递。
我用
queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
创建 QueueSession。我使用下面的代码来处理传入的消息。基本上,onMessage
生成一个处理消息的线程。
public class JmsQueueListener implements MessageListener
{
/** A pool of worker threads for handling requests. */
private final ExecutorService pool;
OracleJmsQueue queue;
public void onMessage(Message msg)
{
pool.execute(new MessageHandler(msg));
// can't commit here - the thread may still be processing
}
/**
* This class provides a "worker thread" for processing a message
* from the queue.
*/
private class MessageHandler implements Runnable {
/**
* The message to process
*/
Message message;
/**
* The constructor stores the passed in message as a field
*/
MessageHandler(Message message) {
this.message = message;
}
/**
* Processes the message provided to the constructor by
* calling the appropriate business logic.
*/
public void run() {
QueueSession queueSession = queue.getQueueSession();
try {
String result = requestManager.processMessage(message);
if (result != null) {
queueSession.commit();
}
else {
queueSession.rollback();
}
}
catch (Exception ex) {
try {
queueSession.rollback();
}
catch (JMSException e) {
}
}
}
} // class MessageHandler
我的问题是我不知道如何向原始队列指示处理是否已成功完成。我无法在 onMessage
结束时提交,因为线程可能尚未完成处理。我认为我目前拥有 commit
s 和 rollback
s 的地方也没有什么好处。例如,如果 5 个工作线程处于各种完成状态,正在提交的队列会话的状态是什么?
我想我一定是遗漏了一些关于如何以多线程方式处理 JMS 的基本概念。任何帮助将不胜感激。
您正在使用异步消息处理,因此,除非您实施一种方法确保每条消息处理按时间顺序完成,否则您最终会遇到较旧的消息处理在最近的消息处理之后完成的情况。那么为什么要使用消息服务?
您的问题的一个简单解决方案是 commit
在 onMessage
方法的末尾,在 messageHandler
re-enqueue 的正文中错误。但是,如果 re-enqueue 本身失败,此解决方案可能会出现问题。