消息消费后出现错误,如何将消息保存在JMS Message Queue中?
How to hold messages in JMS Message Queue if there are any error after consuming the message?
我的方案是 - 我 post 将消息放入队列,一旦消息被使用,我就会将其发送到第三方中间件应用程序。如果该中间件应用程序已关闭,那么我的 posted 消息就被扔掉了。如果中间件应用程序关闭,我不想丢失该消息,而是希望它暂停或在队列中等待。请建议,如何处理这种情况?
虽然,您的应用程序是如何设计使用消息的并不是很清楚,但我建议您修改消费者应用程序以在本地事务中使用消息,并 post 向第三方中间件应用程序发送消息。如果 post 成功,那么您可以提交将从 JMS 队列中删除消息的事务。如果您的应用程序无法发送 post 消息,您可以简单地回滚事务,这将使消息重新出现在 JMS 队列中。该消息将再次发送。
您应该像这样创建会话:
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
当您尝试将消息发送到您的第三方应用程序时:
如果有效,您应该确认消息。
如果它已关闭,您不应该确认它,这样 JMS 提供程序将能够重新传送它,并且消息不会丢失。 message.acknowledge();
另外,你可以看看这个:JMS AUTO_ACKNOWLEDGE when is it acknowledged?
这可以通过使用会话确认来实现。为此,首先修改您的生产者代码以使用 Session.AUTO_ACKNOWLEDGE。在创建队列会话时,将 AUTO_ACKNOWLEDGE 设置为 false。这意味着消费者必须承认。当消费者发送确认消息时,消息将从队列中删除,否则将保留在队列中。
以下是生产者代码。
try {
QueueConnectionFactory qcf = AppUtils.getQueueConnectionFactory();
Queue q = AppUtils.getDestination();
QueueConnection qConnection = qcf.createQueueConnection();
QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender qSender = qSession.createSender(q);
qConnection.start();
TextMessage msg = qSession.createTextMessage("Hello");
qSender.send(msg);
qSender.close();
qConnection.close();
} catch (JMSException e) {
// log your error to log file
e.printStackTrace();
}
在消费者方面,您必须做同样的事情,创建一个 AUTO_ACKNOWLEDGE 为 false 的队列会话。
处理消息后,您可以发送确认以从队列中删除消息,否则消息将保留在队列中。
try {
QueueConnectionFactory qcf = getQueueConnectionFactory();
Queue q = getDestination();
QueueConnection qConnection = qcf.createQueueConnection();
QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver qReceiver = qSession.createReceiver(q);
qConnection.start();
Message msg = qReceiver.receive();
// here send your message to third party application
//if your third party application is down
if(thirdpartyapp is down){
//here you can raise an exception
//or just do nothing
// you're not sending acknowledgement here so the msg will
//remain in the queue
}else{
msg.acknowledge();//here youre sending ack, so msg will be deleted
qReceiver.close();
qConnection.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
JMS 队列不是消息存储。
如果您有一个 "bad message" 处理继续失败,那么 JMS 服务器(如果已配置)将不可避免地将该消息转储到一个 "Dead Message Queue",它将慢慢填满直到不同的过程会耗尽它。
您不希望将不良消息保留在队列中,因为它们可能会阻塞队列(假设您有 10 个消费者,并且前 10 条消息都是不良消息,因此所有进程所做的就是继续连枷坏消息 - 拖延队列)。
因此,您需要一些机制来将消息存储到异常接收器中,然后它们可以稍后注入到主队列中进行处理。
死消息队列不是这种机制(不要将消息存储在 JMS 队列中),而是一种将异常消息路由到更永久的存储区域(即数据库 table或其他东西)。
到达那里后,可以对其进行审查(自动、手动等)并重新提交或取消。
但关键是您需要一个外部机制,单独的 JMS 服务器不适合此类消息。
我的方案是 - 我 post 将消息放入队列,一旦消息被使用,我就会将其发送到第三方中间件应用程序。如果该中间件应用程序已关闭,那么我的 posted 消息就被扔掉了。如果中间件应用程序关闭,我不想丢失该消息,而是希望它暂停或在队列中等待。请建议,如何处理这种情况?
虽然,您的应用程序是如何设计使用消息的并不是很清楚,但我建议您修改消费者应用程序以在本地事务中使用消息,并 post 向第三方中间件应用程序发送消息。如果 post 成功,那么您可以提交将从 JMS 队列中删除消息的事务。如果您的应用程序无法发送 post 消息,您可以简单地回滚事务,这将使消息重新出现在 JMS 队列中。该消息将再次发送。
您应该像这样创建会话:
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
当您尝试将消息发送到您的第三方应用程序时:
如果有效,您应该确认消息。
如果它已关闭,您不应该确认它,这样 JMS 提供程序将能够重新传送它,并且消息不会丢失。
message.acknowledge();
另外,你可以看看这个:JMS AUTO_ACKNOWLEDGE when is it acknowledged?
这可以通过使用会话确认来实现。为此,首先修改您的生产者代码以使用 Session.AUTO_ACKNOWLEDGE。在创建队列会话时,将 AUTO_ACKNOWLEDGE 设置为 false。这意味着消费者必须承认。当消费者发送确认消息时,消息将从队列中删除,否则将保留在队列中。
以下是生产者代码。
try {
QueueConnectionFactory qcf = AppUtils.getQueueConnectionFactory();
Queue q = AppUtils.getDestination();
QueueConnection qConnection = qcf.createQueueConnection();
QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender qSender = qSession.createSender(q);
qConnection.start();
TextMessage msg = qSession.createTextMessage("Hello");
qSender.send(msg);
qSender.close();
qConnection.close();
} catch (JMSException e) {
// log your error to log file
e.printStackTrace();
}
在消费者方面,您必须做同样的事情,创建一个 AUTO_ACKNOWLEDGE 为 false 的队列会话。
处理消息后,您可以发送确认以从队列中删除消息,否则消息将保留在队列中。
try {
QueueConnectionFactory qcf = getQueueConnectionFactory();
Queue q = getDestination();
QueueConnection qConnection = qcf.createQueueConnection();
QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver qReceiver = qSession.createReceiver(q);
qConnection.start();
Message msg = qReceiver.receive();
// here send your message to third party application
//if your third party application is down
if(thirdpartyapp is down){
//here you can raise an exception
//or just do nothing
// you're not sending acknowledgement here so the msg will
//remain in the queue
}else{
msg.acknowledge();//here youre sending ack, so msg will be deleted
qReceiver.close();
qConnection.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
JMS 队列不是消息存储。
如果您有一个 "bad message" 处理继续失败,那么 JMS 服务器(如果已配置)将不可避免地将该消息转储到一个 "Dead Message Queue",它将慢慢填满直到不同的过程会耗尽它。
您不希望将不良消息保留在队列中,因为它们可能会阻塞队列(假设您有 10 个消费者,并且前 10 条消息都是不良消息,因此所有进程所做的就是继续连枷坏消息 - 拖延队列)。
因此,您需要一些机制来将消息存储到异常接收器中,然后它们可以稍后注入到主队列中进行处理。
死消息队列不是这种机制(不要将消息存储在 JMS 队列中),而是一种将异常消息路由到更永久的存储区域(即数据库 table或其他东西)。
到达那里后,可以对其进行审查(自动、手动等)并重新提交或取消。
但关键是您需要一个外部机制,单独的 JMS 服务器不适合此类消息。