比较 Flink 中的 2 个数据流以检索丢失的数据

Compare 2 Data streams in Flink to retrieve missing data

我有 2 个数据流“dataStream1”和“dataStream2”,它们都是字符串类型,我已经用它们创建了 2 个表

   Table t1  = streamTableEnvironment.fromDataStream(dataStream1,$("a"),$("atime").proctime());
   Table t2 =  streamTableEnvironment.fromDataStream(dataStream2,$("b"),$("btime").proctime());

我需要在 5 秒的时间范围内获取 T1 中不存在于 T2 中的所有数据,就像如果我在 T1 中插入一条记录,它应该在 5 秒内存在于 T2 中,否则它们应该被收集,因为它们将被视为错误数据。

有什么提示吗?我尝试使用 SQL 但我没有看到正确的方法,例如我知道如何在 SQL 的时间间隔内毫无问题地获取公共数据...但我不知道如何获取丢失的数据,因为 SQL 中不存在无界流的减号运算符。

如果您出于某种原因需要并想要使用 SQL,您可以尝试在间隔连接上使用 IN 子句。因此,您基本上 select T1 中也在 T2 中的所有元素来创建 T3,然后只有 select T1 中不在 T3 中的元素使用 NOT IN 子句,如 documentation。请注意,这仅在元素是唯一的时才有效。

你可以尝试的另一件事是使用 CoProcessFunction 自己处理这个问题,所以你可以这样做:

dataStream1.keyBy(_).connect(datastream2.keyBy(_))
.process(new MyProcessFunction())

在函数内部您将简单地拥有一个状态,该状态将保留 dataStream1 中的每个元素,并且每当来自 dataStream2 的任何内容到达时您将检查您是否可以加入,如果它具有给定的时间戳边界,如果是的话,你会从状态中删除数据,因为它不会被发射。您还可以有一个已注册的计时器,它会发出所有未加入的元素。