当我重置 Spring 引导应用程序时消费者重新启动
Consumer restart when I reset Spring Boot app
我有一个包含数据的 Kafka 主题,名为 "topic01"
我想创建一个消费者,每次我启动 Spring Boot 2 应用程序时,都从头开始阅读该主题。
我有下面的代码,如果我在主题中添加了一些新的东西,如果它到达我,但是当第一次开始时,它不会从主题的开头读我。
@KafkaListener(topics = "topic01")
public void listenTopic01(ConsumerRecord<String, MiDTO> consumerRecord) throws Exception {
logger.info("KafkaHandler");
logger.info(consumerRecord.value().toString());
logger.info(consumerRecord.key().toString());
latch.countDown();
}
application.properties:
spring.kafka.consumer.group-id=XXXXX
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
我应该添加什么配置,以便这个@KafkaListener 每次重新启动我的应用程序时都从头读取主题。
要么每次都使用唯一(随机)组 ID,要么让您的侦听器 class 实施 ConsumerSeekAware
并添加
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
或
@KafkaListener(topics = "topic01",
groupId = "#{T(java.util.UUID).randomUUID().toString()}")
我有一个包含数据的 Kafka 主题,名为 "topic01"
我想创建一个消费者,每次我启动 Spring Boot 2 应用程序时,都从头开始阅读该主题。
我有下面的代码,如果我在主题中添加了一些新的东西,如果它到达我,但是当第一次开始时,它不会从主题的开头读我。
@KafkaListener(topics = "topic01")
public void listenTopic01(ConsumerRecord<String, MiDTO> consumerRecord) throws Exception {
logger.info("KafkaHandler");
logger.info(consumerRecord.value().toString());
logger.info(consumerRecord.key().toString());
latch.countDown();
}
application.properties:
spring.kafka.consumer.group-id=XXXXX
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
我应该添加什么配置,以便这个@KafkaListener 每次重新启动我的应用程序时都从头读取主题。
要么每次都使用唯一(随机)组 ID,要么让您的侦听器 class 实施 ConsumerSeekAware
并添加
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
或
@KafkaListener(topics = "topic01",
groupId = "#{T(java.util.UUID).randomUUID().toString()}")