windowByTime 和 triggerByCount 的 Flink 组合

Flink combination of windowByTime and triggerByCount

source.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .trigger(PurgingTrigger.of(CountTrigger.of[TimeWindow](2)))
    .process(new TestFun())

说明:

假设我有3个事件[E1, E2, E3],它们应该按计数触发,也应该按时间触发。 我使用 countTrigger 仅触发 2 个事件(E1 和 E2),但未触发剩余的 E3 事件。

预期:E3 事件应该在 5 秒后触发,但实际上它只触发 E1 和 E2 事件

您提供的 CountTrigger 正在替换通常与 TumblingEventTimeWindow 一起使用的 EventTimeTrigger,而不是以某种方式扩展或扩充它。要获得您想要的行为,您必须实现一个自定义触发器,该触发器可以根据计数和超时触发 window。

A​​ google search 会找到一些例子和讨论。