Flink Table/SQL API: session window 聚合后修改rowtime属性
Flink Table/SQL API: modify rowtime attribute after session window aggregation
我想使用 Session
window 聚合,然后在 Table API/Flink SQL
中生成的结果之上使用 运行 Tumble
window 聚合.
是否可以在第一次 session
聚合后修改 rowtime
属性,使其等于会话中最后观察到的事件的 .rowtime
?
我正在尝试做这样的事情:
table
.window(Session withGap 2.minutes on 'rowtime as 'w)
.groupBy('w, 'userId)
.select(
'userId,
('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
('w.rowtime - 2.minutes) as 'rowtime
)
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy('w)
.select(
'w.start,
'w.end,
'sessionDuration.avg as 'avgSession,
'sessionDuration.count as 'numberOfSession
)
关键部分是:
('w.rowtime - 2.minutes) as 'rowtime
所以我想将会话中最新事件的 .rowtime
重新分配给记录,没有会话间隔(本例中为 2.minutes
)。
这在 BatchTable 中工作正常,但在 StreamTable 中不起作用:
Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
是的,我知道,感觉就像我不想发明时间机器并改变时间的顺序。但实际上有可能以某种方式实现所描述的行为吗?
不,不幸的是,在当前版本 (1.6.0) 中,您无法使用 SQL 或 Table API 执行此操作。一旦修改时间属性(rowtime 或 proctime),它就会变成常规 TIMESTAMP
属性并失去其特殊的时间特征。
对于行时间属性,原因是我们不能保证时间戳仍然与水印对齐。原则上,我们可以将水印延迟减去的时间间隔,但目前还不支持。
我想使用 Session
window 聚合,然后在 Table API/Flink SQL
中生成的结果之上使用 运行 Tumble
window 聚合.
是否可以在第一次 session
聚合后修改 rowtime
属性,使其等于会话中最后观察到的事件的 .rowtime
?
我正在尝试做这样的事情:
table
.window(Session withGap 2.minutes on 'rowtime as 'w)
.groupBy('w, 'userId)
.select(
'userId,
('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
('w.rowtime - 2.minutes) as 'rowtime
)
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy('w)
.select(
'w.start,
'w.end,
'sessionDuration.avg as 'avgSession,
'sessionDuration.count as 'numberOfSession
)
关键部分是:
('w.rowtime - 2.minutes) as 'rowtime
所以我想将会话中最新事件的 .rowtime
重新分配给记录,没有会话间隔(本例中为 2.minutes
)。
这在 BatchTable 中工作正常,但在 StreamTable 中不起作用:
Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
是的,我知道,感觉就像我不想发明时间机器并改变时间的顺序。但实际上有可能以某种方式实现所描述的行为吗?
不,不幸的是,在当前版本 (1.6.0) 中,您无法使用 SQL 或 Table API 执行此操作。一旦修改时间属性(rowtime 或 proctime),它就会变成常规 TIMESTAMP
属性并失去其特殊的时间特征。
对于行时间属性,原因是我们不能保证时间戳仍然与水印对齐。原则上,我们可以将水印延迟减去的时间间隔,但目前还不支持。