当我重置 Spring 引导应用程序时消费者重新启动

Consumer restart when I reset Spring Boot app

我有一个包含数据的 Kafka 主题,名为 "topic01"

我想创建一个消费者,每次我启动 Spring Boot 2 应用程序时,都从头开始阅读该主题。

我有下面的代码,如果我在主题中添加了一些新的东西,如果它到达我,但是当第一次开始时,它不会从主题的开头读我。

@KafkaListener(topics = "topic01")
public void listenTopic01(ConsumerRecord<String, MiDTO> consumerRecord) throws Exception {
    logger.info("KafkaHandler");
    logger.info(consumerRecord.value().toString());
    logger.info(consumerRecord.key().toString());
    latch.countDown();
}

application.properties:

spring.kafka.consumer.group-id=XXXXX
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

我应该添加什么配置,以便这个@KafkaListener 每次重新启动我的应用程序时都从头读取主题。

要么每次都使用唯一(随机)组 ID,要么让您的侦听器 class 实施 ConsumerSeekAware 并添加

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    consumer.seekToBeginning(partitions);
}

@KafkaListener(topics = "topic01", 
    groupId = "#{T(java.util.UUID).randomUUID().toString()}")