How does kafka consumer auto commit work?

I am reading this one:

Automatic Commit The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.

Maybe it's my poor English, but I don't fully understand this description.

Suppose I use auto-commit with the default interval of 5 seconds, and a poll happens every 7 seconds. In this case, will the commit happen every 5 seconds or every 7 seconds?

Can you also clarify the behavior if polling happens every 3 seconds? Will the commit then happen every 5 seconds or every 6 seconds?
I have also read this one:

Auto commits: You can set auto.commit to true and set the auto.commit.interval.ms property with a value in milliseconds. Once you've enabled this, the Kafka consumer will commit the offset of the last message received in response to its poll() call. The poll() call is issued in the background at the set auto.commit.interval.ms.

This contradicts the answer above.

Could you explain this in detail?

Suppose I have a timeline like this:

0 sec - poll
4 sec - poll
8 sec - poll

When will offsets be committed, and which offsets?

Every poll triggers an auto-commit check, which tests whether the elapsed time is greater than the configured interval. If it is, the offsets are committed.

So if the commit interval is 5 seconds and the poll happens after 7 seconds, the commit will only happen after those 7 seconds.

It tries to auto-commit as soon as possible after a poll completes. You can look at the source code of the ConsumerCoordinator, which defines a set of class-level fields used to track whether auto-commit is enabled, what the interval is, and what the deadline for the next auto-commit is.

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

and one of the places in the poll where the commit call is made: https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279
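
To make this concrete, here is a minimal sketch of that deadline logic, assuming the "reset the deadline to now + interval" behavior described above. It is a simplified paraphrase with illustrative names (AutoCommitSketch and its fields are made up for this answer), not the actual ConsumerCoordinator code:

class AutoCommitSketch {
    private final boolean autoCommitEnabled = true;      // enable.auto.commit
    private final long autoCommitIntervalMs = 5_000;     // auto.commit.interval.ms
    private long nextAutoCommitDeadlineMs;

    AutoCommitSketch(long nowMs) {
        // The deadline is initialized when the consumer starts polling.
        this.nextAutoCommitDeadlineMs = nowMs + autoCommitIntervalMs;
    }

    // Called as part of poll(): commit only if the deadline has been reached,
    // then move the deadline forward to "now + interval".
    void maybeAutoCommitOffsetsAsync(long nowMs) {
        if (autoCommitEnabled && nowMs >= nextAutoCommitDeadlineMs) {
            nextAutoCommitDeadlineMs = nowMs + autoCommitIntervalMs;
            commitOffsetsOfLastPollAsync();
        }
    }

    private void commitOffsetsOfLastPollAsync() {
        // In the real consumer this sends an asynchronous offset-commit request
        // for the offsets returned by the previous poll.
        System.out.println("committing offsets of the last poll");
    }
}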

Say a poll is executed every 7 seconds and auto-commit is set to 5:

0 - poll, + set deadline to 5th second

7 - poll + commit due to deadline, update deadline to 7+5=12

14 - poll + commit due to deadline, update deadline to 14+5=19

But if the poll is set to every 3 seconds and auto-commit is set to 5:

0 - poll, + set deadline to 5th second

3 - poll, no commit

6 - poll + commit due to deadline, update deadline to 6+5=11
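
If it helps, both timelines can be replayed with a tiny standalone simulation (plain Java, not Kafka code). It assumes the rule above: a commit fires on the first poll at or after the deadline, and the deadline is then reset to "now + interval".

public class AutoCommitTimeline {

    public static void main(String[] args) {
        simulate(7, 5, 30);  // poll every 7 s, auto.commit.interval.ms = 5 s
        simulate(3, 5, 15);  // poll every 3 s, auto.commit.interval.ms = 5 s
    }

    static void simulate(int pollEverySec, int commitIntervalSec, int untilSec) {
        System.out.printf("poll every %ds, commit interval %ds%n", pollEverySec, commitIntervalSec);
        long deadline = commitIntervalSec; // set when polling starts at t = 0
        for (int t = 0; t <= untilSec; t += pollEverySec) {
            if (t >= deadline) {
                deadline = t + commitIntervalSec;
                System.out.printf("  %2d s - poll + commit, next deadline at %d s%n", t, deadline);
            } else {
                System.out.printf("  %2d s - poll, no commit%n", t);
            }
        }
    }
}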

Take a look at the configuration below, which gives another perspective on Kafka consumer tuning: for 30 records coming from the producer, if the consumer crashes before 20 seconds have elapsed, the entire set of 30 records will be read again by the consumer, because both max-poll-interval and auto-commit-interval are set to 20 seconds.

      auto-commit-interval: 20000
      auto-offset-reset: latest
      max-poll-records: 10
      max-poll-interval-ms: 20000

But with the configuration below, where auto-commit happens every 2 seconds, if the consumer crashes at any point after those 2 seconds, the records whose offsets have already been committed will not be picked up again by the consumer.

      auto-commit-interval: 2000
      auto-offset-reset: latest
      max-poll-records: 10
      max-poll-interval-ms: 20000

Also, auto-commit-interval always takes precedence over max-poll-interval. If for some odd reason auto-commit does not happen, then after the 20-second max poll interval expires, the Kafka broker will conclude that the consumer has gone down.

Here is a simple piece of code to test how it works.

Documentation -> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class KafkaTest {
    
    public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
    public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        final KafkaProducer<Object, Object> kafkaProducer = new KafkaProducer<>(getProps());
        for (int i = 0; i < 1000; i++) {
            kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
        }
        final Consumer<Long, String> consumer = new KafkaConsumer<>(getProps());
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));
        TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));
            // Simulate ~200 ms of processing per record (10 records per poll ≈ 2 seconds)
            consumerRecords.forEach(record -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            final long committedOffset = consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition).offset();
            final long consumerCurrentOffset = consumer.position(actualTopicPartition);
            System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset + " - committed-offset: " + committedOffset + " " + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
        }
    }

    private static Map<String, Object> getProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //  Default: latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
        return props;
    }
}

  • Polls roughly every 2 seconds (10 records per poll × 200 ms of processing)
  • Auto-commits every 5 seconds

The output is as follows:

Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> commit when the poll finishes because elapsed time (6 sec) > commit interval (5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit 
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit