Flink 封顶 MapState

Flink capped MapState

背景

我们希望将最后的 n 个唯一 ID 保留在 Flink 运算符的状态中。 当 n+1 唯一 ID 到达时,我们希望保留它并删除状态中最旧的唯一 ID。这是为了避免不断增长的状态。

我们已经有了 TTL(过期时间)机制。大小限制是我们希望实施的另一个限制。

并非每个元素都拥有唯一的 ID。

问题

Flink有没有提供API限制state的元素个数?

尝试过的东西

  1. 使用 MapStateStateTtlConfig 生成的 TTL/expiration 机制。
  2. Window限制处理元素的数量,但不限制状态中的元素数量。

我认为 Flink 没有开箱即用的状态类型。 我能想到的最接近的事情是使用 ListState。使用 ListState,您可以像添加常规列表一样附加元素。

对于您的用例,您将读取状态,调用 .get(),这将为您提供一个可以迭代的可迭代对象,删除您想要删除的项目,然后将状态推回。

从性能的角度来看,迭代可能并不理想,但另一方面,与从磁盘访问状态(如果您使用 RocksDB 作为状态后端)相比,它并不重要,这会导致序列化和反序列化成本高

虽然没有直接提供,但您可以通过 MapState<Long, Event> 加上几个额外的 ValueState<Long> 值来实现这一点,以跟踪 MapState 中当前活动的索引范围。

随着事件的到来,做一些大致像这样的事情(但使用 Flink 状态而不是这个伪代码):

map[nextIndex++] = thisEvent;
if (nextIndex - oldestIndex > n) {
  map[oldestIndex++].clear();
}

我认为您可以通过实现 Java 的 LinkedHashMap

的某种“Flink 调整”版本来自己创建此功能

您必须记住,MapState 不完全是一个经典的 Java 地图,而是它的精心实现,可以进行状态管理。

本质上 - 我认为你可以使用 MapStateListState 的组合来实现这样的事情,遵循一些关于如何实现它的指南 - 这个例子中的一些东西:Java tips and tricks.: Queue Map Hybrid -- Creating Data Structures in Java (techtipsjava.blogspot.com)

关于 TTL - 我不确定删除密钥(因为它是 timed-out)会如何影响这种结构,必须仔细考虑。