Flink ValueState 在使用 Rocksdb 时会在过期后从存储中移除?
Flink ValueState will be removed from storage after expired when using Rocksdb?
- 我正在使用带有 rocksdb 后端的 Flink 版本 1.10.1。
- 我知道 rocksdb 使用来自 "managed memory" 的内存,我没有为托管内存设置任何特定值。由 Flink 完成。
- 当我监控我的应用程序时,任务管理器的可用内存总是在减少(我的意思是通过
free -h
测量的操作系统的可用内存)。我怀疑原因可能是Rocksdb。
- Question_1 => 如果
ValueState
的值过期,那么 rocksdb 会从内存中删除并从本地存储目录中删除? (我的存储容量也有限)
- Question_2 =>
stream.keyBy(ipAddress)
,如果这个 ipAddress
将由 rocksdb 持有(我说的是 keyBy 本身而不是状态),它是否总是放在托管内存中?如果不是,那么flink heap内存会增加吗?
这是我的应用程序的一般结构:
streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....
这是我的应用程序中的状态示例:
private transient ValueState<SampleObject> sampleState;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
"sampleState",
TypeInformation.of(SampleObject.class)
);
sampleValueStateDescriptor.enableTimeToLive(ttlConfig);
Rocksdb 配置:
state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local
为什么我使用 Rocksdb
- 我正在使用 rocksdb,因为我有一个巨大的密钥大小(想想它的 ip 地址),HeapState 后端或其他无法处理。
- 我的应用程序使用 rocksdb,因为我在用户定义的 keyedprocessfunction 中有一堆状态供将来决定。 (每个状态都有 `StateTtlConfig)
备注
- 我的应用程序不需要增量检查点或任何关于保存点的东西。我不关心保存我的应用程序的所有快照。
Flink ValueState will be removed from storage after expired when using
Rocksdb?
是的,但不是马上。 (在 Flink 的一些早期版本中,答案是 "it depends"。)
在您的状态 ttl 配置中,您没有指定您希望如何完成状态清理。在这种情况下,过期值在读取时被显式删除(例如 ValueState#value
),否则会在后台定期进行垃圾回收。对于 RocksDB,这种后台清理是在压缩期间完成的。换句话说,清理不是立即进行的。 docs 提供了有关如何调整它的更多详细信息 -- 您可以将清理配置为更快地完成,但会降低一些性能。
keyBy 本身不使用任何状态。键选择器函数用于对流进行分区,但键不与keyBy连接存储。只有 windows 和平面图操作保持状态,这是每个键的状态,所有这些键控状态都将在 RocksDB 中(除非您已将计时器配置在堆上,这是一个选项,在但 Flink 1.10 计时器默认存储在堆外,在 rocksdb 中。
您可以将 flatmap
更改为 KeyedProcessFunction
并使用计时器明确清除状态键的状态——这将使您可以直接控制状态何时被清除,而不是依赖关于状态TTL机制最终清除状态。
但更有可能的是 windows 正在建立相当大的状态。如果您可以切换到进行预聚合(通过 reduce
或 aggregate
),这可能会有很大帮助。
- 我正在使用带有 rocksdb 后端的 Flink 版本 1.10.1。
- 我知道 rocksdb 使用来自 "managed memory" 的内存,我没有为托管内存设置任何特定值。由 Flink 完成。
- 当我监控我的应用程序时,任务管理器的可用内存总是在减少(我的意思是通过
free -h
测量的操作系统的可用内存)。我怀疑原因可能是Rocksdb。 - Question_1 => 如果
ValueState
的值过期,那么 rocksdb 会从内存中删除并从本地存储目录中删除? (我的存储容量也有限) - Question_2 =>
stream.keyBy(ipAddress)
,如果这个ipAddress
将由 rocksdb 持有(我说的是 keyBy 本身而不是状态),它是否总是放在托管内存中?如果不是,那么flink heap内存会增加吗?
这是我的应用程序的一般结构:
streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....
这是我的应用程序中的状态示例:
private transient ValueState<SampleObject> sampleState;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
"sampleState",
TypeInformation.of(SampleObject.class)
);
sampleValueStateDescriptor.enableTimeToLive(ttlConfig);
Rocksdb 配置:
state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local
为什么我使用 Rocksdb
- 我正在使用 rocksdb,因为我有一个巨大的密钥大小(想想它的 ip 地址),HeapState 后端或其他无法处理。
- 我的应用程序使用 rocksdb,因为我在用户定义的 keyedprocessfunction 中有一堆状态供将来决定。 (每个状态都有 `StateTtlConfig)
备注
- 我的应用程序不需要增量检查点或任何关于保存点的东西。我不关心保存我的应用程序的所有快照。
Flink ValueState will be removed from storage after expired when using Rocksdb?
是的,但不是马上。 (在 Flink 的一些早期版本中,答案是 "it depends"。)
在您的状态 ttl 配置中,您没有指定您希望如何完成状态清理。在这种情况下,过期值在读取时被显式删除(例如 ValueState#value
),否则会在后台定期进行垃圾回收。对于 RocksDB,这种后台清理是在压缩期间完成的。换句话说,清理不是立即进行的。 docs 提供了有关如何调整它的更多详细信息 -- 您可以将清理配置为更快地完成,但会降低一些性能。
keyBy 本身不使用任何状态。键选择器函数用于对流进行分区,但键不与keyBy连接存储。只有 windows 和平面图操作保持状态,这是每个键的状态,所有这些键控状态都将在 RocksDB 中(除非您已将计时器配置在堆上,这是一个选项,在但 Flink 1.10 计时器默认存储在堆外,在 rocksdb 中。
您可以将 flatmap
更改为 KeyedProcessFunction
并使用计时器明确清除状态键的状态——这将使您可以直接控制状态何时被清除,而不是依赖关于状态TTL机制最终清除状态。
但更有可能的是 windows 正在建立相当大的状态。如果您可以切换到进行预聚合(通过 reduce
或 aggregate
),这可能会有很大帮助。