从头开始阅读 Kafka Topic
Read Kafka Topic from beginning
我有一个 spring 启动应用程序,它不断读取来自特定主题的所有消息。
我的问题是我有一个问题,当应用程序重新启动时,它需要再次读取主题中的所有消息。
我认为这两个选项结合起来就可以了,但它不起作用:
- resetOffsets: true
- startOffset: 最早
这是我的方法;
@StreamListener(myTopic)
public void handle(@Payload Input input) {
/** Do other stuff not related **/
}
这是我的 application.yaml
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: group-topic
concurrency: 1
resetOffsets: true
startOffset: earliest
您需要将 application.yaml 中的 group
更改为一个新的且唯一的组名称(参见下面的示例,其中我将消费者组设置为 new-consumer-group -id:
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: new-consumer-group-id
concurrency: 1
resetOffsets: true
startOffset: earliest
如果继续使用同一个ConsumerGroup,配置resetOffset
和startingOffset
不会有任何影响。
作为替代方案,您可以使用命令行工具 kafka-consumer-groups.sh
为每个消费者组重置偏移量。模板如下:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--execute --reset-offsets \
--group groupA\
--topic topicC \
--partition 0 \
--to-offset <insert number>
我有一个 spring 启动应用程序,它不断读取来自特定主题的所有消息。 我的问题是我有一个问题,当应用程序重新启动时,它需要再次读取主题中的所有消息。 我认为这两个选项结合起来就可以了,但它不起作用:
- resetOffsets: true
- startOffset: 最早
这是我的方法;
@StreamListener(myTopic)
public void handle(@Payload Input input) {
/** Do other stuff not related **/
}
这是我的 application.yaml
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: group-topic
concurrency: 1
resetOffsets: true
startOffset: earliest
您需要将 application.yaml 中的 group
更改为一个新的且唯一的组名称(参见下面的示例,其中我将消费者组设置为 new-consumer-group -id:
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: new-consumer-group-id
concurrency: 1
resetOffsets: true
startOffset: earliest
如果继续使用同一个ConsumerGroup,配置resetOffset
和startingOffset
不会有任何影响。
作为替代方案,您可以使用命令行工具 kafka-consumer-groups.sh
为每个消费者组重置偏移量。模板如下:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--execute --reset-offsets \
--group groupA\
--topic topicC \
--partition 0 \
--to-offset <insert number>