Flink ValueState 在使用 Rocksdb 时会在过期后从存储中移除?

Flink ValueState will be removed from storage after expired when using Rocksdb?

这是我的应用程序的一般结构:

streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....

这是我的应用程序中的状态示例:

 private transient ValueState<SampleObject> sampleState;
 StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
                "sampleState",
                TypeInformation.of(SampleObject.class)
        );
        sampleValueStateDescriptor.enableTimeToLive(ttlConfig);

Rocksdb 配置:

state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local

为什么我使用 Rocksdb

备注

Flink ValueState will be removed from storage after expired when using Rocksdb?

是的,但不是马上。 (在 Flink 的一些早期版本中,答案是 "it depends"。)

在您的状态 ttl 配置中,您没有指定您希望如何完成状态清理。在这种情况下,过期值在读取时被显式删除(例如 ValueState#value),否则会在后台定期进行垃圾回收。对于 RocksDB,这种后台清理是在压缩期间完成的。换句话说,清理不是立即进行的。 docs 提供了有关如何调整它的更多详细信息 -- 您可以将清理配置为更快地完成,但会降低一些性能。

keyBy 本身不使用任何状态。键选择器函数用于对流进行分区,但键不与keyBy连接存储。只有 windows 和平面图操作保持状态,这是每个键的状态,所有这些键控状态都将在 RocksDB 中(除非您已将计时器配置在堆上,这是一个选项,在但 Flink 1.10 计时器默认存储在堆外,在 rocksdb 中。

您可以将 flatmap 更改为 KeyedProcessFunction 并使用计时器明确清除状态键的状态——这将使您可以直接控制状态何时被清除,而不是依赖关于状态TTL机制最终清除状态。

但更有可能的是 windows 正在建立相当大的状态。如果您可以切换到进行预聚合(通过 reduceaggregate),这可能会有很大帮助。