在 PyFlink 中使用消息键进行窗口化分组
Windowed grouping using message keys in PyFlink
我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:
- 从消息包含 UserId 的 Kafka 主题读取数据
- 对数据执行超过 2 秒的滚动窗口
- 使用我的 windows 值调用 Python UDF
这是我试图实现的数据流的可视化表示:
我正在使用 PyFlink 的 Table API 并且我的两个表都是使用 SQL DDL 声明的。
我的查询执行是这样的:
SELECT UserId, Timestamp, my_udf(Data) AS Result,
FROM InputTable
GROUP BY TUMBLE(Timestamp, interval 2 SECONDS), UserId, Data
这是我的 Python UDF 函数:
@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
# ...business logic here with window_data
return some_result
我目前的问题是,由于某种原因,my_udf
函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。
我一直在查看 PyFlink 文档,但找不到如何实现我想要的。
信息可能在文档中,但我似乎没能 find/understand 它。
如有任何帮助,我们将不胜感激。
谢谢!
如果我理解正确,您想要修改您的查询,使其不按Data
列或Timestamp
SELECT UserId, TUMBLE_END(Timestamp, interval '2' SECONDS), my_udf(Data) AS Result,
FROM InputTable
GROUP BY TUMBLE(Timestamp, interval '2' SECONDS), UserId
然后您想要实现一个 user-defined aggregate function,它将给定用户的 window 中所有行的数据列的值聚合为一个值。我在上面链接的文档中有一个示例。
我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:
- 从消息包含 UserId 的 Kafka 主题读取数据
- 对数据执行超过 2 秒的滚动窗口
- 使用我的 windows 值调用 Python UDF
这是我试图实现的数据流的可视化表示:
我正在使用 PyFlink 的 Table API 并且我的两个表都是使用 SQL DDL 声明的。
我的查询执行是这样的:
SELECT UserId, Timestamp, my_udf(Data) AS Result,
FROM InputTable
GROUP BY TUMBLE(Timestamp, interval 2 SECONDS), UserId, Data
这是我的 Python UDF 函数:
@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
# ...business logic here with window_data
return some_result
我目前的问题是,由于某种原因,my_udf
函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。
我一直在查看 PyFlink 文档,但找不到如何实现我想要的。
信息可能在文档中,但我似乎没能 find/understand 它。
如有任何帮助,我们将不胜感激。
谢谢!
如果我理解正确,您想要修改您的查询,使其不按Data
列或Timestamp
SELECT UserId, TUMBLE_END(Timestamp, interval '2' SECONDS), my_udf(Data) AS Result,
FROM InputTable
GROUP BY TUMBLE(Timestamp, interval '2' SECONDS), UserId
然后您想要实现一个 user-defined aggregate function,它将给定用户的 window 中所有行的数据列的值聚合为一个值。我在上面链接的文档中有一个示例。