从单个 Kafka 主题消费的多个风暴拓扑
Multiple storm topologies consuming from a single Kafka topic
Storm 提供的性能调优文档指出扩展多个并行拓扑的绝对最佳性能可以产生比简单扩展 worker 更好的性能。
我正在尝试将这一理论与缩放工作者进行比较。
但是,使用版本 1.2.1 时,风暴 Kafka spout 在多个不同拓扑中的表现并不像我预期的那样。
为单个主题的所有拓扑中的 kafka spout 消费者设置一个通用的 client.id 和 group.id,每个拓扑仍然订阅所有可用的分区和重复的元组,并且已经抛出错误提交的元组被重新提交。
我对这种行为感到惊讶,因为我假设消费者 API 会支持这个相当简单的用例。
如果有人能解释一下,我将不胜感激
- kafka spout这个行为的实现逻辑是什么?
- 有什么办法解决这个问题吗?
spout 的默认行为是使用 KafkaConsumer.assign
API 将主题的所有分区分配给拓扑中的工作人员。这就是您所看到的行为。对于这种行为,您不应在拓扑之间共享组 ID。
如果您想更好地控制将哪些分区分配给哪些工作程序或拓扑,您可以实现 TopicFilter 接口,并将其传递给您的 KafkaSpoutConfig
。这应该让你做你想做的事。
关于 运行 多个拓扑更快,我假设您指的是文档中的这一部分:In multiworker mode, messages often cross worker process boundaries. For performance sensitive cases, if it is possible to configure a topology to run as many single-worker instances [...] it may yield significantly better throughput and latency
。这里的 objective 是为了避免在 worker 之间发送消息,而是将每个分区的处理保留在一个 worker 内部。如果你想避免 运行 许多拓扑,你可以考虑自定义 Storm 调度程序以使其分配,例如每个工人都有一份完整的管道副本。这样,如果您使用 localOrShuffleGrouping
,总会有一个本地 bolt 可以发送到,因此您不必通过网络发送给另一个 worker。
Storm 提供的性能调优文档指出扩展多个并行拓扑的绝对最佳性能可以产生比简单扩展 worker 更好的性能。
我正在尝试将这一理论与缩放工作者进行比较。
但是,使用版本 1.2.1 时,风暴 Kafka spout 在多个不同拓扑中的表现并不像我预期的那样。
为单个主题的所有拓扑中的 kafka spout 消费者设置一个通用的 client.id 和 group.id,每个拓扑仍然订阅所有可用的分区和重复的元组,并且已经抛出错误提交的元组被重新提交。
我对这种行为感到惊讶,因为我假设消费者 API 会支持这个相当简单的用例。
如果有人能解释一下,我将不胜感激
- kafka spout这个行为的实现逻辑是什么?
- 有什么办法解决这个问题吗?
spout 的默认行为是使用 KafkaConsumer.assign
API 将主题的所有分区分配给拓扑中的工作人员。这就是您所看到的行为。对于这种行为,您不应在拓扑之间共享组 ID。
如果您想更好地控制将哪些分区分配给哪些工作程序或拓扑,您可以实现 TopicFilter 接口,并将其传递给您的 KafkaSpoutConfig
。这应该让你做你想做的事。
关于 运行 多个拓扑更快,我假设您指的是文档中的这一部分:In multiworker mode, messages often cross worker process boundaries. For performance sensitive cases, if it is possible to configure a topology to run as many single-worker instances [...] it may yield significantly better throughput and latency
。这里的 objective 是为了避免在 worker 之间发送消息,而是将每个分区的处理保留在一个 worker 内部。如果你想避免 运行 许多拓扑,你可以考虑自定义 Storm 调度程序以使其分配,例如每个工人都有一份完整的管道副本。这样,如果您使用 localOrShuffleGrouping
,总会有一个本地 bolt 可以发送到,因此您不必通过网络发送给另一个 worker。