如何手动提交不记录已通过 CommonErrorHandler 发送的 DLT 的偏移量

How to manual commit do not recorverd offset already sent DLT through CommonErrorHandler

目前正在通过spring kafka制作一个简单的例子。

如果服务层出现异常,我想重试后再提交原来的偏移量,加载到死信队列中

但是dead letter queue加载正常,但是由于commit没有处理,原来的消息还在kafka中。

向您展示我的代码,如下所示。

...
 @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(kafkaListenerErrorHandler());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private CommonErrorHandler kafkaListenerErrorHandler() {
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(template, DEAD_TOPIC_DESTINATION_RESOLVER),
            new FixedBackOff(1000, 3));
       
        defaultErrorHandler.setCommitRecovered(true);
        defaultErrorHandler.setAckAfterHandle(true);
        defaultErrorHandler.setResetStateOnRecoveryFailure(false);

        
        return defaultErrorHandler;
    }
...
...
@KafkaListener(topics = TOPIC_NAME, containerFactory = "kafkaListenerContainerFactory", groupId = "stock-adjustment-0")
    public void subscribe(final String message, Acknowledgment ack) throws IOException {

        log.info(String.format("Message Received : [%s]", message));

        StockAdjustment stockAdjustment = StockAdjustment.deserializeJSON(message);

        if(stockService.isAlreadyProcessedOrderId(stockAdjustment.getOrderId())) {
            log.info(String.format("AlreadyProcessedOrderId : [%s]", stockAdjustment.getOrderId()));
        } else {
            if(stockAdjustment.getAdjustmentType().equals("REDUCE")) {
                stockService.decreaseStock(stockAdjustment);
            }
        }
    ack.acknowledge(); // <<< does not work!
    }
...
...
        if(stockAdjustment.getQty() > stock.getAvailableStockQty()) {
            throw new RuntimeException(String.format("Stock decreased Request [decreasedQty: %s][availableQty : %s]", stockAdjustment.getQty(), stock.getAvailableStockQty()));
        }
...

此时服务层如上发生RuntimeException时,根据Kafka设置,通过CommonErrorhandler发行DLT

但是DLT发出后,原来的消息还在Kafka中,所以需要一个解决方案

我查了一下,发现我写的setting是通过SeekUtils.seekOrRecover()处理的,如果不处理即使不处理最大尝试次数,也会出现异常,原来的offset是回滚而不处理提交。

根据文档,好像AfterRollbackProcessor在失败时用默认值处理回滚,但我不知道怎么写代码即使失败也提交。

已编辑

以上代码和设置工作正常。

我以为会出现consumer lag,但是当我判断实际的逻辑代码(SeekUtils.seekOrRecover())并检查offset commit和lag时,我确认了正常工作。

我认为这是我的错误造成的。

永远不会删除记录(直到它们过期),更新消费者提交的偏移量。

使用 kafka-consumer-groups.sh 描述组以查看发送到 DLT 的失败记录的提交偏移量。