Beam + Flink:使用 SDFBoundedSourceReader 时没有并行性

Beam + Flink: No parallelism when using SDFBoundedSourceReader

背景:我将 TFX 管道与 Flink 一起用作 Beam 的 运行ner(使用 flink-on-k8s-operator 的 flink 会话集群)。 Flink 集群有 2 个 taskmanager,每个 16 核,并行度设置为 32。TFX 组件调用 beam.io.ReadFromTFRecord 加载数据,传入一个 glob 文件模式。我有一个 TFRecords 数据集,分为 160 个文件。当我尝试 运行 组件时,所有 160 个文件的处理最终在 Flink 中的一个子任务中完成,即并行度实际上是 1。见下图:

我已经尝试了各种 Beam/Flink 选项和不同版本的 Beam/Flink 但行为保持不变。

此外,该行为会影响任何使用 apache_beam.io.iobase.SDFBoundedSourceReader 的东西,例如apache_beam.io.parquetio.ReadFromParquet也有同样的问题。我的配置中是否有一些模糊的设置,或者这是 Flink 运行ner 的错误?我还在互联网上进行了广泛的搜索,除了使用 beam.Reshuffle 的建议没有帮助外,找不到任何关于这个问题的提及。

从运行图中可以看出,operator节点之间是forward连接的,导致上游一个task的数据只能被下游一个task处理。 方案可以改变不同运营商的连接方式

public DataStream<T> rebalance() {
    return setConnectionType(new RebalancePartitioner<T>());
}

我遗漏的晦涩设置似乎是 Beam 管道选项中的 --experiments=pre_optimize=all。这导致以下代码 运行 和 RESHUFFLE 被包含在 Splittable DoFn 扩展中:https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1433

对于那些在未来阅读这篇文章的人来说,这适用于 Beam 2.32.0 和 Flink 1.13.2——这无疑会在某个时候发生变化,所以这个答案可能不再相关。