Apache Flink 中的 Event-time Temporal Join 仅适用于小型数据集
Event-time Temporal Join in Apache Flink only works with small datasets
背景:我正在尝试使用从 CSV 文件读取的两个 'large(r)' datasets/tables 进行事件时间临时连接(左侧 table 中有 16K+ 行) , 右边 table) 稍微少一些。两个 tables 都是仅附加 tables,即它们的数据源当前是 CSV 文件,但将成为 Debezium 通过 Pulsar 发出的 CDC 变更日志。
我正在使用相当新的 SYSTEM_TIME AS OF
语法。
问题:连接结果只有部分正确,即在查询执行开始时(前 20% 左右),左侧的行 不是 与右侧的行匹配,而在理论上,它们应该。几秒钟后,有更多的匹配项,到查询结束时,左侧的行与右侧的行正确 matched/joined 。
每次我 运行 查询时,它都会根据哪些行(不)匹配显示其他结果。
两个数据集都没有按各自的事件时间排序。它们按主键排序。所以真的是this case,只是数据多了。
本质上,右侧是一个查找-table,它会随着时间的推移而变化,我们确信对于每个左侧记录都有一个匹配的右侧记录,因为它们都是在原始数据库中创建的在 +/- 同一时刻。最终我们的目标是一个动态物化视图,它包含与我们在启用 CDC 的源数据库(SQL 服务器)中加入 2 table 时相同的数据。
显然,我想实现正确 加入完整 数据集,如in the Flink docs
所述
与只有几行的小数据集(如 here)的简单示例和 Flink 测试代码不同,较大数据集的连接 不会 产生正确的结果。
我怀疑,当 probing/left table 开始流动时, build/right table 还没有 'in memory' 这意味着左边的行没有找不到匹配的右行,而他们应该——如果右 table 会更早开始流动的话。这就是为什么右边 table.
列的 left join
returns 为空值的原因
我已经包含了我的代码:
@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {
private final String emr01Ddl =
"CREATE TABLE EMR01\n" +
"(\n" +
" SRC_NO STRING,\n" +
" JRD_ETT_NO STRING,\n" +
" STT_DT DATE,\n" +
" MGT_SLT_DT DATE,\n" +
" ATM_CRT_DT DATE,\n" +
" LTD_MDT_IC STRING,\n" +
" CPN_ORG_NO STRING,\n" +
" PTY_NO STRING,\n" +
" REG_USER_CD STRING,\n" +
" REG_TS TIMESTAMP,\n" +
" MUT_USER_CD STRING,\n" +
" MUT_TS TIMESTAMP(3),\n" +
" WATERMARK FOR MUT_TS AS MUT_TS,\n" +
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + getCsv1() + "',\n" +
" 'format' = 'csv'\n" +
")";
private final String emr02Ddl =
"CREATE TABLE EMR02\n" +
"(\n" +
" CPN_ORG_NO STRING,\n" +
" DSB_TX STRING,\n" +
" REG_USER_CD STRING,\n" +
" REG_TS TIMESTAMP,\n" +
" MUT_USER_CD STRING,\n" +
" MUT_TS TIMESTAMP(3),\n" +
" WATERMARK FOR MUT_TS AS MUT_TS,\n" +
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + getCsv2() + "',\n" +
" 'format' = 'csv'\n" +
")";
@Test
public void testEventTimeTemporalJoin() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(emr01Ddl);
tableEnv.executeSql(emr02Ddl);
Table result = tableEnv.sqlQuery("" +
"SELECT *" +
" FROM EMR01" +
" LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS" +
" ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");
tableEnv.toChangelogStream(result).addSink(new TestSink());
env.execute();
System.out.println("[Count]" + TestSink.values.size());
//System.out.println("[Row 1]" + TestSink.values.get(0));
//System.out.println("[Row 2]" + TestSink.values.get(1));
AtomicInteger i = new AtomicInteger();
TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " + i.incrementAndGet() + " ]=" + value));
}
private static class TestSink implements SinkFunction<Row> {
// must be static
public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Row value, SinkFunction.Context context) {
values.add(value);
}
}
String getCsv1() {
try {
return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String getCsv2() {
try {
return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
有办法解决吗?例如。有没有办法首先将右侧加载到 Flink 状态,然后启动 loading/streaming 左侧?这会是一个好方法吗,因为这个问题引出了:多久以后?左边什么时候可以开始流动?
我们正在使用 Flink 1.13.3。
这种 temporal/versioned 连接取决于是否有准确的水印。 Flink 依靠水印来知道哪些行可以从正在维护的状态中安全地删除(因为它们不再影响结果)。
您使用的水印表示行按 MUT_TS
排序。由于这不是事实,因此连接无法产生完整的结果。
要解决这个问题,水印应该这样定义
WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
其中区间表示需要容纳多少乱序。
背景:我正在尝试使用从 CSV 文件读取的两个 'large(r)' datasets/tables 进行事件时间临时连接(左侧 table 中有 16K+ 行) , 右边 table) 稍微少一些。两个 tables 都是仅附加 tables,即它们的数据源当前是 CSV 文件,但将成为 Debezium 通过 Pulsar 发出的 CDC 变更日志。
我正在使用相当新的 SYSTEM_TIME AS OF
语法。
问题:连接结果只有部分正确,即在查询执行开始时(前 20% 左右),左侧的行 不是 与右侧的行匹配,而在理论上,它们应该。几秒钟后,有更多的匹配项,到查询结束时,左侧的行与右侧的行正确 matched/joined 。 每次我 运行 查询时,它都会根据哪些行(不)匹配显示其他结果。
两个数据集都没有按各自的事件时间排序。它们按主键排序。所以真的是this case,只是数据多了。
本质上,右侧是一个查找-table,它会随着时间的推移而变化,我们确信对于每个左侧记录都有一个匹配的右侧记录,因为它们都是在原始数据库中创建的在 +/- 同一时刻。最终我们的目标是一个动态物化视图,它包含与我们在启用 CDC 的源数据库(SQL 服务器)中加入 2 table 时相同的数据。
显然,我想实现正确 加入完整 数据集,如in the Flink docs
所述
与只有几行的小数据集(如 here)的简单示例和 Flink 测试代码不同,较大数据集的连接 不会 产生正确的结果。
我怀疑,当 probing/left table 开始流动时, build/right table 还没有 'in memory' 这意味着左边的行没有找不到匹配的右行,而他们应该——如果右 table 会更早开始流动的话。这就是为什么右边 table.
列的left join
returns 为空值的原因
我已经包含了我的代码:
@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {
private final String emr01Ddl =
"CREATE TABLE EMR01\n" +
"(\n" +
" SRC_NO STRING,\n" +
" JRD_ETT_NO STRING,\n" +
" STT_DT DATE,\n" +
" MGT_SLT_DT DATE,\n" +
" ATM_CRT_DT DATE,\n" +
" LTD_MDT_IC STRING,\n" +
" CPN_ORG_NO STRING,\n" +
" PTY_NO STRING,\n" +
" REG_USER_CD STRING,\n" +
" REG_TS TIMESTAMP,\n" +
" MUT_USER_CD STRING,\n" +
" MUT_TS TIMESTAMP(3),\n" +
" WATERMARK FOR MUT_TS AS MUT_TS,\n" +
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + getCsv1() + "',\n" +
" 'format' = 'csv'\n" +
")";
private final String emr02Ddl =
"CREATE TABLE EMR02\n" +
"(\n" +
" CPN_ORG_NO STRING,\n" +
" DSB_TX STRING,\n" +
" REG_USER_CD STRING,\n" +
" REG_TS TIMESTAMP,\n" +
" MUT_USER_CD STRING,\n" +
" MUT_TS TIMESTAMP(3),\n" +
" WATERMARK FOR MUT_TS AS MUT_TS,\n" +
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + getCsv2() + "',\n" +
" 'format' = 'csv'\n" +
")";
@Test
public void testEventTimeTemporalJoin() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(emr01Ddl);
tableEnv.executeSql(emr02Ddl);
Table result = tableEnv.sqlQuery("" +
"SELECT *" +
" FROM EMR01" +
" LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS" +
" ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");
tableEnv.toChangelogStream(result).addSink(new TestSink());
env.execute();
System.out.println("[Count]" + TestSink.values.size());
//System.out.println("[Row 1]" + TestSink.values.get(0));
//System.out.println("[Row 2]" + TestSink.values.get(1));
AtomicInteger i = new AtomicInteger();
TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " + i.incrementAndGet() + " ]=" + value));
}
private static class TestSink implements SinkFunction<Row> {
// must be static
public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Row value, SinkFunction.Context context) {
values.add(value);
}
}
String getCsv1() {
try {
return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String getCsv2() {
try {
return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
有办法解决吗?例如。有没有办法首先将右侧加载到 Flink 状态,然后启动 loading/streaming 左侧?这会是一个好方法吗,因为这个问题引出了:多久以后?左边什么时候可以开始流动?
我们正在使用 Flink 1.13.3。
这种 temporal/versioned 连接取决于是否有准确的水印。 Flink 依靠水印来知道哪些行可以从正在维护的状态中安全地删除(因为它们不再影响结果)。
您使用的水印表示行按 MUT_TS
排序。由于这不是事实,因此连接无法产生完整的结果。
要解决这个问题,水印应该这样定义
WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
其中区间表示需要容纳多少乱序。