Flink 如何为 MapState 中的所有项目设置 TTL?

How to set a TTL for all items in a MapState in Flink?

我想在给定时间戳清理 MapState 中的所有条目。

我正在考虑两种方法:

  1. ValueState 中保持 cleanup timestamp,为 cleanup timestamp 注册一个计时器,当计时器触发时清除 MapState。尽管 cleanup timestamp 可能相同,但添加到 MapState 的每个项目都会发生这种情况。我依靠 Flink 对定时器进行重复数据删除。
  2. 根据(cleanup timestamp - current timestamp计算TTL,使用StateTtlConfigMapState
  3. 设置TTL

哪种方法更好(性能、准确性等)? StateTtlConfig 是否适用于偶数时间处理?

如果您想在给定的时间戳清理所有记录,那么选项 1 显然应该更高效。通常在 flink 状态上使用 TTL 会增加额外的开销,因为每次访问 MapState 中的键时都会检查它。这取决于你如何使用状态(你打算在那里存储多少条记录,你访问它的频率以及状态的类型),但我会说如果你只是想删除所有记录给定时间戳,那么选项 1 是更好的主意。

我认为使用 timerService 来注册计时器会更好 performance.but 事件时间计时器仅在水印进入时触发,您还可以通过使用当前水印来安排这些计时器并将这些计时器与下一个水印合并。

val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)

如果你的目的是同时清除MapState中的所有条目,那么我不会使用StateTtlConfig,因为Flink会花费8个字节来存储每个映射的计时器入口。这是很多不必要的存储开销。

使用StateTtlConfig,状态过期只能用处理时间来指定。

另请记住,StateTtlConfig 不能添加到现有状态描述符或从中删除。