Flink Streaming:如何实现由开始和结束元素定义的windows?
Flink Streaming: How to implement windows which are defined by a start and end element?
我有以下格式的数据,
SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST
2016|INVITE RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08
16:58:58 IST 2016|0 RTP|2405463430|4115474257|8.205142580136622E12|Tue
Nov 08 16:58:58 IST 2016|1
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST
2016|2 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08
16:58:58 IST 2016|3 RTP|2405463430|4115474257|8.205142580136622E12|Tue
Nov 08 16:58:58 IST 2016|4
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST
2016|5 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08
16:58:58 IST 2016|6 RTP|2405463430|4115474257|8.205142580136622E12|Tue
Nov 08 16:58:58 IST 2016|7
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST
2016|8 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08
16:58:58 IST 2016|9 SIP|2405463430|4115474257|8.205142580136622E12|Tue
Nov 08 16:58:58 IST 2016|BYE
我希望我的 window 在遇到 SIP-INVITE
消息时启动,并在遇到 SIP-BYE
消息时触发事件,执行一些聚合。
我该怎么做? SIP-INVITE
消息在给定用户的任何时间点出现,我也可能同时收到多个用户的多个 SIP-INVITE
消息。
我认为您可以使用用户键入的全局 windows 解决您的用例。全局 windows 收集每个键的所有数据,并将触发和清除 window 的责任推给用户定义的 Trigger
函数。
全局window定义如下:
val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
// one global window per user (handles overlapping SIP-INVITE events).
.keyBy(_._1)
// collect all data for each user until the trigger fires and purges the window.
.window(GlobalWindows.create())
// you have to implement a custom trigger which reacts on the marker.
.trigger(new YourCustomTrigger())
// the window function computes your aggregation.
.apply(new YourWindowFunction())
我认为执行以下操作的触发器应该有效(假设 SIP-INVITE
事件始终启动会话)。 Trigger.onElement()
方法应该检查 SIP-BYE
字段并触发 window 评估并清除 window,即 return TriggerResult.FIRE_AND_PURGE
。这将调用评估函数并删除 window 状态。
注意,如果要支持乱序事件需要特别注意(这种情况下需要设置事件时间定时器为关闭元素的时间戳,以确保时间戳之前的所有数据都是已收到)。如果有数据应该被丢弃,因为它不是 "between" SIP-INVITE
和 SIP-BYE
你也需要处理它。
有关详细信息,请参阅 global windows and triggers, the JavaDocs of [Trigger][3]
, and this blog post 的文档。
我有以下格式的数据,
SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|INVITE RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|0 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|1 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|2 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|3 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|4 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|5 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|6 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|7 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|8 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|9 SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|BYE
我希望我的 window 在遇到 SIP-INVITE
消息时启动,并在遇到 SIP-BYE
消息时触发事件,执行一些聚合。
我该怎么做? SIP-INVITE
消息在给定用户的任何时间点出现,我也可能同时收到多个用户的多个 SIP-INVITE
消息。
我认为您可以使用用户键入的全局 windows 解决您的用例。全局 windows 收集每个键的所有数据,并将触发和清除 window 的责任推给用户定义的 Trigger
函数。
全局window定义如下:
val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
// one global window per user (handles overlapping SIP-INVITE events).
.keyBy(_._1)
// collect all data for each user until the trigger fires and purges the window.
.window(GlobalWindows.create())
// you have to implement a custom trigger which reacts on the marker.
.trigger(new YourCustomTrigger())
// the window function computes your aggregation.
.apply(new YourWindowFunction())
我认为执行以下操作的触发器应该有效(假设 SIP-INVITE
事件始终启动会话)。 Trigger.onElement()
方法应该检查 SIP-BYE
字段并触发 window 评估并清除 window,即 return TriggerResult.FIRE_AND_PURGE
。这将调用评估函数并删除 window 状态。
注意,如果要支持乱序事件需要特别注意(这种情况下需要设置事件时间定时器为关闭元素的时间戳,以确保时间戳之前的所有数据都是已收到)。如果有数据应该被丢弃,因为它不是 "between" SIP-INVITE
和 SIP-BYE
你也需要处理它。
有关详细信息,请参阅 global windows and triggers, the JavaDocs of [Trigger][3]
, and this blog post 的文档。