Flink 中的检查点随时间增加

Checkpoints increasing over time in Flink

汇总到 this question 我仍然不清楚为什么我的 Flink 作业的检查点会随着时间的推移而增长,目前,在大约 7 天 运行 的时间里,这些检查点从未得到高原。 我目前使用的是Flink 1.10版本,FS State Backend因为我的工作负担不起使用RocksDB的延迟成本

查看检查点在 7 天内的演变: 假设我在所有有状态运算符中为状态的 TTL 配置了一个小时或更长时间,在一种情况下为一天:

public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot().build();

在我看来,所有进入状态的对象都将在到期时间后被清理,因此检查点的大小应该减少,正如我们所期望的,每天的数据量或多或少相同。

另一方面,我们有一个流量曲线,在一天中的几个小时内有更多的传入数据,但深夜流量下降,所有进入过期状态的对象都应该被清理,导致在流量再次上升之前,检查点大小应该减少而不是保持相同大小。

让我们看看一个用例的代码示例:

DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
                    apply filters here;))
                    .name("Events filtered")
                    .keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())


public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;

@Override
    public void open(Configuration parameters) {
        /*ttlConfig described above*/
        descriptor.enableTimeToLive(ttlConfig);
        previousState = getRuntimeContext().getMapState(descriptor);
    }

@Override
    public void flatMap(Event event, Collector<Event> collector) throws Exception {
      final String key = event.rType.equals("something") ? event.id1 : event.id2;
      Event previous = previousState.get(key);
      if(previous != null){
        /*something done here*/
      }else /*something done here*/
        previousState.put(key, previous);
        collector.collect(previous);
 }
}

这些或多或少是用例的结构,还有一些使用 Windows(时间 Window 或会话 Window)

问题:

亲切的问候!

在这段代码中,您似乎只是在写回已经存在的状态,这仅用于重置 TTL 计时器。这或许可以解释为什么状态没有过期。

Event previous = previousState.get(key);
if (previous != null) {
  /*something done here*/
} else
  previousState.put(key, previous);

看来您应该使用 ValueState 而不是 MapState。 ValueState 有效地提供了一个分片的 key/value 存储,其中的键是用于在 keyBy 中对流进行分区的键。 MapState 为您提供每个键的嵌套映射,而不是单个值。但是由于您在 flatMap 中使用的是与最初用于为流设置密钥的密钥相同的密钥,因此 key-partitioned ValueState 似乎就是您所需要的。