Flink:将缩回 SQL 转换为追加 SQL,仅使用 SQL,以提供时间 table
Flink: Convert a retracting SQL to an appending SQL, using only SQL, to feed a temporal table
我是给用户提供一个Flink SQL接口,所以我不能真正使用Table或Java/Scala接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别 API 指令。
一个用户如何转换,比如说:
SELECT b, AVG(a) "average" FROM source_data GROUP BY b
name: average_source_data_retracting
b STRING
average NUMERIC
——这是缩回值——到一个可以追加它们的形式。此附加表单可以具有以下架构:
name: average_source_data_appending
flag BOOLEAN <-- indicating an accumulate or retract message
b STRING
average NUMERIC
也就是具有相当于 AppendStreamTableSink 的 RetractStreamTableSink,但它不是一个接收器。
所有这些都是为了能够使用 average_source_data_appending 创建一个 Temporal table(过滤撤回消息),但是这种 table 只接受仅附加源 tables.
我考虑过使用 windows (as talked about here),但我希望即时更新 table。
请忽略这个问题,显然 Temporal Table 函数可以接受(对我来说)缩回的表格。
大意为:
SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b
可以作为时间 Table 函数接受,其中 b 作为键,max_proctime 作为时间属性。我猜 MAX(proctime) 以某种方式让它认为新行被发出,当它们只是被覆盖时?我想我需要更多时间来理解这一点。
编辑:
通过源代码挖掘,我们发现 Temporal Table 函数似乎接受缩回定义,但前提是它在处理时间:
TemporalProcessTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
rightState.update(element.getValue());
registerProcessingCleanupTimer();
} else {
rightState.clear();
cleanupLastTimer();
}
}
TemporalRowTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
...
checkNotRetraction(row);
...
}
private void checkNotRetraction(BaseRow row) {
if (BaseRowUtil.isRetractMsg(row)) {
String className = getClass().getSimpleName();
throw new IllegalStateException(
"Retractions are not supported by " + className +
". If this can happen it should be validated during planning!");
}
}
这没有记录;我不知道这是否是永久性的,以及文档是否会更新。
我是给用户提供一个Flink SQL接口,所以我不能真正使用Table或Java/Scala接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别 API 指令。
一个用户如何转换,比如说:
SELECT b, AVG(a) "average" FROM source_data GROUP BY b
name: average_source_data_retracting
b STRING
average NUMERIC
——这是缩回值——到一个可以追加它们的形式。此附加表单可以具有以下架构:
name: average_source_data_appending
flag BOOLEAN <-- indicating an accumulate or retract message
b STRING
average NUMERIC
也就是具有相当于 AppendStreamTableSink 的 RetractStreamTableSink,但它不是一个接收器。
所有这些都是为了能够使用 average_source_data_appending 创建一个 Temporal table(过滤撤回消息),但是这种 table 只接受仅附加源 tables.
我考虑过使用 windows (as talked about here),但我希望即时更新 table。
请忽略这个问题,显然 Temporal Table 函数可以接受(对我来说)缩回的表格。
大意为:
SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b
可以作为时间 Table 函数接受,其中 b 作为键,max_proctime 作为时间属性。我猜 MAX(proctime) 以某种方式让它认为新行被发出,当它们只是被覆盖时?我想我需要更多时间来理解这一点。
编辑:
通过源代码挖掘,我们发现 Temporal Table 函数似乎接受缩回定义,但前提是它在处理时间:
TemporalProcessTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
rightState.update(element.getValue());
registerProcessingCleanupTimer();
} else {
rightState.clear();
cleanupLastTimer();
}
}
TemporalRowTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
...
checkNotRetraction(row);
...
}
private void checkNotRetraction(BaseRow row) {
if (BaseRowUtil.isRetractMsg(row)) {
String className = getClass().getSimpleName();
throw new IllegalStateException(
"Retractions are not supported by " + className +
". If this can happen it should be validated during planning!");
}
}
这没有记录;我不知道这是否是永久性的,以及文档是否会更新。