Flink 中的检查点随时间增加
Checkpoints increasing over time in Flink
汇总到 this question 我仍然不清楚为什么我的 Flink 作业的检查点会随着时间的推移而增长,目前,在大约 7 天 运行 的时间里,这些检查点从未得到高原。
我目前使用的是Flink 1.10版本,FS State Backend因为我的工作负担不起使用RocksDB的延迟成本
查看检查点在 7 天内的演变:
假设我在所有有状态运算符中为状态的 TTL 配置了一个小时或更长时间,在一种情况下为一天:
public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot().build();
在我看来,所有进入状态的对象都将在到期时间后被清理,因此检查点的大小应该减少,正如我们所期望的,每天的数据量或多或少相同。
另一方面,我们有一个流量曲线,在一天中的几个小时内有更多的传入数据,但深夜流量下降,所有进入过期状态的对象都应该被清理,导致在流量再次上升之前,检查点大小应该减少而不是保持相同大小。
让我们看看一个用例的代码示例:
DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
apply filters here;))
.name("Events filtered")
.keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())
public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;
@Override
public void open(Configuration parameters) {
/*ttlConfig described above*/
descriptor.enableTimeToLive(ttlConfig);
previousState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Event event, Collector<Event> collector) throws Exception {
final String key = event.rType.equals("something") ? event.id1 : event.id2;
Event previous = previousState.get(key);
if(previous != null){
/*something done here*/
}else /*something done here*/
previousState.put(key, previous);
collector.collect(previous);
}
}
这些或多或少是用例的结构,还有一些使用 Windows(时间 Window 或会话 Window)
问题:
- 我做错了什么?
- 状态过期后会被清理吗?这个场景与其他用例相同吗?
- 如果检查点工作有误,什么可以帮助我修复检查点大小?
- 这种行为正常吗?
亲切的问候!
在这段代码中,您似乎只是在写回已经存在的状态,这仅用于重置 TTL 计时器。这或许可以解释为什么状态没有过期。
Event previous = previousState.get(key);
if (previous != null) {
/*something done here*/
} else
previousState.put(key, previous);
看来您应该使用 ValueState
而不是 MapState
。 ValueState 有效地提供了一个分片的 key/value 存储,其中的键是用于在 keyBy 中对流进行分区的键。 MapState 为您提供每个键的嵌套映射,而不是单个值。但是由于您在 flatMap 中使用的是与最初用于为流设置密钥的密钥相同的密钥,因此 key-partitioned ValueState 似乎就是您所需要的。
汇总到 this question 我仍然不清楚为什么我的 Flink 作业的检查点会随着时间的推移而增长,目前,在大约 7 天 运行 的时间里,这些检查点从未得到高原。 我目前使用的是Flink 1.10版本,FS State Backend因为我的工作负担不起使用RocksDB的延迟成本
查看检查点在 7 天内的演变:
public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot().build();
在我看来,所有进入状态的对象都将在到期时间后被清理,因此检查点的大小应该减少,正如我们所期望的,每天的数据量或多或少相同。
另一方面,我们有一个流量曲线,在一天中的几个小时内有更多的传入数据,但深夜流量下降,所有进入过期状态的对象都应该被清理,导致在流量再次上升之前,检查点大小应该减少而不是保持相同大小。
让我们看看一个用例的代码示例:
DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
apply filters here;))
.name("Events filtered")
.keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())
public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;
@Override
public void open(Configuration parameters) {
/*ttlConfig described above*/
descriptor.enableTimeToLive(ttlConfig);
previousState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Event event, Collector<Event> collector) throws Exception {
final String key = event.rType.equals("something") ? event.id1 : event.id2;
Event previous = previousState.get(key);
if(previous != null){
/*something done here*/
}else /*something done here*/
previousState.put(key, previous);
collector.collect(previous);
}
}
这些或多或少是用例的结构,还有一些使用 Windows(时间 Window 或会话 Window)
问题:
- 我做错了什么?
- 状态过期后会被清理吗?这个场景与其他用例相同吗?
- 如果检查点工作有误,什么可以帮助我修复检查点大小?
- 这种行为正常吗?
亲切的问候!
在这段代码中,您似乎只是在写回已经存在的状态,这仅用于重置 TTL 计时器。这或许可以解释为什么状态没有过期。
Event previous = previousState.get(key);
if (previous != null) {
/*something done here*/
} else
previousState.put(key, previous);
看来您应该使用 ValueState
而不是 MapState
。 ValueState 有效地提供了一个分片的 key/value 存储,其中的键是用于在 keyBy 中对流进行分区的键。 MapState 为您提供每个键的嵌套映射,而不是单个值。但是由于您在 flatMap 中使用的是与最初用于为流设置密钥的密钥相同的密钥,因此 key-partitioned ValueState 似乎就是您所需要的。