如何在 window 和 sink 运算符之间注入延迟?

How to inject delay between the window and sink operator?

上下文 - 应用程序

我们有一个处理事件的 Apache Flink 应用程序

上下文 - 基础设施

申请:

数据库:

问题

因为我们使用的是 1 分钟翻滚的事件时间特征window所有区域的接收器几乎同时发出它们的记录。

我们想要实现的是在 window 和 sink operators 之间添加人为延迟以推迟 sink 发射。

Flink App Offset Window 1 Sink 1st run Window 2 Sink 2nd run
#1 0 60 60 120 120
#2 12 60 72 120 132
#3 24 60 84 120 144
#4 36 60 96 120 156
#5 48 60 108 120 168

解决方法无效

我们想过可以像这样给evictor's evictBefore加点睡眠

...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
.evictor(new Evictor<>() {
    private static final long serialVersionUID = 5373966807521260856L;

    public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
        try {
            Thread.sleep(config.getWindowingDelayInMilliSec());
        } catch (InterruptedException ignore) {
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {

    }
})
...

但它不能可靠地工作。

您可以将 TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger)WindowStagger.RANDOM 一起使用。

有关文档,请参阅 https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html