Flume 1.7&Kafka - 如何在题目开头重启?
Flume 1.7 & Kafka - How to restart at the beginning of the topic?
我正在使用 Flume 1.7 Kafka source
将数据从 Apache Kafka 提取到我的 AbstractSink
。在过去,我可以通过使用 ./kafka-consumer-groups.sh --delete
删除主题偏移量来重新启动主题开头的偏移量,但是由于 Flume 1.7(显然)使用 "new" 消费者,尝试 ./kafka-consumer-groups.sh --delete
现在给出以下错误信息:
Option [delete] is not valid with [new-consumer]. Note that there's no
need to delete group metadata for the new consumer as it is
automatically deleted when the last member leaves
那么,实现所需行为的推荐方法是什么(即我们将从主题开头重新处理数据)?
这是我的 flume 配置的一部分:
myagent.sources.my-kafka-source.type = org.apache.flume.source.kafka.KafkaSource
myagent.sources.my-kafka-source.kafka.bootstrap.servers = kafka.example.net:9092
myagent.sources.my-kafka-source.kafka.consumer.group.id = my-gid
myagent.sources.my-kafka-source.kafka.topics = my.topic
myagent.sources.my-kafka-source.kafka.auto.offset.reset = earliest
myagent.sources.my-kafka-source.channels = my_channel
Flume 不提供对倒带功能的直接支持,尽管 kafka 确实附带 KafkaConsumer#seek 允许您 re-consume 消息。似乎您必须使用新的组 ID 来执行此操作,这需要重新启动 Flume 代理。
我正在使用 Flume 1.7 Kafka source
将数据从 Apache Kafka 提取到我的 AbstractSink
。在过去,我可以通过使用 ./kafka-consumer-groups.sh --delete
删除主题偏移量来重新启动主题开头的偏移量,但是由于 Flume 1.7(显然)使用 "new" 消费者,尝试 ./kafka-consumer-groups.sh --delete
现在给出以下错误信息:
Option [delete] is not valid with [new-consumer]. Note that there's no need to delete group metadata for the new consumer as it is automatically deleted when the last member leaves
那么,实现所需行为的推荐方法是什么(即我们将从主题开头重新处理数据)?
这是我的 flume 配置的一部分:
myagent.sources.my-kafka-source.type = org.apache.flume.source.kafka.KafkaSource
myagent.sources.my-kafka-source.kafka.bootstrap.servers = kafka.example.net:9092
myagent.sources.my-kafka-source.kafka.consumer.group.id = my-gid
myagent.sources.my-kafka-source.kafka.topics = my.topic
myagent.sources.my-kafka-source.kafka.auto.offset.reset = earliest
myagent.sources.my-kafka-source.channels = my_channel
Flume 不提供对倒带功能的直接支持,尽管 kafka 确实附带 KafkaConsumer#seek 允许您 re-consume 消息。似乎您必须使用新的组 ID 来执行此操作,这需要重新启动 Flume 代理。