windowByTime 和 triggerByCount 的 Flink 组合
Flink combination of windowByTime and triggerByCount
source.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(PurgingTrigger.of(CountTrigger.of[TimeWindow](2)))
.process(new TestFun())
说明:
假设我有3个事件[E1, E2, E3],它们应该按计数触发,也应该按时间触发。
我使用 countTrigger 仅触发 2 个事件(E1 和 E2),但未触发剩余的 E3 事件。
预期:E3 事件应该在 5 秒后触发,但实际上它只触发 E1 和 E2 事件
您提供的 CountTrigger
正在替换通常与 TumblingEventTimeWindow
一起使用的 EventTimeTrigger
,而不是以某种方式扩展或扩充它。要获得您想要的行为,您必须实现一个自定义触发器,该触发器可以根据计数和超时触发 window。
A google search 会找到一些例子和讨论。
source.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(PurgingTrigger.of(CountTrigger.of[TimeWindow](2)))
.process(new TestFun())
说明:
假设我有3个事件[E1, E2, E3],它们应该按计数触发,也应该按时间触发。 我使用 countTrigger 仅触发 2 个事件(E1 和 E2),但未触发剩余的 E3 事件。
预期:E3 事件应该在 5 秒后触发,但实际上它只触发 E1 和 E2 事件
您提供的 CountTrigger
正在替换通常与 TumblingEventTimeWindow
一起使用的 EventTimeTrigger
,而不是以某种方式扩展或扩充它。要获得您想要的行为,您必须实现一个自定义触发器,该触发器可以根据计数和超时触发 window。
A google search 会找到一些例子和讨论。