Table API 的过度聚合延迟触发触发器

Table API's Over aggregation delays firing the trigger

下面是代码片段:

// Firstly the inputStream receives the first batch of 420 records; 
// Secondly the inputStream receives the second batch of 420 records.
Table inputTable = tableEnv.fromDataStream(
    inputStream, 
  Schema.newBuilder()
      .columnByExpression("rowtime", "CAST(eventTime AS TIMESTAMP(3))")
      .watermark("rowtime", "rowtime")
      .build());

tableEnv.createTemporaryView("InputTable", inputTable);

Table resultTable = tableEnv.sqlQuery(
    "SELECT " +
    " userId," +
    " eventTime," +
    " AVG(valueA) OVER w1 AS AVG_VALUE" +
    " FROM InputTable " +
    " WINDOW w1 AS (" +
    " PARTITION BY userId ORDER BY rowtime" +
    " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)"
);

DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

resultStream.map((Row row) -> {
  LOG.warn("Print row: " + row.toString()); 
  // In response to the inputSteam's first batch of 420 records the resultStream only receives 69 records; 
  // In response to the inputSteam's second batch of 420 records it receives additional 324 records.

  return row;
})

问题如下所示:

  1. 首先inputStream接收到第一批420条记录——记录之间的间隔为1秒;
  2. 响应inputSteam的第一批420条记录resultStream只收到69条记录;
  3. 其次,inputStream接收到第二批420条记录;
  4. 为了响应 inputSteam 的第二批 420 条记录,resultStream 收到额外的 324 条记录。

需要一个解决方案来使 resultStream 实时检索查询结果。 我想在引擎盖下有一个触发器可以触发 SQL 执行的触发器。知道如何 customize/control 吗?或者它的算法是什么?

谢谢

当满足这些条件时,您的 OVER window 将针对特定行触发:

  1. 同一个用户 ID 有一个较早的行至少早 10 秒
  2. 水印已经前进到该行的行时间(这将需要至少看到一行 rowtime+1)

如果您以批处理模式或有界流执行,所有未决 windows 将在 运行 结束时触发。

更新:

有out-of-sequence条记录,需要准确加水印。事实上,out-of-sequence 条记录因为迟到而被删除。

例如,如果记录最多可以乱序 15 秒,则将水印指定为

  WATERMARK FOR rowtime AS rowtime – INTERVAL ‘15’ SECOND