我们可以更新状态的 TTL 值吗?

Can we update a state's TTL value?

我们有一个使用状态(ValueStateListState)和 TTL(StateTtlConfig) 因为我们不能使用计时器(我们每天会生成数亿个计时器,而且它确实可以扩展:savepoint/checkpoint 需要几个小时才能生成,)。

但是,我们需要根据某些传入事件的类型和其他逻辑在运行时更新 TTL 的值。使用新的 StateTtlConfig(和更新的 TTL 时间)重新创建新状态并在 CoProcessFunctionprocessElement1()processElement2() 方法中将值从“旧”复制到“新”是否可以? (而不是像我们通常那样在 open() 中一次) ?

我猜“旧”状态会被垃圾收集 (?)。

这个解决方案会扩展吗?表现出色?产生任何问题?有什么不好的吗?

I guess the "old" state would be garbage collected (?).

来自 Flink 文档 Cleanup of Expired State

By default, expired values are explicitly removed on read, such as ValueState#value, and periodically garbage collected in the background if supported by the configured state backend. Background cleanup can be disabled in the StateTtlConfig:

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

或在完整快照后执行清理:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

您可以根据文档随时更改 TTL。但是,您必须重新启动查询(我在 运行 时间打喷嚏):

For existing jobs, this cleanup strategy can be activated or deactivated anytime in StateTtlConfig, e.g. after restart from savepoint.

但是你为什么不像大卫在参考答案中所说的那样在 RocksDB 上看到计时器?

我认为你的方法在某种程度上可以在运行时重新创建状态,但它很脆弱。我可以看到,问题是旧的状态元信息可能会在某个地方徘徊,具体取决于后端实现。

对于堆 (FS) 后端,最终 checkpoint/savepoint 将没有过期旧状态的记录,但元信息可以在作业 运行 时留在内存中。如果重新启动作业,它将消失。

对于RocksDB,旧状态的列族可以留存。此外,后台清理仅在压缩期间运行。如果 table 太小,比如在内存中的部分,这部分(甚至可能在磁盘上有一点)会残留。如果对完整快照的清理处于活动状态(不适用于增量检查点),它将在重启后消失。

总而言之,这取决于您创建新状态并从 savepoint/checkpoint 重新启动作业的频率。

我创建了一个 ticket 来记录可以在 TTL 配置中更改什么以及何时更改, 所以检查问题中的一些细节。