Flink window 状态大小和状态管理
Flink window state size and state management
在阅读了 flink 的文档并四处搜索之后,我无法完全理解 flink 如何处理其 windows 中的状态。
假设我有一个每小时翻滚 window 的聚合函数,该函数将消息累积到一些 java pojo 或 scala 案例 class 中。
window 的大小是否会与一小时内进入 window 的事件数相关联,还是仅与 pojo/case class 相关联,因为我将事件累积到该对象中。 (例如,如果将 10000 条消息计入一个整数,大小会接近 10000 * 消息大小还是 int 的大小?)
此外,如果我使用 pojos 或 case classes,flink 是否会为我处理状态(如果内存 exhausted/saves 状态在检查点等处溢出到磁盘),还是我必须为此使用 flink 的状态对象?
感谢您的帮助!
window 的状态大小取决于您应用的函数类型。如果您应用 ReduceFunction
或 AggregateFunction
,到达的数据会立即聚合,而 window 仅保存聚合值。如果应用ProcessWindowFunction
或WindowFunction
,Flink会收集所有输入记录并在时间(事件或处理时间取决于window类型)超过window时应用该功能'结束时间。
您还可以组合两种类型的函数,即 AggregateFunction
后跟 ProcessWindowFunction
。在这种情况下,到达的记录会立即聚合,当 window 关闭时,聚合结果作为单个值传递给 ProcessWindowFunction
。这很有用,因为您有增量聚合(由于 ReduceFunction
/ AggregateFunction
),而且还可以访问 window 元数据,如开始和结束时间戳(由于 ProcessWindowFunction
)。
如何管理状态取决于所选的状态后端。如果您配置 FsStateBackend
,则所有本地状态都保留在 TaskManager 的堆上,如果状态变得太大,JVM 进程将被 OutOfMemoryError
杀死。如果配置 RocksDBStateBackend
状态会溢出到磁盘。这伴随着每个状态访问的 de/serialization 成本,但为状态提供了更多的存储空间。
在阅读了 flink 的文档并四处搜索之后,我无法完全理解 flink 如何处理其 windows 中的状态。 假设我有一个每小时翻滚 window 的聚合函数,该函数将消息累积到一些 java pojo 或 scala 案例 class 中。 window 的大小是否会与一小时内进入 window 的事件数相关联,还是仅与 pojo/case class 相关联,因为我将事件累积到该对象中。 (例如,如果将 10000 条消息计入一个整数,大小会接近 10000 * 消息大小还是 int 的大小?) 此外,如果我使用 pojos 或 case classes,flink 是否会为我处理状态(如果内存 exhausted/saves 状态在检查点等处溢出到磁盘),还是我必须为此使用 flink 的状态对象?
感谢您的帮助!
window 的状态大小取决于您应用的函数类型。如果您应用 ReduceFunction
或 AggregateFunction
,到达的数据会立即聚合,而 window 仅保存聚合值。如果应用ProcessWindowFunction
或WindowFunction
,Flink会收集所有输入记录并在时间(事件或处理时间取决于window类型)超过window时应用该功能'结束时间。
您还可以组合两种类型的函数,即 AggregateFunction
后跟 ProcessWindowFunction
。在这种情况下,到达的记录会立即聚合,当 window 关闭时,聚合结果作为单个值传递给 ProcessWindowFunction
。这很有用,因为您有增量聚合(由于 ReduceFunction
/ AggregateFunction
),而且还可以访问 window 元数据,如开始和结束时间戳(由于 ProcessWindowFunction
)。
如何管理状态取决于所选的状态后端。如果您配置 FsStateBackend
,则所有本地状态都保留在 TaskManager 的堆上,如果状态变得太大,JVM 进程将被 OutOfMemoryError
杀死。如果配置 RocksDBStateBackend
状态会溢出到磁盘。这伴随着每个状态访问的 de/serialization 成本,但为状态提供了更多的存储空间。