关于 Flink States 的 TTL 配置

About TTL configuration for States in Flink

让我们假设我有一个描述符的配置,并且从这里采取了行动:

ValueStateDescriptor<Event> descriptor = ...;

StateTtlConfig ttlConfigOneHourAndReturnExpire = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

descriptor.enableTimeToLive(ttlConfigOneHourAndReturnExpire);

/*after one hour when the state is expired*/
Event e = state.value(); (step 1 and 2)
e.count = e.count + 1; (step 3)
value.update(e); (step 4)

这是否意味着在 1 小时后状态已被弃用,事情将按以下顺序发生:

  1. Return state besides 中记录的先前状态已弃用。
  2. 记录的先前状态将在读取后清除。
  3. 在传递并清除先前状态(读取)后更新对象。
  4. 在这种情况下更新状态将意味着再次创建状态,因为之前的状态已经被删除,这个值将再花费一个小时,或者状态将在此时而不是在点 1 和对象中被清除不会包括更​​新,它会在到达时存储在状态中吗?

希望我能解释一下,因为文档对我来说不是很清楚。

从我需要在一天发生变化时清理状态这一点开始,并且无法使用 TTL 来做到这一点,我想在每小时后清理状态但在删除之前获取状态,更新当前值,然后再创建一个小时的状态,但在丢失之前始终保持之前的状态。

希望这有意义并且可以以某种方式实现。 亲切的问候!

如果您需要每小时操作一次状态,请创建自定义 ProcessFunction 并使用计时器触发该操作。