使用 Spring Kafka / Spring Boot 从主题的开头(偏移量 = 0)读取
Reading from beginning (offset = 0) of topic with Spring Kafka / Spring Boot
如果我有 @KafkaListener 注释,我如何在每次我的应用程序启动时从头开始阅读主题?
一种方法是使用匿名组管理:没有 id()
也没有 groupId()
@KafkaListener
。如果idIsGroup() == true
(默认),当然。
而且您还需要使用 @TopicPartition
,就像这里解释的那样:https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#manual-assignment
Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:
topicPartitions = { @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
另一种方法是为您的 @KafkaListener
class 实现一个 AbstractConsumerSeekAware
并在您在 onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
中分配分区时调用它 ConsumerSeekCallback.seekToBeginning()
。 =22=]
如果我有 @KafkaListener 注释,我如何在每次我的应用程序启动时从头开始阅读主题?
一种方法是使用匿名组管理:没有 id()
也没有 groupId()
@KafkaListener
。如果idIsGroup() == true
(默认),当然。
而且您还需要使用 @TopicPartition
,就像这里解释的那样:https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#manual-assignment
Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:
topicPartitions = { @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
另一种方法是为您的 @KafkaListener
class 实现一个 AbstractConsumerSeekAware
并在您在 onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)
中分配分区时调用它 ConsumerSeekCallback.seekToBeginning()
。 =22=]