Losing Kafka messages due to exceptions in consumer code

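I wrote a deliberately error-prone consumer that randomly throws an exception for 50% of the messages.

The consumer consumes every message, but only 85% of them are written to the database.

Where is the problem?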

GitHub: https://github.com/grzegorz-brzeczyszczykiewicz/kafka-losing-messages

I configured auto-commit like this:

spring.cloud.stream.kafka.bindings.my-channel.consumer.autoCommitOffset=false
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.isolation-level=read_committed

My consumer looks like this:

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.time.ZonedDateTime;
import java.util.Random;

@Configuration
@Component
@EnableTransactionManagement
@Slf4j
public class Consumer {

    @Autowired
    private MyDataRepository repository;
    private int methodCounter = 0;
    private int defectCounter = 0;

    //@Transactional
    @StreamListener(Stream.IN_CHANNEL)
    public void handleMessage(final Message<String> msg) throws Exception {
        //msg.getHeaders().forEach((key, value) -> System.out.println(key + ": " + value));
        final Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        System.out.println("payload: " + msg.getPayload());
        methodCounter++;
        System.out.println("method counter: " + methodCounter);
        if(!new Random().nextBoolean()){
            defectCounter++;
            System.out.println("defect counter: " + defectCounter);
            throw new Exception("kaputt");
        }

        System.out.println("");
        MyData data = new MyData();
        String[] split = msg.getPayload().split(" time: ");
        data.setMessage(split[0] + "_" + methodCounter + "_" + defectCounter);
        data.setTime(ZonedDateTime.parse(split[1]));
        repository.save(data);
//        if (acknowledgment!=null) {
//            System.out.println("ack nonnull");
            acknowledgment.acknowledge();
//        } else {
//            System.out.println("ack null");
//        }
    }
}

I found a solution: wrap all of the error-prone code in a try/catch block and call nack() in the catch block.
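A minimal sketch of that try/catch-and-nack pattern, written so it runs without Spring or a broker. Everything in it is an assumption made for illustration: `Ack` is a hypothetical stand-in for Spring Kafka's `Acknowledgment`, the in-memory `database` list replaces the repository, and the `run` loop is a toy substitute for the container redelivering a rejected record. It is not the code from the linked repository.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class NackDemo {

    /** Hypothetical stand-in for Spring Kafka's Acknowledgment. */
    interface Ack {
        void acknowledge(); // the record was processed, its offset may be committed
        void nack();        // the record was not processed, deliver it again
    }

    /** Remembers whether the handler asked for the record to be redelivered. */
    static final class RecordingAck implements Ack {
        boolean redeliver;
        @Override public void acknowledge() { redeliver = false; }
        @Override public void nack() { redeliver = true; }
    }

    private final List<String> database = new ArrayList<>(); // replaces the repository
    private final Random random;

    NackDemo(long seed) {
        this.random = new Random(seed);
    }

    /** Listener body: everything that can fail sits inside the try block. */
    void handleMessage(String payload, Ack ack) {
        try {
            if (!random.nextBoolean()) {
                throw new IllegalStateException("kaputt"); // simulated 50% failure
            }
            database.add(payload);
            ack.acknowledge(); // acknowledge only after the save succeeded
        } catch (Exception e) {
            ack.nack(); // the exception does not escape; the record is requested again
        }
    }

    /** Toy delivery loop: hands each record to the handler until it is acknowledged. */
    public static List<String> run(int messageCount, long seed) {
        NackDemo consumer = new NackDemo(seed);
        for (int i = 0; i < messageCount; i++) {
            RecordingAck ack = new RecordingAck();
            do {
                consumer.handleMessage("msg-" + i, ack);
            } while (ack.redeliver);
        }
        return consumer.database;
    }

    public static void main(String[] args) {
        List<String> saved = run(100, 42L);
        System.out.println("saved " + saved.size() + " of 100 messages");
    }
}
```

In this toy setup every record ends up saved exactly once, because a failed attempt is always followed by another delivery of the same record. In the real listener the equivalent call would be made on the `Acknowledgment` taken from the message headers; note that the real `nack` takes a sleep interval argument whose type depends on the spring-kafka version, so check the signature for the version in use.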
