Flink window 状态大小和状态管理

Flink window state size and state management

在阅读了 flink 的文档并四处搜索之后,我无法完全理解 flink 如何处理其 windows 中的状态。 假设我有一个每小时翻滚 window 的聚合函数,该函数将消息累积到一些 java pojo 或 scala 案例 class 中。 window 的大小是否会与一小时内进入 window 的事件数相关联,还是仅与 pojo/case class 相关联,因为我将事件累积到该对象中。 (例如,如果将 10000 条消息计入一个整数,大小会接近 10000 * 消息大小还是 int 的大小?) 此外,如果我使用 pojos 或 case classes,flink 是否会为我处理状态(如果内存 exhausted/saves 状态在检查点等处溢出到磁盘),还是我必须为此使用 flink 的状态对象?

感谢您的帮助!

window 的状态大小取决于您应用的函数类型。如果您应用 ReduceFunctionAggregateFunction,到达的数据会立即聚合,而 window 仅保存聚合值。如果应用ProcessWindowFunctionWindowFunction,Flink会收集所有输入记录并在时间(事件或处理时间取决于window类型)超过window时应用该功能'结束时间。

您还可以组合两种类型的函数,即 AggregateFunction 后跟 ProcessWindowFunction。在这种情况下,到达的记录会立即聚合,当 window 关闭时,聚合结果作为单个值传递给 ProcessWindowFunction。这很有用,因为您有增量聚合(由于 ReduceFunction / AggregateFunction),而且还可以访问 window 元数据,如开始和结束时间戳(由于 ProcessWindowFunction)。

如何管理状态取决于所选的状态后端。如果您配置 FsStateBackend,则所有本地状态都保留在 TaskManager 的堆上,如果状态变得太大,JVM 进程将被 OutOfMemoryError 杀死。如果配置 RocksDBStateBackend 状态会溢出到磁盘。这伴随着每个状态访问的 de/serialization 成本,但为状态提供了更多的存储空间。