在我的例子中,没有 tumble window group by 的输出

There is no output for the tumble window group by in my case

我正在使用 Flink 1.12。我想读取 csv,并根据处理时间翻滚 window 分组。

代码如下,但是没有输出查询sql_tubmle_window,想知道问题出在哪里

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Sql017_ProcessTimeAttributeDDLTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)

    val ddl =
      """
      create table sourceTable(
      key STRING,
      price DOUBLE,
      pt as PROCTIME() ---processing time
      ) with (
        'connector' = 'filesystem',
        'path' = 'D:/stock_id_price.csv',
        'format' = 'csv'
      )
      """.stripMargin(' ')

    //Create the source table
    tenv.executeSql(ddl)

    //NOTE: The following query produces correct result
    tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()

    //there is no output for the tumble group by query
    val sql_tumble_window =
      """
       SELECT
           TUMBLE_START(pt, INTERVAL '4' second),
           TUMBLE_END(pt, INTERVAL '4' second),
           sum(price),
           'FLAG'
       FROM sourceTable
       GROUP BY TUMBLE(pt, INTERVAL '4' second)
     """.stripMargin(' ')

    println("=" * 20)
    println("=" * 20)

    //There is no output for this sql query
    tenv.sqlQuery(sql_tumble_window).toAppendStream[Row].print()

    env.execute()

    Thread.sleep(20 * 1000)
  }

}

问题是作业 运行 在 window 有机会触发之前完成。

当 Flink 流作业 运行 具有有界输入(例如文件)时,作业在完全消耗并处理完输入后结束。同时,只要一天中的时间恰好是自纪元以来 4 秒的精确倍数,就会触发 4 秒长的处理时间 window——除非 CSV 文件非常长,否则这种情况不太可能发生。

您可能希望 20 秒长的睡眠能够解决这个问题。但是在将作业提交到集群后,Flink 客户端发生了睡眠。这不会影响流作业本身的执行。