Spring Kafka offset increments even though enable.auto.commit is set to false

I am trying to implement manual offset commits for messages received from Kafka. I have set enable.auto.commit to false, but the committed offset keeps advancing anyway.

I am not sure what is causing this and need help resolving it.

The code is below.

application.yml

spring:
  application:
    name: kafka-consumer-sample
  resources:
    cache:
      period: 60m

kafka:
  bootstrapServers: localhost:9092
  options:
    enable:
      auto:
        commit: false

KafkaConfig.java

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
    public void consume(ConsumerRecord<String, String> record) {

        System.out.println("Consumed Kafka Record: " + record);
        record.timestampType();
        System.out.println("record.timestamp() = " + record.timestamp());
        System.out.println("***********************************");
        System.out.println(record.timestamp());
        System.out.println("record.key() = " + record.key());
        System.out.println("Consumed String Message : " + record.value());
    }
}

The output is as follows:

Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11

The consumer properties are as follows:

auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000

This is after I restarted the consumer. I expected the earlier records to be printed again as well.

Is my understanding correct? Note that I am restarting my Spring Boot application, expecting the messages to be consumed from the beginning. My Kafka server and ZooKeeper are not terminated.

Even if auto-commit is disabled via the ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG property, the listener container still commits offsets itself, because the container-level ack mode defaults to BATCH. If you do not want offsets committed at all, you must set the ack mode to MANUAL and simply never acknowledge:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}
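With MANUAL ack mode the listener receives an Acknowledgment and decides when (or whether) the offset is committed. A minimal sketch of such a listener, reusing the topic and group from the question (the class name is illustrative):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class ManualAckConsumer {

    @KafkaListener(topics = "test", groupId = "mygroup")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("Consumed offset " + record.offset() + ", value = " + record.value());
        // With AckMode.MANUAL, the offset is committed only when acknowledge() is called.
        // Leave the call out entirely and no offset is ever committed for this group.
        // ack.acknowledge();
    }
}
```

If you later decide you do want to commit after successful processing, calling `ack.acknowledge()` at the end of the method is enough; the container performs the actual commit.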

This is because when auto-commit is disabled, the container-level ack mode is set to BATCH. From the setAckMode javadoc:

public void setAckMode(ContainerProperties.AckMode ackMode)

Set the ack mode to use when auto ack (in the configuration properties) is false.

  1. RECORD: Ack after each record has been passed to the listener.
  2. BATCH: Ack after each batch of records received from the consumer has been passed to the listener.
  3. TIME: Ack after this number of milliseconds (should be greater than #setPollTimeout(long) pollTimeout).
  4. COUNT: Ack after at least this number of records have been received.
  5. MANUAL: The listener is responsible for acking - use an AcknowledgingMessageListener.

Parameters:

ackMode - the ContainerProperties.AckMode; default BATCH.

Committing Offsets

Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set.

If you want to always read from the beginning, you must set the property auto.offset.reset to earliest:

config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Note: make sure the groupId is a new one that has no committed offsets in Kafka, because auto.offset.reset only applies when the group has no committed offset for the partition.
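Putting the two points together, the consumer configuration from the question could be built with a fresh group id so that auto.offset.reset takes effect on every restart. The random UUID suffix and class name below are just one illustrative way to guarantee a group with no committed offsets:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FreshGroupConfig {

    public static Map<String, Object> consumerConfig(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // A brand-new group id has no committed offsets, so auto.offset.reset applies
        // and the consumer starts from the earliest available record.
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup-" + UUID.randomUUID());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return config;
    }
}
```

Be aware that a random group id means the group never resumes from where it left off, which is usually only desirable for debugging or replay scenarios.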