如何手动提交不记录已通过 CommonErrorHandler 发送的 DLT 的偏移量
How to manual commit do not recorverd offset already sent DLT through CommonErrorHandler
目前正在通过spring kafka制作一个简单的例子。
如果服务层出现异常,我想重试后再提交原来的偏移量,加载到死信队列中
但是dead letter queue加载正常,但是由于commit没有处理,原来的消息还在kafka中。
向您展示我的代码,如下所示。
- KafkaConfig.java
...
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(kafkaListenerErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return factory;
}
private CommonErrorHandler kafkaListenerErrorHandler() {
DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(template, DEAD_TOPIC_DESTINATION_RESOLVER),
new FixedBackOff(1000, 3));
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
defaultErrorHandler.setResetStateOnRecoveryFailure(false);
return defaultErrorHandler;
}
...
- KafkaListener.java
...
@KafkaListener(topics = TOPIC_NAME, containerFactory = "kafkaListenerContainerFactory", groupId = "stock-adjustment-0")
public void subscribe(final String message, Acknowledgment ack) throws IOException {
log.info(String.format("Message Received : [%s]", message));
StockAdjustment stockAdjustment = StockAdjustment.deserializeJSON(message);
if(stockService.isAlreadyProcessedOrderId(stockAdjustment.getOrderId())) {
log.info(String.format("AlreadyProcessedOrderId : [%s]", stockAdjustment.getOrderId()));
} else {
if(stockAdjustment.getAdjustmentType().equals("REDUCE")) {
stockService.decreaseStock(stockAdjustment);
}
}
ack.acknowledge(); // <<< does not work!
}
...
- Stockservice.java
...
if(stockAdjustment.getQty() > stock.getAvailableStockQty()) {
throw new RuntimeException(String.format("Stock decreased Request [decreasedQty: %s][availableQty : %s]", stockAdjustment.getQty(), stock.getAvailableStockQty()));
}
...
此时服务层如上发生RuntimeException
时,根据Kafka设置,通过CommonErrorhandler
发行DLT
但是DLT发出后,原来的消息还在Kafka中,所以需要一个解决方案
我查了一下,发现我写的setting是通过SeekUtils.seekOrRecover()
处理的,如果不处理即使不处理最大尝试次数,也会出现异常,原来的offset是回滚而不处理提交。
根据文档,好像AfterRollbackProcessor
在失败时用默认值处理回滚,但我不知道怎么写代码即使失败也提交。
已编辑
以上代码和设置工作正常。
我以为会出现consumer lag,但是当我判断实际的逻辑代码(SeekUtils.seekOrRecover()
)并检查offset commit和lag时,我确认了正常工作。
我认为这是我的错误造成的。
永远不会删除记录(直到它们过期),更新消费者提交的偏移量。
使用 kafka-consumer-groups.sh
描述组以查看发送到 DLT 的失败记录的提交偏移量。
目前正在通过spring kafka制作一个简单的例子。
如果服务层出现异常,我想重试后再提交原来的偏移量,加载到死信队列中
但是dead letter queue加载正常,但是由于commit没有处理,原来的消息还在kafka中。
向您展示我的代码,如下所示。
- KafkaConfig.java
...
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(kafkaListenerErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return factory;
}
private CommonErrorHandler kafkaListenerErrorHandler() {
DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(template, DEAD_TOPIC_DESTINATION_RESOLVER),
new FixedBackOff(1000, 3));
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
defaultErrorHandler.setResetStateOnRecoveryFailure(false);
return defaultErrorHandler;
}
...
- KafkaListener.java
...
@KafkaListener(topics = TOPIC_NAME, containerFactory = "kafkaListenerContainerFactory", groupId = "stock-adjustment-0")
public void subscribe(final String message, Acknowledgment ack) throws IOException {
log.info(String.format("Message Received : [%s]", message));
StockAdjustment stockAdjustment = StockAdjustment.deserializeJSON(message);
if(stockService.isAlreadyProcessedOrderId(stockAdjustment.getOrderId())) {
log.info(String.format("AlreadyProcessedOrderId : [%s]", stockAdjustment.getOrderId()));
} else {
if(stockAdjustment.getAdjustmentType().equals("REDUCE")) {
stockService.decreaseStock(stockAdjustment);
}
}
ack.acknowledge(); // <<< does not work!
}
...
- Stockservice.java
...
if(stockAdjustment.getQty() > stock.getAvailableStockQty()) {
throw new RuntimeException(String.format("Stock decreased Request [decreasedQty: %s][availableQty : %s]", stockAdjustment.getQty(), stock.getAvailableStockQty()));
}
...
此时服务层如上发生RuntimeException
时,根据Kafka设置,通过CommonErrorhandler
发行DLT
但是DLT发出后,原来的消息还在Kafka中,所以需要一个解决方案
我查了一下,发现我写的setting是通过SeekUtils.seekOrRecover()
处理的,如果不处理即使不处理最大尝试次数,也会出现异常,原来的offset是回滚而不处理提交。
根据文档,好像AfterRollbackProcessor
在失败时用默认值处理回滚,但我不知道怎么写代码即使失败也提交。
已编辑
以上代码和设置工作正常。
我以为会出现consumer lag,但是当我判断实际的逻辑代码(SeekUtils.seekOrRecover()
)并检查offset commit和lag时,我确认了正常工作。
我认为这是我的错误造成的。
永远不会删除记录(直到它们过期),更新消费者提交的偏移量。
使用 kafka-consumer-groups.sh
描述组以查看发送到 DLT 的失败记录的提交偏移量。