如何在 window 和 sink 运算符之间注入延迟?
How to inject delay between the window and sink operator?
上下文 - 应用程序
我们有一个处理事件的 Apache Flink 应用程序
- 应用程序使用事件时间特征
- 基于
sessionId
字段的应用程序分片 (keyBy
) 事件
- 应用程序 window 滚动 1 分钟 window
- windowing 由
reduce
和 process
函数指定
- 因此,对于每个会话,我们将有 1 个计算记录
- 应用程序将数据发送到 Postgres 接收器
上下文 - 基础设施
申请:
- 它通过 Kinesis Data Analytics (KDA) 托管在 AWS 中
- 在 5 个不同的地区运行
- 每个地区运行完全相同的代码
数据库:
- 它通过 RDS 托管在 AWS 中(目前是 PostgreSQL)
- 它位于一个区域(在不同区域有一个只读副本)
问题
因为我们使用的是 1 分钟翻滚的事件时间特征window所有区域的接收器几乎同时发出它们的记录。
我们想要实现的是在 window 和 sink operators 之间添加人为延迟以推迟 sink 发射。
Flink App
Offset
Window 1
Sink 1st run
Window 2
Sink 2nd run
#1
0
60
60
120
120
#2
12
60
72
120
132
#3
24
60
84
120
144
#4
36
60
96
120
156
#5
48
60
108
120
168
解决方法无效
我们想过可以像这样给evictor's evictBefore
加点睡眠
...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
.evictor(new Evictor<>() {
private static final long serialVersionUID = 5373966807521260856L;
public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
try {
Thread.sleep(config.getWindowingDelayInMilliSec());
} catch (InterruptedException ignore) {
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
}
})
...
但它不能可靠地工作。
您可以将 TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger)
与 WindowStagger.RANDOM
一起使用。
上下文 - 应用程序
我们有一个处理事件的 Apache Flink 应用程序
- 应用程序使用事件时间特征
- 基于
sessionId
字段的应用程序分片 (keyBy
) 事件 - 应用程序 window 滚动 1 分钟 window
- windowing 由
reduce
和process
函数指定 - 因此,对于每个会话,我们将有 1 个计算记录
- windowing 由
- 应用程序将数据发送到 Postgres 接收器
上下文 - 基础设施
申请:
- 它通过 Kinesis Data Analytics (KDA) 托管在 AWS 中
- 在 5 个不同的地区运行
- 每个地区运行完全相同的代码
数据库:
- 它通过 RDS 托管在 AWS 中(目前是 PostgreSQL)
- 它位于一个区域(在不同区域有一个只读副本)
问题
因为我们使用的是 1 分钟翻滚的事件时间特征window所有区域的接收器几乎同时发出它们的记录。
我们想要实现的是在 window 和 sink operators 之间添加人为延迟以推迟 sink 发射。
Flink App | Offset | Window 1 | Sink 1st run | Window 2 | Sink 2nd run |
---|---|---|---|---|---|
#1 | 0 | 60 | 60 | 120 | 120 |
#2 | 12 | 60 | 72 | 120 | 132 |
#3 | 24 | 60 | 84 | 120 | 144 |
#4 | 36 | 60 | 96 | 120 | 156 |
#5 | 48 | 60 | 108 | 120 | 168 |
解决方法无效
我们想过可以像这样给evictor's evictBefore
加点睡眠
...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
.evictor(new Evictor<>() {
private static final long serialVersionUID = 5373966807521260856L;
public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
try {
Thread.sleep(config.getWindowingDelayInMilliSec());
} catch (InterruptedException ignore) {
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
}
})
...
但它不能可靠地工作。
您可以将 TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger)
与 WindowStagger.RANDOM
一起使用。