How to implement Window Functions in Apache Flink?
Folks,
I have a Kafka topic as a source, and I group it into 1-minute windows.
Within each window I want to create new columns with window functions, as in SQL, for example:
- SUM(amount) OVER (PARTITION BY ...)
- COUNT(user) OVER (PARTITION BY ...)
- ROW_NUMBER() OVER (PARTITION BY ...)
Can I do these operations with the DataStream API? Or
how do I convert my Kafka data into a Table and use sqlQuery?
The destination is another Kafka topic.
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
I tried this:
val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)
but I get the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
Test data:
1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2
Example query:
SELECT
user, product, amount,
COUNT(user) OVER(PARTITION BY product) AS count_product
FROM table;
Expected output:
1,"beer",3,3
1,"beer",1,3
2,"beer",3,3
3,"diaper",4,3
4,"diaper",1,3
5,"diaper",5,3
6,"rubber",2,1
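To make the expected output concrete, the semantics of COUNT(user) OVER (PARTITION BY product) can be sketched with plain Scala collections (no Flink involved): every row keeps its columns and gains the size of its product group.

```scala
// Plain-Scala sketch of COUNT(user) OVER (PARTITION BY product):
// each row is (user, product, amount); the appended fourth column is
// the number of rows sharing the same product.
val rows = Seq(
  (1, "beer", 3), (1, "beer", 1), (2, "beer", 3),
  (3, "diaper", 4), (4, "diaper", 1), (5, "diaper", 5),
  (6, "rubber", 2))

// group size per product, e.g. "beer" -> 3
val countPerProduct: Map[String, Int] =
  rows.groupBy(_._2).map { case (product, group) => product -> group.size }

// append the per-partition count to every row
val withCount = rows.map { case (user, product, amount) =>
  (user, product, amount, countPerProduct(product))
}

withCount.foreach(println)
```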
You need to parse the strings into fields and then rename them:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val stream = env.fromElements(
  "1,beer,3", "1,beer,1", "2,beer,3", "3,diaper,4",
  "4,diaper,1", "5,diaper,5", "6,rubber,2")

// parse each CSV line into a (user, product, amount) tuple
val parsed = stream.map { x =>
  val arr = x.split(",")
  (arr(0).toInt, arr(1), arr(2).toInt)
}

val tableA = tEnv.fromDataStream(parsed, $"_1" as "user", $"_2" as "product", $"_3" as "amount")

// example query (`user` is a reserved keyword in Flink SQL, hence the backticks)
val result = tEnv.sqlQuery(s"SELECT `user`, product, amount FROM $tableA")
val rs = result.toAppendStream[(Int, String, Int)]
rs.print()
env.execute()
I am not sure how to implement the desired window function in Flink SQL. Alternatively, it can be implemented in plain Flink like this:
parsed.keyBy(x => x._2) // key by product
  // note: event-time windows only fire once watermarks advance, so the
  // stream needs timestamp/watermark assignment for this to produce output
  .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
  .process(new ProcessWindowFunction[
    (Int, String, Int), (Int, String, Int, Int), String, TimeWindow]() {
    override def process(key: String, context: Context,
                         elements: Iterable[(Int, String, Int)],
                         out: Collector[(Int, String, Int, Int)]): Unit = {
      val lst = elements.toList
      // emit every element with the window's element count appended
      lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
    }
  })
  .print()
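For the Flink SQL route, note that a streaming OVER aggregation requires an ORDER BY on a time attribute; an OVER(PARTITION BY ...) without ORDER BY, as in the example query, is not supported on an unbounded stream. Assuming a processing-time attribute named proctime had been declared when converting the stream to a table (this attribute is not part of the code above), a sketch of the query might look like:

```sql
SELECT
  `user`, product, amount,
  COUNT(`user`) OVER (
    PARTITION BY product
    ORDER BY proctime
    RANGE UNBOUNDED PRECEDING
  ) AS count_product
FROM tableA;
```

With processing-time ordering, each row's count reflects the rows of its partition seen so far, rather than the final per-partition total from the batch-style expected output above.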