Flink 中 Streaming Join 的实现

Implementation of Streaming Join in Flink

正在看Flink中join的各种实现。在批处理模式下,我遇到了 hybrid-hash joinsort-merge join。在这两种情况下,在连接之前都会进行阻塞改组,因此连接之前运算符的输出被具体化为一些非临时存储,如here

我现在正在查看流连接案例。我见过一个实现,其中为两个输入创建了两个散列 table。每当输入出现时,它都会保存在其散列 table 中,并且还会针对其他散列 table 进行探测以产生结果。为了限制散列 table 的大小,我们在散列 table 中放置了一个 window,为其保存了一个输入。我的第一个问题是:

Do all stream join cases have this requirement of a window?

具体来说,我想讨论连接的实现,其中将大型静态 customers table 与 Orders 流连接起来。在我看来,物理实现应该是这样的:

customers table 首先是散列分区。然后,orders流开始流入。由于执行模式是streaming,因此orders table直接发送到join tasks,没有任何实体化。

flink有没有这样的join,或者我可以在Flink中实现吗?

嗯,这正是 BATCH 的实施方式。

STREAMING 中,您没有完整的客户 table,因为根据定义它是无限的。

对于BATCH,我只引用这个post from their official blog:

Flink has streaming runtime operators for many operations, but also specialized operators for bounded inputs [...] The batch join can read one input fully into a hash table and then probe with the other input. The stream join needs to build tables for both sides, because it needs to continuously process both inputs

这个 link 也有关于输入大小的信息:它可以溢出到磁盘。不需要开窗(但如果您指定它,它肯定会帮助您保持 performance/deployment 大小要求)


现在,如果您处于 STREAMING 模式并且知道一侧不会改变,您仍然可以将其告知 Flink,以便它围绕这一点进行优化。 Use JOIN <table> FOR SYSTEM_TIME AS OF <table>.{ proctime | rowtime } for that effect:

Temporal joins take an arbitrary table (left input/probe site) and correlate each row to the corresponding row’s relevant version in the versioned table (right input/build side)

但是请注意,如果您使用 JDBC,这些探测端请求将直接通过 Flink 并在数据库中查找(确保您在连接上有一个索引键)