当崩溃的核心客户端重新连接时,确认的消息被重新传递

Acknowledged messages are redelivered when crashed core client reconnects

我在本地设置了一个独立的 HornetQ 实例 运行。出于测试目的,我使用 HornetQ core API 创建了一个消费者,它将每 500 毫秒收到一条消息。

当我的客户端连接并从队列中读取所有消息时,我在消费者端面临一个奇怪的行为,如果我强制关闭它(没有正确关闭 session/connection),那么下次我启动这个消费者它将再次从队列中读取旧消息。这是我的消费者示例:

// HornetQ消费者代码

   public void readMessage() {
    ClientSession session = null;
    try {
        if (sf != null) {
            session = sf.createSession(true, true);

            ClientConsumer messageConsumer = session.createConsumer(JMS_QUEUE_NAME);
            session.start();

            while (true) {
                ClientMessage messageReceived = messageConsumer.receive(1000);
                if (messageReceived != null && messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME) != null) {
                    System.out.println("Received JMS TextMessage:" + messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME));
                    messageReceived.acknowledge();
                }

                Thread.sleep(500);
            }
        }
    } catch (Exception e) {
        LOGGER.error("Error while adding message by producer.", e);
    } finally {
        try {
            session.close();
        } catch (HornetQException e) {
            LOGGER.error("Error while closing producer session,", e);
        }
    }
}

有人能告诉我为什么它会这样工作吗?我应该在 client/server 端使用什么样的配置,以便如果消息被消费者读取,它将从队列中删除?

您没有在确认完成后提交会话,也没有在启用自动提交确认的情况下创建会话。因此,您应该执行以下操作之一:

  • acknowledge()
  • 的一次或多次调用后显式调用 session.commit()
  • 或者通过使用 sf.createSession(true,true)sf.createSession(false,true) 创建会话来启用确认的隐式自动提交(控制自动提交确认的布尔值是第二个)。

请记住,当您为确认启用自动提交时,有一个内部缓冲区需要在确认被刷新到代理之前达到特定大小。像这样的批处理确认可以极大地提高某些大容量用例的性能。默认情况下,您需要确认 1,048,576 字节的消息,以便刷新缓冲区并将确认发送给代理。您可以通过在 ServerLocator 实例上调用 setAckBatchSize 或使用不同的 createSession 方法(例如 sf.createSession(true, true, myAckBatchSize))来更改此缓冲区的大小。

如果确认缓冲区未刷新并且您的客户端崩溃,则当客户端返回时相应的消息仍将在队列中。如果缓冲区没有达到它的阈值,它仍然会在消费者正常关闭时被刷新。