防止 Apache Beam / Dataflow 流式传输 (python) 管道中的融合以消除管道瓶颈
Prevent fusion in Apache Beam / Dataflow streaming (python) pipelines to remove pipeline bottleneck
我们目前正在使用 DataflowRunner 在 Apache Beam 上开发流式管道。我们正在读取来自 Pub/Sub 的消息并对它们进行一些处理,然后我们 window 在滑动 windows 中读取它们(目前 window 大小为 3 秒,间隔为 3 秒以及)。一旦触发了 window,我们就对 window 中的元素进行一些 post 处理。这个 post 处理步骤明显大于 window 大小,大约需要 15 秒。
管道的apache beam代码:
input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
| beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
trigger=AfterCount(30),
accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
如您所知,Dataflow 会尝试对您的流水线步骤进行一些优化。在我们的例子中,它融合了从 windowing 开始的所有内容(集群操作:1/ 处理 2/ windowing + post-processing),这导致了缓慢的顺序 post-仅由一名工人处理所有 windows。我们每 15 秒就会看到管道正在处理下一个 window 的日志。但是,我们希望让多个工作人员单独接手 windows,而不是将工作量交给一个工作人员。
因此,我们一直在寻找防止这种融合发生的方法,因此 Dataflow 将 window 与 post 的 windows 处理分开。通过这种方式,我们希望 Dataflow 能够再次将多个工作人员分配给 post-处理被解雇的 windows.
到目前为止我们已经尝试过的:
- 增加工人数量到20、30甚至40,但没有效果。只有 windowing 之前的步骤会分配给多个工作人员
- 运行 流水线 5 或 10 分钟,但我们注意到在 windowing[=35= 之后没有工作人员重新分配来帮助这个更大的 post 处理步骤]
- windowing 后,将它们放回全局 window
- 用虚拟密钥模拟另一个 GroupByKey(如 https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion 中所述)但没有成功。
最后两个动作确实创建了第三个集群操作(1/ processing 2/ windowing 3/ post-processing ) 但我们注意到在 windowing.
之后仍然是同一个工作人员执行所有操作
是否有任何解决方案可以解决此问题陈述?
我们现在正在考虑的当前解决方法是构建另一个接收 windows 的流式管道,以便这些工作人员可以并行处理 windows,但这很麻烦..
你在打破元素融合方面做了正确的事情。我怀疑可能是某个问题给您带来了麻烦。
对于流式处理,单个密钥总是在同一个工作程序中处理。有没有机会,您的所有或大部分记录都分配给了一个键?如果是这样,您的处理将由一个工人完成。
你可以做的是让 window 成为键的一部分,这样多个 windows 的元素可以在不同的工人中处理,即使他们有相同的密钥:
class KeyIntoKeyPlusWindow(core.DoFn):
def process(self, element, window=core.DoFn.WindowParam):
key, values = element
yield ((key, window), element)
group = windows | beam.ParDo(KeyIntoKeyPlusWindow() | beam.GroupByKey()
完成后,您可以应用 post-processing:
group | beam.Map(post_processing_fn)
我们目前正在使用 DataflowRunner 在 Apache Beam 上开发流式管道。我们正在读取来自 Pub/Sub 的消息并对它们进行一些处理,然后我们 window 在滑动 windows 中读取它们(目前 window 大小为 3 秒,间隔为 3 秒以及)。一旦触发了 window,我们就对 window 中的元素进行一些 post 处理。这个 post 处理步骤明显大于 window 大小,大约需要 15 秒。
管道的apache beam代码:
input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
| beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
trigger=AfterCount(30),
accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
如您所知,Dataflow 会尝试对您的流水线步骤进行一些优化。在我们的例子中,它融合了从 windowing 开始的所有内容(集群操作:1/ 处理 2/ windowing + post-processing),这导致了缓慢的顺序 post-仅由一名工人处理所有 windows。我们每 15 秒就会看到管道正在处理下一个 window 的日志。但是,我们希望让多个工作人员单独接手 windows,而不是将工作量交给一个工作人员。
因此,我们一直在寻找防止这种融合发生的方法,因此 Dataflow 将 window 与 post 的 windows 处理分开。通过这种方式,我们希望 Dataflow 能够再次将多个工作人员分配给 post-处理被解雇的 windows.
到目前为止我们已经尝试过的:
- 增加工人数量到20、30甚至40,但没有效果。只有 windowing 之前的步骤会分配给多个工作人员
- 运行 流水线 5 或 10 分钟,但我们注意到在 windowing[=35= 之后没有工作人员重新分配来帮助这个更大的 post 处理步骤]
- windowing 后,将它们放回全局 window
- 用虚拟密钥模拟另一个 GroupByKey(如 https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion 中所述)但没有成功。
最后两个动作确实创建了第三个集群操作(1/ processing 2/ windowing 3/ post-processing ) 但我们注意到在 windowing.
之后仍然是同一个工作人员执行所有操作是否有任何解决方案可以解决此问题陈述?
我们现在正在考虑的当前解决方法是构建另一个接收 windows 的流式管道,以便这些工作人员可以并行处理 windows,但这很麻烦..
你在打破元素融合方面做了正确的事情。我怀疑可能是某个问题给您带来了麻烦。
对于流式处理,单个密钥总是在同一个工作程序中处理。有没有机会,您的所有或大部分记录都分配给了一个键?如果是这样,您的处理将由一个工人完成。
你可以做的是让 window 成为键的一部分,这样多个 windows 的元素可以在不同的工人中处理,即使他们有相同的密钥:
class KeyIntoKeyPlusWindow(core.DoFn):
def process(self, element, window=core.DoFn.WindowParam):
key, values = element
yield ((key, window), element)
group = windows | beam.ParDo(KeyIntoKeyPlusWindow() | beam.GroupByKey()
完成后,您可以应用 post-processing:
group | beam.Map(post_processing_fn)