消息消费后出现错误,如何将消息保存在JMS Message Queue中?

How to hold messages in JMS Message Queue if there are any error after consuming the message?

我的方案是 - 我 post 将消息放入队列,一旦消息被使用,我就会将其发送到第三方中间件应用程序。如果该中间件应用程序已关闭,那么我的 posted 消息就被扔掉了。如果中间件应用程序关闭,我不想丢失该消息,而是希望它暂停或在队列中等待。请建议,如何处理这种情况?

虽然,您的应用程序是如何设计使用消息的并不是很清楚,但我建议您修改消费者应用程序以在本地事务中使用消息,并 post 向第三方中间件应用程序发送消息。如果 post 成功,那么您可以提交将从 JMS 队列中删除消息的事务。如果您的应用程序无法发送 post 消息,您可以简单地回滚事务,这将使消息重新出现在 JMS 队列中。该消息将再次发送。

您应该像这样创建会话:

Session session = connection.createSession(false,
                       Session.CLIENT_ACKNOWLEDGE);

当您尝试将消息发送到您的第三方应用程序时:

  • 如果有效,您应该确认消息。

  • 如果它已关闭,您不应该确认它,这样 JMS 提供程序将能够重新传送它,并且消息不会丢失。 message.acknowledge();

另外,你可以看看这个:JMS AUTO_ACKNOWLEDGE when is it acknowledged?

这可以通过使用会话确认来实现。为此,首先修改您的生产者代码以使用 Session.AUTO_ACKNOWLEDGE。在创建队列会话时,将 AUTO_ACKNOWLEDGE 设置为 false。这意味着消费者必须承认。当消费者发送确认消息时,消息将从队列中删除,否则将保留在队列中。

以下是生产者代码。

try {
        QueueConnectionFactory qcf = AppUtils.getQueueConnectionFactory(); 
        Queue q = AppUtils.getDestination();
        QueueConnection qConnection = qcf.createQueueConnection();
        QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        QueueSender qSender = qSession.createSender(q);

        qConnection.start();
        TextMessage msg = qSession.createTextMessage("Hello");
        qSender.send(msg);

        qSender.close();
        qConnection.close();

    } catch (JMSException e) {
        // log your error to log file
        e.printStackTrace();
    }

在消费者方面,您必须做同样的事情,创建一个 AUTO_ACKNOWLEDGE 为 false 的队列会话。

处理消息后,您可以发送确认以从队列中删除消息,否则消息将保留在队列中。

try {
        QueueConnectionFactory qcf = getQueueConnectionFactory(); 
        Queue q = getDestination();
        QueueConnection qConnection = qcf.createQueueConnection();
        QueueSession qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        QueueReceiver qReceiver = qSession.createReceiver(q);

        qConnection.start();
        Message msg = qReceiver.receive();

  // here send your message to third party application 
  //if your third party application is down 
        if(thirdpartyapp is down){
          //here you can raise an exception 
          //or just do nothing 
          // you're not sending acknowledgement here so the msg will 
           //remain in the queue
        }else{      
          msg.acknowledge();//here youre sending ack, so msg will be deleted
          qReceiver.close();
          qConnection.close();
        }

    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

JMS 队列不是消息存储。

如果您有一个 "bad message" 处理继续失败,那么 JMS 服务器(如果已配置)将不可避免地将该消息转储到一个 "Dead Message Queue",它将慢慢填满直到不同的过程会耗尽它。

您不希望将不良消息保留在队列中,因为它们可能会阻塞队列(假设您有 10 个消费者,并且前 10 条消息都是不良消息,因此所有进程所做的就是继续连枷坏消息 - 拖延队列)。

因此,您需要一些机制来将消息存储到异常接收器中,然后它们可以稍后注入到主队列中进行处理。

死消息队列不是这种机制(不要将消息存储在 JMS 队列中),而是一种将异常消息路由到更永久的存储区域(即数据库 table或其他东西)。

到达那里后,可以对其进行审查(自动、手动等)并重新提交或取消。

但关键是您需要一个外部机制,单独的 JMS 服务器不适合此类消息。