什么时候在 Spring Boot 的 KafkaListener.java 中提交偏移量?

When is offset committed in KafkaListener.java in Springboot?

我的问题分为两部分。

  1. 在下面的Springboot KafkaListener 实现中,什么时候为偏移策略auto-offset-reset: latestenable-auto-commit: true提交偏移量?是在消费者收到消息后立即执行还是在实现 KafkaListener 的整个方法完成后执行?

KafkaConsumer.java

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public ResponseEntity<String> consume(String message) {
        log.info("Message recieved from Kafka topic {}", message); // offset committed HERE?
        KafkaResponse kafkaResponse = new Gson().fromJson(message, KafkaResponse.class);
        myBusinessService.processKafkaResponse(kafkaResponse);
        return new ResponseEntity<>("Successfully Received", HttpStatus.OK);// OR offset committed HERE?
    }
  1. application.yml 中,对于以下属性,哪些陈述是 True/False/what 的正确答案?:
max-poll-records: 100
max-poll-interval-ms: 200000
enable-auto-commit: true 
auto-commit-interval: 3000
auto-offset-reset: latest
isolation-level: READ_UNCOMMITTED
fetch-max-bytes: 52428800

提前致谢!

最好不要使用enable.auto.commit=true。 Spring 以更确定的方式提交偏移量,在处理完轮询中的所有记录后(默认 - AckMode.BATCH)或在处理每个记录后 AckMode.RECORD.

enable.auto.commit 直到下一个 poll() 才会提交,然后只有 auto.commit.interval 已经过去。