如何使用 JMS API 从 Solace 侦听器将 NACK 发送到 Solace 队列?
How to send NACK to Solace queue from Solace listener using JMS API?
需要您的帮助才能找到解决方案。当前实施细节:
SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory(); // for create connection factory using host , vpn,trust store,keystore with auth scheme AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
this.connection = connectionFactory.createConnection(); // connection creation
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // session creation using client acknowledge
使用MessageListener监听队列
public class MyListener implements MessageListener
public void onMessage(Message message) // receive the message and process
{
/* in this method validating the message (poison message)
* and publish to kafka, once receive the success message
* from kafka acknowledge the message
*/
message.acknowledge();
}
问题:如果Kafka broker挂了怎么重试?我们与内部 Solace 团队进行了讨论。
我们假设 如果客户端没有调用 message.acknowledge()
Solace 将进行重试,但内部 Solace 团队澄清说它正在使用窗口机制,因此如果后续消息被确认然后之前的消息也将被确认并删除。
他们的建议是针对失败的消息发回 NACK(否定确认),然后 Solace 可以重试该消息。如何使用 Java API?
发送 NACK
Max_Un_Ack_Message
和 Max_Redeliver_Count
的建议值是多少?
安慰的 Maven 依赖项:
<dependency>
<groupId>com.solacesystems</groupId>
<artifactId>sol-jms</artifactId>
<version>10.0.0</version>
</dependency>
如果您正在使用 JMS API 并且您想要触发重新传送,那么您有几个选择。
您可以使用 transacted session。成功处理消息后,您确认消息并 commit()
javax.jms.Session
。当处理不成功时,您在 javax.jms.Session
.
上调用 rollback()
或者,您可以使用 javax.jms.Session.recover()
。 JavaDoc 是这样说的:
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all messages that have been delivered to the client.
Restarting a session causes it to take the following actions:
- Stop message delivery
- Mark all messages that might have been delivered but not acknowledged as "redelivered"
- Restart the delivery sequence including all unacknowledged messages that had been previously delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.
当然,这假设 Solace JMS 客户端实际实现了此处指定的行为。
根据讨论 here,message.acknowledge()
与 windowing 无关。由于您在示例中使用的是异步解决方案,如果您不调用 stop()
,传输 window 将关闭并且消息将不会被确认。
message.acknowledge()
仅用于确认消息已被接收和使用,消息已从 Solace 代理的持久存储中删除。请注意,未确认的消息只会在下一个要连接的消费者上重新传递。
需要您的帮助才能找到解决方案。当前实施细节:
SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory(); // for create connection factory using host , vpn,trust store,keystore with auth scheme AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
this.connection = connectionFactory.createConnection(); // connection creation
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // session creation using client acknowledge
使用MessageListener监听队列
public class MyListener implements MessageListener
public void onMessage(Message message) // receive the message and process
{
/* in this method validating the message (poison message)
* and publish to kafka, once receive the success message
* from kafka acknowledge the message
*/
message.acknowledge();
}
问题:如果Kafka broker挂了怎么重试?我们与内部 Solace 团队进行了讨论。
我们假设 如果客户端没有调用 message.acknowledge()
Solace 将进行重试,但内部 Solace 团队澄清说它正在使用窗口机制,因此如果后续消息被确认然后之前的消息也将被确认并删除。
他们的建议是针对失败的消息发回 NACK(否定确认),然后 Solace 可以重试该消息。如何使用 Java API?
发送 NACKMax_Un_Ack_Message
和 Max_Redeliver_Count
的建议值是多少?
安慰的 Maven 依赖项:
<dependency>
<groupId>com.solacesystems</groupId>
<artifactId>sol-jms</artifactId>
<version>10.0.0</version>
</dependency>
如果您正在使用 JMS API 并且您想要触发重新传送,那么您有几个选择。
您可以使用 transacted session。成功处理消息后,您确认消息并 commit()
javax.jms.Session
。当处理不成功时,您在 javax.jms.Session
.
rollback()
或者,您可以使用 javax.jms.Session.recover()
。 JavaDoc 是这样说的:
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all messages that have been delivered to the client.
Restarting a session causes it to take the following actions:
- Stop message delivery
- Mark all messages that might have been delivered but not acknowledged as "redelivered"
- Restart the delivery sequence including all unacknowledged messages that had been previously delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.
当然,这假设 Solace JMS 客户端实际实现了此处指定的行为。
根据讨论 here,message.acknowledge()
与 windowing 无关。由于您在示例中使用的是异步解决方案,如果您不调用 stop()
,传输 window 将关闭并且消息将不会被确认。
message.acknowledge()
仅用于确认消息已被接收和使用,消息已从 Solace 代理的持久存储中删除。请注意,未确认的消息只会在下一个要连接的消费者上重新传递。