Flink多表join结果不一致
Inconsistent results when joining multiple tables in Flink
我们定义了 4 个 CDC 源,我们需要将其中的数据合并为一个结果 table。我们正在使用 SQL API 为每个来源创建一个 table,例如:
"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
" WRK_SDL_DEF_NO STRING,\n" +
" HTR_FROM_DT BIGINT,\n" +
...
" update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
" WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
在我们定义每个 table 之后,我们通过 运行 以下查询创建一个新的 table:
"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
" PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";
请注意,由于格式原因,某些行已被省略。
我们 运行 遇到的问题是执行这个确切的作业会导致不一致的结果,有时我们有 1750 个结果行(正确),但大多数时候结果行较少且随机。
这是 Flink 作业的计划概览。从源发送的记录数都是正确的,但是第一个连接语句发送的记录数不正确:
Flink 作业执行计划和数量
可能是什么原因以及我们如何才能一致地连接所有数据源?
我看到您的管道包含一个带有处理时间触发器的事件时间 window,并且对 out-of-order 事件进行了零容忍水印。这些可能会导致问题。
如果没有延迟事件,Flink 只能为涉及事件时间逻辑的流式工作负载生成完全正确、确定的结果。每当处理时间逻辑干扰水印时,就会发生延迟事件,例如,
- 如果水印生成器配置不正确,并且没有考虑到实际的 out-of-orderness
- 如果使用空闲检测,并且非活动流变为 re-activated
- 重启(或恢复,或重新缩放)发生后
不过只是猜测。需要查看更多详细信息才能给出更明智的答案。一个最小的、可重现的例子是理想的。
更新:
流式作业也不会发出它们的最后一组结果,除非采取某些措施来激发它们这样做。在这种情况下,您可以使用
./bin/flink stop $JOB_ID --drain --savepointPath /tmp/flink-savepoints
强制发出大水印以关闭最后一个 window。
更新 2:
常规联接不会产生带有时间属性或水印的结果。这是因为无法保证结果将以任何特定顺序发出,因此有意义的水印是不可能的。通常不可能在这样的加入后应用事件时间 windowing。
更新 3:
现在研究了最新的代码,这显然与水印没有任何关系。
如果我没理解错的话,问题是虽然结果总是包含应该生成的内容,但有不同数量的额外输出记录。我可以提出两个可能的原因:
(1) 当 Flink 与 Debezium 服务器一起使用时,可能会出现重复事件。我不认为这是解释,但这是需要注意的事情。
(2) 连接的结果是non-deterministic(不同于运行 运行)。发生这种情况是因为各种输入流相互竞争,并且摄取来自不同流的相关事件的确切顺序正在影响结果的生成方式。
加入的结果是更新日志流。我怀疑当结果完美时,没有发生撤回,而在其他情况下,产生了一些初步结果,稍后会更新。
如果您检查输出流中的 ROW_KIND 信息,您应该能够确认这个猜测是否正确。
我对 Pulsar 连接器不是很熟悉,但我猜你应该使用 upsert_pulsar 接收器。
,即使对于更大的数据集,我们也能够获得一致的结果
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "500 ms");
configuration.setString("table.exec.mini-batch.size", "5000");
这似乎解决了本地文件系统连接器和 Flink Pulsar 连接器的一致性问题。
从这些发现来看,Flink 似乎在我们吞吐量的状态管理开销方面存在问题。我们仍然需要评估实际的 CDC 初始加载处理,但到目前为止启用 MiniBatch 聚合似乎很有希望
感谢@david-anderson 与我们一起思考并试图解决这个问题。
我们定义了 4 个 CDC 源,我们需要将其中的数据合并为一个结果 table。我们正在使用 SQL API 为每个来源创建一个 table,例如:
"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
" WRK_SDL_DEF_NO STRING,\n" +
" HTR_FROM_DT BIGINT,\n" +
...
" update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
" WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
在我们定义每个 table 之后,我们通过 运行 以下查询创建一个新的 table:
"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
" PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";
请注意,由于格式原因,某些行已被省略。
我们 运行 遇到的问题是执行这个确切的作业会导致不一致的结果,有时我们有 1750 个结果行(正确),但大多数时候结果行较少且随机。
这是 Flink 作业的计划概览。从源发送的记录数都是正确的,但是第一个连接语句发送的记录数不正确:
Flink 作业执行计划和数量
可能是什么原因以及我们如何才能一致地连接所有数据源?
我看到您的管道包含一个带有处理时间触发器的事件时间 window,并且对 out-of-order 事件进行了零容忍水印。这些可能会导致问题。
如果没有延迟事件,Flink 只能为涉及事件时间逻辑的流式工作负载生成完全正确、确定的结果。每当处理时间逻辑干扰水印时,就会发生延迟事件,例如,
- 如果水印生成器配置不正确,并且没有考虑到实际的 out-of-orderness
- 如果使用空闲检测,并且非活动流变为 re-activated
- 重启(或恢复,或重新缩放)发生后
不过只是猜测。需要查看更多详细信息才能给出更明智的答案。一个最小的、可重现的例子是理想的。
更新:
流式作业也不会发出它们的最后一组结果,除非采取某些措施来激发它们这样做。在这种情况下,您可以使用
./bin/flink stop $JOB_ID --drain --savepointPath /tmp/flink-savepoints
强制发出大水印以关闭最后一个 window。
更新 2:
常规联接不会产生带有时间属性或水印的结果。这是因为无法保证结果将以任何特定顺序发出,因此有意义的水印是不可能的。通常不可能在这样的加入后应用事件时间 windowing。
更新 3:
现在研究了最新的代码,这显然与水印没有任何关系。
如果我没理解错的话,问题是虽然结果总是包含应该生成的内容,但有不同数量的额外输出记录。我可以提出两个可能的原因:
(1) 当 Flink 与 Debezium 服务器一起使用时,可能会出现重复事件。我不认为这是解释,但这是需要注意的事情。
(2) 连接的结果是non-deterministic(不同于运行 运行)。发生这种情况是因为各种输入流相互竞争,并且摄取来自不同流的相关事件的确切顺序正在影响结果的生成方式。
加入的结果是更新日志流。我怀疑当结果完美时,没有发生撤回,而在其他情况下,产生了一些初步结果,稍后会更新。
如果您检查输出流中的 ROW_KIND 信息,您应该能够确认这个猜测是否正确。
我对 Pulsar 连接器不是很熟悉,但我猜你应该使用 upsert_pulsar 接收器。
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "500 ms");
configuration.setString("table.exec.mini-batch.size", "5000");
这似乎解决了本地文件系统连接器和 Flink Pulsar 连接器的一致性问题。
从这些发现来看,Flink 似乎在我们吞吐量的状态管理开销方面存在问题。我们仍然需要评估实际的 CDC 初始加载处理,但到目前为止启用 MiniBatch 聚合似乎很有希望
感谢@david-anderson 与我们一起思考并试图解决这个问题。