KafkaSpout 非常高的延迟

KafkaSpout very high latency

我是 apache storm 和 kafka 的新手,作为 POC 的一部分,我正在尝试使用 Kafka 和 apache storm 处理消息流。我正在使用来自 https://github.com/apache/storm/tree/master/external/storm-kafka 的 storm-kafka 源代码,我能够创建一个示例程序,它使用 KafkaSpout 从 kafka 主题读取消息并将其输出到另一个 kafka 主题。我有 3 个节点的 kafka(所有三个 运行 在同一台服务器上)集群并创建了具有 8 个分区的主题。我将 KafkaSpout 并行度设置为 8,bolt 的并行度也设置为 8,并尝试使用 8 个执行程序和任务。我已经尝试在 kafka 级别、SpoutConfig 级别和风暴级别设置很多 tunnig 参数,但我遇到了非常高的整体延迟问题。我需要消息处理 garuntee,所以确实需要 acking。 Storm 集群有 1 个 supervisor,zookeeper 有 3 个 noed,由 kafka 和 storm 共享。它是 运行 在 Red Hat Linux 机器上,具有 144MB RAM 和 16CPU。使用以下参数,我可能会得到非常高的 spout 进程延迟,大约 40 秒,我需要达到大约 50K msg/sec 级别,你能帮我进行配置以实现它吗?我浏览了各种网站上的很多帖子并尝试了很多调整选项但没有结果。

Storm config
topology.receiver.buffer.size=16
topology.transfer.buffer.size=4096
topology.executor.receive.buffer.size=16384
topology.executor.send.buffer.size=16384
topology.spout.max.batch.size=65536
topology.max.spout.pending=10000
topology.acker.executors=20

Kafka config
fetch.size.bytes=1048576
socket.timeout.ms=10000
fetch.max.wait=10000
buffer.size.bytes=1048576

提前致谢。

风暴UI截图

查看您的 UI 屏幕截图,似乎您的 spout 发出了更多数据,可以由您的 bolt 处理。两个 spout 都发出了大约 500K 条消息,但只有 250k 被确认(同样可以通过执行的 bolt 元组数量推断——大约 480K,这是两个 spout 发出的元组的一半)。 40 秒的延迟从一开始就是相同的值吗?还是延迟会随着时间增加?如果它随着时间的推移而增加,很明显你的螺栓是瓶颈。您有两个选择:

  1. 增加螺栓的平行度and/or
  2. 设置参数 spout.max.pending 以调节喷口输出速率

第一个选项 只有在您有足够的内核时才有意义(但到目前为止这应该不是问题,因为您提到了 16 个可用的 CPU)。 第二个选项是否适用于您,取决于您想要达到的吞吐量。你提到了 50K msg/sec 但 UI 没有显示当前的吞吐量数字(即 spout 输出率),因此我无法判断节流是否是一个选项。此外,您必须通过试错来确定 spout.max.pending 的最佳值(从 1000 的值开始对我来说似乎是合理的)。

您的拓扑有几个问题:

  1. 你的 spout executor 数量应该和 kafka 一样 分区
  2. 您的拓扑无法足够快地处理元组。我是 对元组如何没有因超时而开始失败感到惊讶。用一个 topology.max.spout.pending 的合理值,我推荐 150 或 250。这只会防止超时,你的喷口会慢慢消耗元组,因为拓扑的其余部分无法处理它。
  3. 您需要为螺栓添加更多的执行器,只有让更多的执行单元发挥作用,您的拓扑才能变得更快。执行器和线程不是一回事,你需要在拓扑中放入更多的执行器。您的单个 执行器延迟是 0,097,这意味着您的单个执行器每秒可以处理大约 10309 个元组 ;也就是说,要达到每秒 50k 的目标,您需要 至少 5 个执行程序 。我确信您的 16 cpu 机器可以使用超过 1 CPU 来处理螺栓。
  4. 任务的主要目的是在重新平衡期间将它们提升为执行者;因此 num tasks >= num executors.
  5. 如果您使用的是全局分组,则需要重新设计拓扑以改用字段分组之类的东西。

我不知道你的问题是否解决了,但是除了根据你的延迟要求调整topology.max.spout.pending,你还需要调整你的批量大小。 将 topology.spout.max.batch.size 设置为较低的数字可能有助于减少延迟。