Should not acknowledging a message cause it to be redelivered to the Kafka consumer?
I am running a Spring Cloud Stream application that reads transactions from a Kafka topic, processes them, and then sends them to IBM MQ. I am trying to handle the case where the connection to IBM MQ is down, so that no transactions are lost. In that case the JmsTemplate throws an exception and the stream listener does not acknowledge the message. The expected behavior is that the transaction stays on the Kafka topic and the stream listener reads it again. However, the message seems to be consumed only once, and no "rollback" happens. Here is my configuration:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              auto-commit-offset: false
      bindings:
        input:
          destination: kafka_topic
          brokers: localhost:9092
Here is the code:
public void handleMessage(Message<TransactionMessage> request, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    TransactionMessage message = request.getPayload();
    System.out.println("Consumed a message");
    try {
        executionFlow.execute(message); // here the jmsTemplate throws an exception
        System.out.println("doing the ack");
        acknowledgment.acknowledge();
    }
    catch (RuntimeException e) {
        System.out.println("did not send to MQ");
    }
}
The JmsTemplate code called by executionFlow:
public void sendMessage(String messageTarget) {
    System.out.println("i am trying to send to MQ");
    try {
        jmsTemplate.convertAndSend(destinationTopicQueue, messageTarget);
    } catch (Exception e) {
        throw new RuntimeException("jmsTemplate failed to send to IBM MQ");
    }
}
This is the output when I shut down the connection to IBM MQ:
Consumed a message
i am trying to send to MQ
did not send to MQ
catch (RuntimeException e) {
    System.out.println("did not send to MQ");
}
You need to re-throw the exception for the rollback to occur.
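The container can only redeliver what the listener lets escape: the catch block above swallows the exception without acknowledging, so the listener returns normally and consumption simply moves on. A toy model (plain Java, no Spring; the class and names are illustrative, not framework APIs) of why committing only after a successful send is what produces a redelivery:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of at-least-once delivery: the "offset" is only committed
// (record removed) after a successful send, so a failed send leaves the
// record in place and it is delivered again on the next pass.
public class RedeliveryDemo {

    static int run() {
        Deque<String> topic = new ArrayDeque<>();
        topic.add("tx-1");
        boolean mqUp = false;   // simulate IBM MQ being down at first
        int deliveries = 0;
        while (!topic.isEmpty()) {
            topic.peek();       // deliver the record without committing
            deliveries++;
            try {
                if (!mqUp) {
                    throw new RuntimeException("jmsTemplate failed to send to IBM MQ");
                }
                topic.poll();   // commit only after the send succeeded
            } catch (RuntimeException e) {
                // in the real app the exception must propagate to the
                // container; here MQ simply comes back so the retry succeeds
                mqUp = true;
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("deliveries=" + run()); // one failure, one redelivery
    }
}
```

In the real listener, deleting the try/catch (or re-throwing inside it) plays the role of leaving the record uncommitted.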
You also need to enable Kafka transactions in the binder.
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
Default: null (no transactions)
If you also publish data to Kafka, you need a transactional producer.
spring.cloud.stream.kafka.binder.transaction.producer.*
Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix and Kafka Producer Properties and the general producer properties supported by all binders.
Default: See individual producer properties.
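Putting the two properties together with the question's configuration, the binder section might look like the following sketch (the `tx-` prefix value and the `acks` setting are illustrative assumptions, not values from the question):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: tx-   # enables Kafka transactions in the binder
            producer:
              acks: all                  # example transactional producer property
        bindings:
          input:
            consumer:
              auto-commit-offset: false
      bindings:
        input:
          destination: kafka_topic
```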