Kafka Spout 在 Storm Topology 上读取了两次消息
Kafka Spout read twice message on Storm Topology
我正在尝试使用 Kafka 模拟流式传输到 Storm。我使用 KafkaSpout 从生产者发送的一个主题中读取一条消息,生产者阅读这些推文并将它们发送到一个主题。我的问题是,在拓扑消耗了该主题中发送的所有推文后,它继续读取该主题中的消息两次。如何阻止 KafkaSpout 读取两次?(复制因子设置为 1)
配置对我来说很好。
可能是双重确认问题。确保你只在 execute
.
中确认每个元组一次
如评论中所述,请考虑升级到更新的 Kafka 版本,以及切换到 storm-kafka-client
。
还有一些可能会让您的生活更轻松的事情:考虑延长 BaseBasicBolt
而不是 BaseRichBolt
。如果 运行 execute
没有抛出错误,BaseBasicBolt
会自动为您确认元组。如果你想让元组失败,你可以抛出 FailedException
。 BaseRichBolt
只应在您想进行更复杂的确认时使用,例如在 acking 之前从许多 execute
调用 in-memory 中聚合元组。
我正在尝试使用 Kafka 模拟流式传输到 Storm。我使用 KafkaSpout 从生产者发送的一个主题中读取一条消息,生产者阅读这些推文并将它们发送到一个主题。我的问题是,在拓扑消耗了该主题中发送的所有推文后,它继续读取该主题中的消息两次。如何阻止 KafkaSpout 读取两次?(复制因子设置为 1)
配置对我来说很好。
可能是双重确认问题。确保你只在 execute
.
如评论中所述,请考虑升级到更新的 Kafka 版本,以及切换到 storm-kafka-client
。
还有一些可能会让您的生活更轻松的事情:考虑延长 BaseBasicBolt
而不是 BaseRichBolt
。如果 运行 execute
没有抛出错误,BaseBasicBolt
会自动为您确认元组。如果你想让元组失败,你可以抛出 FailedException
。 BaseRichBolt
只应在您想进行更复杂的确认时使用,例如在 acking 之前从许多 execute
调用 in-memory 中聚合元组。