Flink 封顶 MapState
Flink capped MapState
背景
我们希望将最后的 n
个唯一 ID 保留在 Flink 运算符的状态中。
当 n+1
唯一 ID 到达时,我们希望保留它并删除状态中最旧的唯一 ID。这是为了避免不断增长的状态。
我们已经有了 TTL(过期时间)机制。大小限制是我们希望实施的另一个限制。
并非每个元素都拥有唯一的 ID。
问题
Flink有没有提供API限制state的元素个数?
尝试过的东西
- 使用
MapState
和 StateTtlConfig
生成的 TTL/expiration 机制。
- Window限制处理元素的数量,但不限制状态中的元素数量。
我认为 Flink 没有开箱即用的状态类型。
我能想到的最接近的事情是使用 ListState。使用 ListState,您可以像添加常规列表一样附加元素。
对于您的用例,您将读取状态,调用 .get()
,这将为您提供一个可以迭代的可迭代对象,删除您想要删除的项目,然后将状态推回。
从性能的角度来看,迭代可能并不理想,但另一方面,与从磁盘访问状态(如果您使用 RocksDB 作为状态后端)相比,它并不重要,这会导致序列化和反序列化成本高
虽然没有直接提供,但您可以通过 MapState<Long, Event>
加上几个额外的 ValueState<Long>
值来实现这一点,以跟踪 MapState 中当前活动的索引范围。
随着事件的到来,做一些大致像这样的事情(但使用 Flink 状态而不是这个伪代码):
map[nextIndex++] = thisEvent;
if (nextIndex - oldestIndex > n) {
map[oldestIndex++].clear();
}
我认为您可以通过实现 Java 的 LinkedHashMap
的某种“Flink 调整”版本来自己创建此功能
您必须记住,MapState 不完全是一个经典的 Java 地图,而是它的精心实现,可以进行状态管理。
本质上 - 我认为你可以使用 MapState
和 ListState
的组合来实现这样的事情,遵循一些关于如何实现它的指南 - 这个例子中的一些东西:Java tips and tricks.: Queue Map Hybrid -- Creating Data Structures in Java (techtipsjava.blogspot.com)
关于 TTL - 我不确定删除密钥(因为它是 timed-out)会如何影响这种结构,必须仔细考虑。
背景
我们希望将最后的 n
个唯一 ID 保留在 Flink 运算符的状态中。
当 n+1
唯一 ID 到达时,我们希望保留它并删除状态中最旧的唯一 ID。这是为了避免不断增长的状态。
我们已经有了 TTL(过期时间)机制。大小限制是我们希望实施的另一个限制。
并非每个元素都拥有唯一的 ID。
问题
Flink有没有提供API限制state的元素个数?
尝试过的东西
- 使用
MapState
和StateTtlConfig
生成的 TTL/expiration 机制。 - Window限制处理元素的数量,但不限制状态中的元素数量。
我认为 Flink 没有开箱即用的状态类型。 我能想到的最接近的事情是使用 ListState。使用 ListState,您可以像添加常规列表一样附加元素。
对于您的用例,您将读取状态,调用 .get()
,这将为您提供一个可以迭代的可迭代对象,删除您想要删除的项目,然后将状态推回。
从性能的角度来看,迭代可能并不理想,但另一方面,与从磁盘访问状态(如果您使用 RocksDB 作为状态后端)相比,它并不重要,这会导致序列化和反序列化成本高
虽然没有直接提供,但您可以通过 MapState<Long, Event>
加上几个额外的 ValueState<Long>
值来实现这一点,以跟踪 MapState 中当前活动的索引范围。
随着事件的到来,做一些大致像这样的事情(但使用 Flink 状态而不是这个伪代码):
map[nextIndex++] = thisEvent;
if (nextIndex - oldestIndex > n) {
map[oldestIndex++].clear();
}
我认为您可以通过实现 Java 的 LinkedHashMap
您必须记住,MapState 不完全是一个经典的 Java 地图,而是它的精心实现,可以进行状态管理。
本质上 - 我认为你可以使用 MapState
和 ListState
的组合来实现这样的事情,遵循一些关于如何实现它的指南 - 这个例子中的一些东西:Java tips and tricks.: Queue Map Hybrid -- Creating Data Structures in Java (techtipsjava.blogspot.com)
关于 TTL - 我不确定删除密钥(因为它是 timed-out)会如何影响这种结构,必须仔细考虑。