Apache flink 广播状态被刷新
Apache flink broadcast state gets flushed
我正在使用广播模式连接两个流并从一个流读取数据到另一个流。代码如下所示
case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
override def processBroadcastElement(in2: (String, Double),
context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
collector:Collector[MyObject]):Unit={
context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
}
override def processElement(obj: MyObject,
readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double),
MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
//If I print the context of the state here sometimes it is empty.
out.collect(MyObject(new, properties, go, here))
}
}
状态描述符:
val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])
我的执行代码是这样的。
val streamA :DataStream[MyObject] = ...
val streamB :DataStream[(String,Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)
streamA.connect(streamB).process(new Broadcast)
问题出在 processElement
函数中,状态有时为空有时不是。状态应该始终包含数据,因为我不断地从一个我知道它有数据的文件中流式传输。我不明白为什么它会刷新状态,我无法获取数据。
我试着在processBroadcastElement
把数据放到状态前后加了一些打印,结果如下
0 - 1
1 - 2
2 - 3
.. all the way to 48 where it resets back to 0
更新:
我注意到的是,当我降低流式执行上下文的超时值时,结果会好一些。当我增加它时,地图总是空的。
env.setBufferTimeout(1) //better results
env.setBufferTimeout(200) //worse result (default is 100)
每当 Flink 中连接两个流时,您无法控制 Flink 将事件从两个流传递到您的用户函数的时间。因此,例如,如果有一个事件可从 streaA 处理,并且有一个事件可从 streamB 处理,那么下一个可能会被处理。您不能期望 broadcastedStream 以某种方式优先于其他流。
根据您的要求,您可以采用多种策略来应对这两个流之间的竞争。例如,您可以使用 KeyedBroadcastProcessFunction 并使用其 applyToKeyedState 方法在新广播事件到达时遍历所有现有键控状态。
正如 David 提到的那样,作业可能正在重新启动。我禁用了检查点,这样我就可以看到任何可能抛出的异常,而不是 flink 默默地失败并重新启动作业。
原来是在尝试解析文件时出错。所以作业不断重启,因此状态为空,flink 一遍又一遍地消耗流。
我正在使用广播模式连接两个流并从一个流读取数据到另一个流。代码如下所示
case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
override def processBroadcastElement(in2: (String, Double),
context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
collector:Collector[MyObject]):Unit={
context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
}
override def processElement(obj: MyObject,
readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double),
MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
//If I print the context of the state here sometimes it is empty.
out.collect(MyObject(new, properties, go, here))
}
}
状态描述符:
val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])
我的执行代码是这样的。
val streamA :DataStream[MyObject] = ...
val streamB :DataStream[(String,Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)
streamA.connect(streamB).process(new Broadcast)
问题出在 processElement
函数中,状态有时为空有时不是。状态应该始终包含数据,因为我不断地从一个我知道它有数据的文件中流式传输。我不明白为什么它会刷新状态,我无法获取数据。
我试着在processBroadcastElement
把数据放到状态前后加了一些打印,结果如下
0 - 1
1 - 2
2 - 3
.. all the way to 48 where it resets back to 0
更新: 我注意到的是,当我降低流式执行上下文的超时值时,结果会好一些。当我增加它时,地图总是空的。
env.setBufferTimeout(1) //better results
env.setBufferTimeout(200) //worse result (default is 100)
每当 Flink 中连接两个流时,您无法控制 Flink 将事件从两个流传递到您的用户函数的时间。因此,例如,如果有一个事件可从 streaA 处理,并且有一个事件可从 streamB 处理,那么下一个可能会被处理。您不能期望 broadcastedStream 以某种方式优先于其他流。
根据您的要求,您可以采用多种策略来应对这两个流之间的竞争。例如,您可以使用 KeyedBroadcastProcessFunction 并使用其 applyToKeyedState 方法在新广播事件到达时遍历所有现有键控状态。
正如 David 提到的那样,作业可能正在重新启动。我禁用了检查点,这样我就可以看到任何可能抛出的异常,而不是 flink 默默地失败并重新启动作业。
原来是在尝试解析文件时出错。所以作业不断重启,因此状态为空,flink 一遍又一遍地消耗流。