Apache Storm:如何从 Kafka Spout 微批处理事件

Apache Storm: How to micro batch events from Kafka Spout

我如何在 kafka spout 中对事件进行微批处理以减少后续 bolts 中的 IO 调用? 预期是:使用 kafka 中的事件发出最大大小为 100 的批次,但最多等待 1 秒以形成该批次。如果 1 秒内没有足够的事件,则发出可用的事件。

我可以通过 "source.groupedWithin" 方法在 Akka 中实现相同的功能。我如何对 kafka spout 做同样的事情?

查看 Storm 的 tick 元组,它提供了一种将预定元组(ticks)发送到螺栓的方法。对于您的情况,您可以每秒配置一个滴答声。与此同时,bolt 将简单地处理来自 Kafka spout 的元组并对它们进行批处理,当它达到 100 条消息(在你的情况下)或当你得到一个 tick 元组时发送一个批处理。请注意,您确实需要检查每个输入元组以查看它是滴答声还是 Kafka 消息。

除了 Chris 的回答,你还可以使用 Storm 的窗口功能 https://storm.apache.org/releases/2.0.0/Windowing.html. You can find an example of this at https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java

如果您愿意,也可以为此使用 Trident。设置 KafkaTridentSpoutOpaque 后,您可以使用 Kafka 客户端设置来控制每批中有多少消息。您将使用 KafkaSpoutConfig pollTimeoutMs 来设置您希望等待批次填充的时间,并通过 KafkaSpoutConfig.Builder.setProp 设置 max.poll.records Kafka 客户端配置来控制最大数量一批记录的数量。

有关使用 Kafka Trident spout 的完整示例,请参阅 https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java