从头开始阅读 Kafka Topic

Read Kafka Topic from beginning

我有一个 spring 启动应用程序,它不断读取来自特定主题的所有消息。 我的问题是我有一个问题,当应用程序重新启动时,它需要再次读取主题中的所有消息。 我认为这两个选项结合起来就可以了,但它不起作用:

这是我的方法;

 @StreamListener(myTopic)
 public void handle(@Payload Input input) {
    /** Do other stuff not related **/
 }

这是我的 application.yaml

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: group-topic
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

您需要将 application.yaml 中的 group 更改为一个新的且唯一的组名称(参见下面的示例,其中我将消费者组设置为 new-consumer-group -id:

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: new-consumer-group-id
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

如果继续使用同一个ConsumerGroup,配置resetOffsetstartingOffset不会有任何影响。

作为替代方案,您可以使用命令行工具 kafka-consumer-groups.sh 为每个消费者组重置偏移量。模板如下:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
 --execute --reset-offsets \
 --group groupA\
 --topic topicC \
 --partition 0 \
 --to-offset <insert number>