将 KafkaSpout 与 storm 一起使用,如何忽略旧消息?
Using KafkaSpout with storm, how to ignore old messages?
出于调试目的,当我启动拓扑时,我想忽略启动前 Kafka 队列中的任何消息。我相信这可以通过设置 spoutConfig.startOffsetTime
和 spoutConfig.useStartOffsetTimeIfOffsetOutOfRange
来完成。但我已经尝试将它们分别设置为 -1、-2、-3 和 true/false 的每个排列。我的拓扑在启动时继续从 Kafka 消耗。 (没有向 Kafka 发布新消息)。
有没有我可以用来忽略旧消息的配置?本质上,清除 Kafka 队列?
您是否正确配置了 Zookeeper?最后一个偏移量可以存储在那里,KafkaSpout
可以在启动时从 Zookeeper 读取最后一个偏移量。在这里查看更多详细信息:
出于调试目的,当我启动拓扑时,我想忽略启动前 Kafka 队列中的任何消息。我相信这可以通过设置 spoutConfig.startOffsetTime
和 spoutConfig.useStartOffsetTimeIfOffsetOutOfRange
来完成。但我已经尝试将它们分别设置为 -1、-2、-3 和 true/false 的每个排列。我的拓扑在启动时继续从 Kafka 消耗。 (没有向 Kafka 发布新消息)。
有没有我可以用来忽略旧消息的配置?本质上,清除 Kafka 队列?
您是否正确配置了 Zookeeper?最后一个偏移量可以存储在那里,KafkaSpout
可以在启动时从 Zookeeper 读取最后一个偏移量。在这里查看更多详细信息: