当崩溃的核心客户端重新连接时,确认的消息被重新传递
Acknowledged messages are redelivered when crashed core client reconnects
我在本地设置了一个独立的 HornetQ 实例 运行。出于测试目的,我使用 HornetQ core API 创建了一个消费者,它将每 500 毫秒收到一条消息。
当我的客户端连接并从队列中读取所有消息时,我在消费者端面临一个奇怪的行为,如果我强制关闭它(没有正确关闭 session/connection),那么下次我启动这个消费者它将再次从队列中读取旧消息。这是我的消费者示例:
// HornetQ消费者代码
public void readMessage() {
ClientSession session = null;
try {
if (sf != null) {
session = sf.createSession(true, true);
ClientConsumer messageConsumer = session.createConsumer(JMS_QUEUE_NAME);
session.start();
while (true) {
ClientMessage messageReceived = messageConsumer.receive(1000);
if (messageReceived != null && messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME) != null) {
System.out.println("Received JMS TextMessage:" + messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME));
messageReceived.acknowledge();
}
Thread.sleep(500);
}
}
} catch (Exception e) {
LOGGER.error("Error while adding message by producer.", e);
} finally {
try {
session.close();
} catch (HornetQException e) {
LOGGER.error("Error while closing producer session,", e);
}
}
}
有人能告诉我为什么它会这样工作吗?我应该在 client/server 端使用什么样的配置,以便如果消息被消费者读取,它将从队列中删除?
您没有在确认完成后提交会话,也没有在启用自动提交确认的情况下创建会话。因此,您应该执行以下操作之一:
- 在
acknowledge()
的一次或多次调用后显式调用 session.commit()
- 或者通过使用
sf.createSession(true,true)
或 sf.createSession(false,true)
创建会话来启用确认的隐式自动提交(控制自动提交确认的布尔值是第二个)。
请记住,当您为确认启用自动提交时,有一个内部缓冲区需要在确认被刷新到代理之前达到特定大小。像这样的批处理确认可以极大地提高某些大容量用例的性能。默认情况下,您需要确认 1,048,576 字节的消息,以便刷新缓冲区并将确认发送给代理。您可以通过在 ServerLocator
实例上调用 setAckBatchSize
或使用不同的 createSession
方法(例如 sf.createSession(true, true, myAckBatchSize)
)来更改此缓冲区的大小。
如果确认缓冲区未刷新并且您的客户端崩溃,则当客户端返回时相应的消息仍将在队列中。如果缓冲区没有达到它的阈值,它仍然会在消费者正常关闭时被刷新。
我在本地设置了一个独立的 HornetQ 实例 运行。出于测试目的,我使用 HornetQ core API 创建了一个消费者,它将每 500 毫秒收到一条消息。
当我的客户端连接并从队列中读取所有消息时,我在消费者端面临一个奇怪的行为,如果我强制关闭它(没有正确关闭 session/connection),那么下次我启动这个消费者它将再次从队列中读取旧消息。这是我的消费者示例:
// HornetQ消费者代码
public void readMessage() {
ClientSession session = null;
try {
if (sf != null) {
session = sf.createSession(true, true);
ClientConsumer messageConsumer = session.createConsumer(JMS_QUEUE_NAME);
session.start();
while (true) {
ClientMessage messageReceived = messageConsumer.receive(1000);
if (messageReceived != null && messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME) != null) {
System.out.println("Received JMS TextMessage:" + messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME));
messageReceived.acknowledge();
}
Thread.sleep(500);
}
}
} catch (Exception e) {
LOGGER.error("Error while adding message by producer.", e);
} finally {
try {
session.close();
} catch (HornetQException e) {
LOGGER.error("Error while closing producer session,", e);
}
}
}
有人能告诉我为什么它会这样工作吗?我应该在 client/server 端使用什么样的配置,以便如果消息被消费者读取,它将从队列中删除?
您没有在确认完成后提交会话,也没有在启用自动提交确认的情况下创建会话。因此,您应该执行以下操作之一:
- 在
acknowledge()
的一次或多次调用后显式调用 - 或者通过使用
sf.createSession(true,true)
或sf.createSession(false,true)
创建会话来启用确认的隐式自动提交(控制自动提交确认的布尔值是第二个)。
session.commit()
请记住,当您为确认启用自动提交时,有一个内部缓冲区需要在确认被刷新到代理之前达到特定大小。像这样的批处理确认可以极大地提高某些大容量用例的性能。默认情况下,您需要确认 1,048,576 字节的消息,以便刷新缓冲区并将确认发送给代理。您可以通过在 ServerLocator
实例上调用 setAckBatchSize
或使用不同的 createSession
方法(例如 sf.createSession(true, true, myAckBatchSize)
)来更改此缓冲区的大小。
如果确认缓冲区未刷新并且您的客户端崩溃,则当客户端返回时相应的消息仍将在队列中。如果缓冲区没有达到它的阈值,它仍然会在消费者正常关闭时被刷新。