在 PyFlink 中使用消息键进行窗口化分组

Windowed grouping using message keys in PyFlink

我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:

这是我试图实现的数据流的可视化表示:

我正在使用 PyFlink 的 Table API 并且我的两个表都是使用 SQL DDL 声明的。

我的查询执行是这样的:

SELECT UserId, Timestamp, my_udf(Data) AS Result,
FROM InputTable 
GROUP BY TUMBLE(Timestamp, interval 2 SECONDS), UserId, Data

这是我的 Python UDF 函数:

@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
    
    # ...business logic here with window_data
    
    return some_result

我目前的问题是,由于某种原因,my_udf 函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。

我一直在查看 PyFlink 文档,但找不到如何实现我想要的。

信息可能在文档中,但我似乎没能 find/understand 它。

如有任何帮助,我们将不胜感激。

谢谢!

如果我理解正确,您想要修改您的查询,使其Data列或Timestamp

SELECT UserId, TUMBLE_END(Timestamp, interval '2' SECONDS), my_udf(Data) AS Result,
FROM InputTable 
GROUP BY TUMBLE(Timestamp, interval '2' SECONDS), UserId

然后您想要实现一个 user-defined aggregate function,它将给定用户的 window 中所有行的数据列的值聚合为一个值。我在上面链接的文档中有一个示例。