Flink processWindow 函数发出带有部分信息的记录
Flink processWindow function emits records with partial information
我们看到一些奇怪的行为,processWindow 函数发出两条记录,
第一条记录包含使用 window 中存在的聚合数据的完整信息,第二条记录包含部分信息,其中一些信息已从记录中删除。
processWindow 函数正在使用状态 (MapState),如下所示:
override def open(parameters: Configuration): Unit = {
cfState = getRuntimeContext.getMapState(
new MapStateDescriptor[(String, Int), mutable.Map[Int, mutable.Set[Int]]] (
"customFieldsState",
classOf[(String, Int)],
classOf[mutable.Map[Int, mutable.Set[Int]]]
)
)
}
并且 process
函数使用 window 中存在的记录来操纵上述状态。
这是反模式吗?在 processWindow
函数中使用状态?在 processWindow
函数中使用状态还有其他建议吗?
在这种情况下,我们需要维护状态,因为我们不会在单个 window 中捕获所有字段,我们需要聚合每个用户的记录,因此使用 window 函数.
谢谢
如果您想在单个 window 实例的生命周期之后保持状态,您应该使用
KeyedStateStore ProcessWindowFunction.Context#globalState
当 window 关闭时,所有其他状态都被清除。
由于 Flink 永远不会清除 globalState
,如果您的密钥会过时,您应该在您使用的状态描述符上设置状态 TTL,以避免随着时间的推移泄漏状态。
我们看到一些奇怪的行为,processWindow 函数发出两条记录, 第一条记录包含使用 window 中存在的聚合数据的完整信息,第二条记录包含部分信息,其中一些信息已从记录中删除。
processWindow 函数正在使用状态 (MapState),如下所示:
override def open(parameters: Configuration): Unit = {
cfState = getRuntimeContext.getMapState(
new MapStateDescriptor[(String, Int), mutable.Map[Int, mutable.Set[Int]]] (
"customFieldsState",
classOf[(String, Int)],
classOf[mutable.Map[Int, mutable.Set[Int]]]
)
)
}
并且 process
函数使用 window 中存在的记录来操纵上述状态。
这是反模式吗?在 processWindow
函数中使用状态?在 processWindow
函数中使用状态还有其他建议吗?
在这种情况下,我们需要维护状态,因为我们不会在单个 window 中捕获所有字段,我们需要聚合每个用户的记录,因此使用 window 函数.
谢谢
如果您想在单个 window 实例的生命周期之后保持状态,您应该使用
KeyedStateStore ProcessWindowFunction.Context#globalState
当 window 关闭时,所有其他状态都被清除。
由于 Flink 永远不会清除 globalState
,如果您的密钥会过时,您应该在您使用的状态描述符上设置状态 TTL,以避免随着时间的推移泄漏状态。