Table API's OVER aggregation delays firing the trigger
Here is the code snippet:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // First, inputStream receives a first batch of 420 records;
    // then it receives a second batch of 420 records.
    Table inputTable = tableEnv.fromDataStream(
            inputStream,
            Schema.newBuilder()
                    // Derive the event-time attribute from the eventTime field
                    .columnByExpression("rowtime", "CAST(eventTime AS TIMESTAMP(3))")
                    // Watermark expression is rowtime itself: no allowance for out-of-order rows
                    .watermark("rowtime", "rowtime")
                    .build());
    tableEnv.createTemporaryView("InputTable", inputTable);

    // Per-user average of valueA over the preceding 10 seconds of event time
    Table resultTable = tableEnv.sqlQuery(
            "SELECT " +
            "  userId," +
            "  eventTime," +
            "  AVG(valueA) OVER w1 AS AVG_VALUE" +
            " FROM InputTable" +
            " WINDOW w1 AS (" +
            "  PARTITION BY userId ORDER BY rowtime" +
            "  RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)");

    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
    resultStream.map((Row row) -> {
        LOG.warn("Print row: " + row.toString());
        // For the first batch of 420 input records, resultStream receives only 69 records;
        // for the second batch of 420 input records, it receives an additional 324 records.
        return row;
    });
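The problem is as follows:
- First, inputStream receives a first batch of 420 records, spaced 1 second apart;
- In response to that first batch of 420 records, resultStream receives only 69 records;
- Then inputStream receives a second batch of 420 records;
- In response to the second batch of 420 records, resultStream receives an additional 324 records.
I need a solution that lets resultStream retrieve the query results in real time.
I assume there is a trigger under the hood that fires the SQL execution. Any idea how to customize/control it? Or what algorithm does it use?
Thanks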
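Your OVER window will fire for a particular row when these conditions are met:
- there is an earlier row for the same userId that is at least 10 seconds older
- the watermark has advanced to that row's rowtime (which requires having seen at least one row with a rowtime of rowtime+1 or later)
If you execute in batch mode or on a bounded stream, all pending windows will fire at the end of the run.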
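To make the watermark condition concrete, here is a toy, single-threaded Java model of it. It is a sketch under stated assumptions, not Flink's implementation: it models only the watermark condition (not the first bullet), it assumes the watermark is the maximum rowtime seen minus an allowed delay minus 1 ms (the formula Flink's DataStream `BoundedOutOfOrdernessWatermarks` uses; the SQL-side watermark may differ in detail), and the names `OverWindowFiringModel`, `onEvent` and `endOfInput` are hypothetical. Rows are buffered until the watermark reaches their rowtime; `endOfInput` mimics a bounded run, where every pending row fires.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Toy model (hypothetical, NOT Flink's code) of an event-time OVER window:
// AVG(valueA) OVER (PARTITION BY userId ORDER BY rowtime
//                   RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)
public class OverWindowFiringModel {

    record Event(String userId, long rowtime, double valueA) {}
    record Result(String userId, long rowtime, double avgValue) {}

    static final long RANGE_MS = 10_000L; // INTERVAL '10' SECOND PRECEDING

    private final long delayMs;           // 0 mimics .watermark("rowtime", "rowtime")
    private long maxSeen = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    private final List<Event> pending = new ArrayList<>(); // rows waiting for the watermark
    private final List<Event> fired = new ArrayList<>();   // rows already emitted, kept for the range
    final List<Result> emitted = new ArrayList<>();

    OverWindowFiringModel(long delayMs) {
        this.delayMs = delayMs;
    }

    void onEvent(Event e) {
        pending.add(e);
        maxSeen = Math.max(maxSeen, e.rowtime());
        // Assumed watermark: max rowtime seen - allowed delay - 1 ms, so a row only
        // fires after a row with rowtime + 1 (or later) has arrived.
        advanceWatermark(maxSeen - delayMs - 1);
    }

    // Bounded input / batch mode: a final watermark fires everything still pending.
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    private void advanceWatermark(long candidate) {
        watermark = Math.max(watermark, candidate);
        List<Event> ready = new ArrayList<>();
        for (Iterator<Event> it = pending.iterator(); it.hasNext(); ) {
            Event e = it.next();
            if (e.rowtime() <= watermark) { // the watermark has reached this row
                ready.add(e);
                it.remove();
            }
        }
        ready.sort(Comparator.comparingLong(Event::rowtime));
        fired.addAll(ready); // add first so rows with an equal rowtime see each other
        for (Event e : ready) {
            emitted.add(new Result(e.userId(), e.rowtime(), average(e)));
        }
    }

    // Average of valueA over same-user rows with rowtime in [t - 10 s, t].
    private double average(Event current) {
        double sum = 0;
        int count = 0;
        for (Event e : fired) {
            if (e.userId().equals(current.userId())
                    && e.rowtime() >= current.rowtime() - RANGE_MS
                    && e.rowtime() <= current.rowtime()) {
                sum += e.valueA();
                count++;
            }
        }
        return sum / count; // count >= 1: the current row is always in range
    }

    public static void main(String[] args) {
        OverWindowFiringModel model = new OverWindowFiringModel(0);
        for (int i = 0; i < 5; i++) { // one row per second, as in the question
            model.onEvent(new Event("u1", i * 1000L, i));
        }
        System.out.println("emitted before end of input: " + model.emitted.size());
        model.endOfInput();
        System.out.println("emitted after end of input: " + model.emitted.size());
    }
}
```

In this model, four of the five rows in `main` are emitted while the stream is running; the last one stays pending until a later row arrives or the input ends, which illustrates why results lag behind the input on an unbounded stream.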
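Update:
There are out-of-order records, so the watermark has to be defined to account for them. In fact, the out-of-order records are being dropped because they are late.
For example, if records can be up to 15 seconds out of order, specify the watermark as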
    WATERMARK FOR rowtime AS rowtime - INTERVAL '15' SECOND
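The following toy Java model illustrates why out-of-order rows get dropped and how a watermark delay avoids it. It is a hedged sketch, not Flink's lateness logic: the names `LatenessModel` and `droppedAsLate` are hypothetical, it assumes the watermark is the maximum rowtime seen minus the delay minus 1 ms, and it treats a row as late when its rowtime is not greater than the current watermark. In the question's Table API code, the 15-second delay would presumably be written as `.watermark("rowtime", "rowtime - INTERVAL '15' SECOND")` inside `Schema.newBuilder()`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, NOT Flink's code): which rows count as late, and are
// dropped, for a given watermark delay such as INTERVAL '15' SECOND.
public class LatenessModel {

    // Returns the rowtimes (ms) that would be dropped as late for the given arrival order.
    static List<Long> droppedAsLate(long[] rowtimesInArrivalOrder, long delayMs) {
        List<Long> dropped = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long t : rowtimesInArrivalOrder) {
            if (t <= watermark) {
                // The watermark has already passed this rowtime, so the row is late.
                dropped.add(t);
                continue;
            }
            maxSeen = Math.max(maxSeen, t);
            // Assumed watermark: max rowtime seen - delay - 1 ms (never moves backwards).
            watermark = Math.max(watermark, maxSeen - delayMs - 1);
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Rows 1 s apart, but 3000 and 4000 arrive after 7000 (out of order).
        long[] arrival = {0, 1000, 2000, 7000, 3000, 4000, 8000};
        System.out.println("delay 0 s:  dropped " + droppedAsLate(arrival, 0));
        System.out.println("delay 15 s: dropped " + droppedAsLate(arrival, 15_000));
    }
}
```

With no delay, the rows at 3000 and 4000 ms arrive after the watermark has moved past them and are dropped; with a 15-second delay none are dropped, at the cost of each result being held back until the watermark catches up, up to 15 seconds later.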