在我的例子中,没有 tumble window group by 的输出
There is no output for the tumble window group by in my case
我正在使用 Flink 1.12。我想读取 csv,并根据处理时间翻滚 window 分组。
代码如下,但是没有输出查询sql_tubmle_window
,想知道问题出在哪里
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
object Sql017_ProcessTimeAttributeDDLTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create table sourceTable(
key STRING,
price DOUBLE,
pt as PROCTIME() ---processing time
) with (
'connector' = 'filesystem',
'path' = 'D:/stock_id_price.csv',
'format' = 'csv'
)
""".stripMargin(' ')
//Create the source table
tenv.executeSql(ddl)
//NOTE: The following query produces correct result
tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()
//there is no output for the tumble group by query
val sql_tumble_window =
"""
SELECT
TUMBLE_START(pt, INTERVAL '4' second),
TUMBLE_END(pt, INTERVAL '4' second),
sum(price),
'FLAG'
FROM sourceTable
GROUP BY TUMBLE(pt, INTERVAL '4' second)
""".stripMargin(' ')
println("=" * 20)
println("=" * 20)
//There is no output for this sql query
tenv.sqlQuery(sql_tumble_window).toAppendStream[Row].print()
env.execute()
Thread.sleep(20 * 1000)
}
}
问题是作业 运行 在 window 有机会触发之前完成。
当 Flink 流作业 运行 具有有界输入(例如文件)时,作业在完全消耗并处理完输入后结束。同时,只要一天中的时间恰好是自纪元以来 4 秒的精确倍数,就会触发 4 秒长的处理时间 window——除非 CSV 文件非常长,否则这种情况不太可能发生。
您可能希望 20 秒长的睡眠能够解决这个问题。但是在将作业提交到集群后,Flink 客户端发生了睡眠。这不会影响流作业本身的执行。
我正在使用 Flink 1.12。我想读取 csv,并根据处理时间翻滚 window 分组。
代码如下,但是没有输出查询sql_tubmle_window
,想知道问题出在哪里
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
object Sql017_ProcessTimeAttributeDDLTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create table sourceTable(
key STRING,
price DOUBLE,
pt as PROCTIME() ---processing time
) with (
'connector' = 'filesystem',
'path' = 'D:/stock_id_price.csv',
'format' = 'csv'
)
""".stripMargin(' ')
//Create the source table
tenv.executeSql(ddl)
//NOTE: The following query produces correct result
tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()
//there is no output for the tumble group by query
val sql_tumble_window =
"""
SELECT
TUMBLE_START(pt, INTERVAL '4' second),
TUMBLE_END(pt, INTERVAL '4' second),
sum(price),
'FLAG'
FROM sourceTable
GROUP BY TUMBLE(pt, INTERVAL '4' second)
""".stripMargin(' ')
println("=" * 20)
println("=" * 20)
//There is no output for this sql query
tenv.sqlQuery(sql_tumble_window).toAppendStream[Row].print()
env.execute()
Thread.sleep(20 * 1000)
}
}
问题是作业 运行 在 window 有机会触发之前完成。
当 Flink 流作业 运行 具有有界输入(例如文件)时,作业在完全消耗并处理完输入后结束。同时,只要一天中的时间恰好是自纪元以来 4 秒的精确倍数,就会触发 4 秒长的处理时间 window——除非 CSV 文件非常长,否则这种情况不太可能发生。
您可能希望 20 秒长的睡眠能够解决这个问题。但是在将作业提交到集群后,Flink 客户端发生了睡眠。这不会影响流作业本身的执行。