Flink Streaming:如何实现由开始和结束元素定义的windows?

Flink Streaming: How to implement windows which are defined by a start and end element?

我有以下格式的数据,

SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|INVITE RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|0 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|1 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|2 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|3 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|4 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|5 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|6 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|7 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|8 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|9 SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|BYE

我希望我的 window 在遇到 SIP-INVITE 消息时启动,并在遇到 SIP-BYE 消息时触发事件,执行一些聚合。

我该怎么做? SIP-INVITE 消息在给定用户的任何时间点出现,我也可能同时收到多个用户的多个 SIP-INVITE 消息。

我认为您可以使用用户键入的全局 windows 解决您的用例。全局 windows 收集每个键的所有数据,并将触发和清除 window 的责任推给用户定义的 Trigger 函数。

全局window定义如下:

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
  // one global window per user (handles overlapping SIP-INVITE events).
  .keyBy(_._1)
  // collect all data for each user until the trigger fires and purges the window.
  .window(GlobalWindows.create())
  // you have to implement a custom trigger which reacts on the marker.
  .trigger(new YourCustomTrigger())
  // the window function computes your aggregation.
  .apply(new YourWindowFunction())

我认为执行以下操作的触发器应该有效(假设 SIP-INVITE 事件始终启动会话)。 Trigger.onElement() 方法应该检查 SIP-BYE 字段并触发 window 评估并清除 window,即 return TriggerResult.FIRE_AND_PURGE。这将调用评估函数并删除 window 状态。

注意,如果要支持乱序事件需要特别注意(这种情况下需要设置事件时间定时器为关闭元素的时间戳,以确保时间戳之前的所有数据都是已收到)。如果有数据应该被丢弃,因为它不是 "between" SIP-INVITESIP-BYE 你也需要处理它。

有关详细信息,请参阅 global windows and triggers, the JavaDocs of [Trigger][3], and this blog post 的文档。