Flink:将缩回 SQL 转换为追加 SQL,仅使用 SQL,以提供时间 table

Flink: Convert a retracting SQL to an appending SQL, using only SQL, to feed a temporal table

我是给用户提供一个Flink SQL接口,所以我不能真正使用Table或Java/Scala接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别 API 指令。

一个用户如何转换,比如说:

SELECT b, AVG(a) "average" FROM source_data GROUP BY b

name: average_source_data_retracting
  b STRING
  average NUMERIC

——这是缩回值——到一个可以追加它们的形式。此附加表单可以具有以下架构:

name: average_source_data_appending
  flag BOOLEAN <-- indicating an accumulate or retract message
  b STRING
  average NUMERIC

也就是具有相当于 AppendStreamTableSink 的 RetractStreamTableSink,但它不是一个接收器。

所有这些都是为了能够使用 average_source_data_appending 创建一个 Temporal table(过滤撤回消息),但是这种 table 只接受仅附加源 tables.

我考虑过使用 windows (as talked about here),但我希望即时更新 table。

请忽略这个问题,显然 Temporal Table 函数可以接受(对我来说)缩回的表格。

大意为:

SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b

可以作为时间 Table 函数接受,其中 b 作为键,max_proctime 作为时间属性。我猜 MAX(proctime) 以某种方式让它认为新行被发出,当它们只是被覆盖时?我想我需要更多时间来理解这一点。

编辑:

通过源代码挖掘,我们发现 Temporal Table 函数似乎接受缩回定义,但前提是它在处理时间:

TemporalProcessTimeJoinOperator.java:

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
        rightState.update(element.getValue());
        registerProcessingCleanupTimer();
    } else {
        rightState.clear();
        cleanupLastTimer();
    }
}

TemporalRowTimeJoinOperator.java:

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    ...
    checkNotRetraction(row);
    ...
}
private void checkNotRetraction(BaseRow row) {
    if (BaseRowUtil.isRetractMsg(row)) {
        String className = getClass().getSimpleName();
        throw new IllegalStateException(
            "Retractions are not supported by " + className +
                ". If this can happen it should be validated during planning!");
    }
}

这没有记录;我不知道这是否是永久性的,以及文档是否会更新。