Apache Flink 中的 Hash Join 和 Sort 合并异常

Hash Join and Sort merger exception in Apache Flink

集群基础设施:

我们有 4 个节点的 Flink 独立集群,每个节点有 16 个内核 CPU 和 32Gb 物理内存,其中 16GB 设置为 Flink 托管内存,其余全部设置为 UDFs 和 Java堆。 因此,每个插槽,我们分配了 1 个内核和 1GB 内存。

场景说明:

我们正在尝试连接两个数据集 A 和 B,其中数据集 A 是 的元组,数据集 B 有一个 自定义 POJO 两个数据集的连接键是 String.

对于这两种数据集大小都没有保证,在一个时间点 A 可以很大,而在另一个时间点数据集 B 可以更大。此外,一个数据集很可能包含多个重复条目列表。

例如: 数据集 A 的信息为 size = 51 mb
数据集 B 可能包含大小为 171 mb
的信息 joining key: Location example, Mumbai, NewYork etc.

因此,为了加入这个,我们选择了一个 joinHint 策略作为 Repartition_Hash_First。这个策略有时工作正常,有时会抛出以下异常,

java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.
Probable cause Too many duplicate keys.

所以我们尝试使用 Repartition_Hash_Second 但结果是一样的。

因此,根据我的理解,Flink 在内部为提供即第一或第二的一侧创建了一个散列 table,数据的另一侧被迭代到散列 table,反之亦然,因为其中一个键有很多数据,在创建哈希时无法容纳到 flink 的实际内存中table 它抛出 太多重复键 .[= 的异常12=]

所以在第二步,我们尝试用 Repartition_Sort_merge 来实现它,但我们得到了以下异常,

java.lang.Exception:caused an error obtaining the sort input. the record exceeds maximum size of sort buffer.

如果我们需要将 flink 托管内存增加到 2 GB 甚至更多,谁能建议我?还是我们应该选择一些不同的策略来处理这个问题?

我很清楚你的问题是有一个太大的重复组。

此外,重复组可能在两侧,为该组产生 O(n^2) 大小,n 是最大重复组大小。

我建议您在可能的情况下事先对双方进行重复数据删除,例如使用 DeduplicateKeepLastRowFunction 之类的东西。或者使用行中的其他数据构建更精细的键。