Flink 的广播状态行为
Flink's broadcast state behavior
我想用一个简单的案例来玩一下flink的广播状态
我只是想将一个整数流乘以另一个整数来生成一个广播流。
我的广播的行为是 "weird",如果我在输入流中放置的元素太少(例如 10 个),则什么也不会发生并且我的 MapState
为空,但如果我放置更多元素(比如 100)我有我想要的行为(在这里将整数流乘以 2)。
如果我提供的元素太少,为什么不考虑广播流?
如何控制广播流何时工作?
可选:我只想保留广播流的最后一个元素,.clear()
是好方法吗?
谢谢!
这是我的 BroadcastProcessFunction
:
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {
override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]) = {
val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries()
if (currentBroadcastState.isEmpty) {
out.collect(value)
} else {
out.collect(currentBroadcastState.last.getValue * value)
}
}
override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = {
// Keep only last state
ctx.getBroadcastState(State.mapState).clear()
// Add state
ctx.getBroadcastState(State.mapState).put("key", value)
}
}
还有我的MapState
:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._
object State {
val mapState: MapStateDescriptor[String, Int] =
new MapStateDescriptor(
"State",
createTypeInformation[String],
createTypeInformation[Int]
)
}
还有我的 Main
:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object Broadcast {
def main(args: Array[String]): Unit = {
val numberElements = 100
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val broadcastStream = env.fromElements(2).broadcast(State.mapState)
val input = (1 to numberElements).toList
val inputStream = env.fromCollection(input)
val outputStream = inputStream
.connect(broadcastStream)
.process(new BroadcastProcess())
outputStream.print()
env.execute()
}
}
编辑:我使用 Flink 1.5,Broadcast State 文档是 here。
Flink 不会同步流的摄取,即流会尽快生成数据。这适用于常规和广播输入。 BroadcastProcess
不会等待第一个广播输入到达才接收常规输入。
当您将更多数字放入常规输入时,序列化、反序列化和提供输入所需的时间会更长,以便在第一个常规数字到达时广播输入已经存在。
我想用一个简单的案例来玩一下flink的广播状态
我只是想将一个整数流乘以另一个整数来生成一个广播流。
我的广播的行为是 "weird",如果我在输入流中放置的元素太少(例如 10 个),则什么也不会发生并且我的 MapState
为空,但如果我放置更多元素(比如 100)我有我想要的行为(在这里将整数流乘以 2)。
如果我提供的元素太少,为什么不考虑广播流?
如何控制广播流何时工作?
可选:我只想保留广播流的最后一个元素,.clear()
是好方法吗?
谢谢!
这是我的 BroadcastProcessFunction
:
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {
override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]) = {
val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries()
if (currentBroadcastState.isEmpty) {
out.collect(value)
} else {
out.collect(currentBroadcastState.last.getValue * value)
}
}
override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = {
// Keep only last state
ctx.getBroadcastState(State.mapState).clear()
// Add state
ctx.getBroadcastState(State.mapState).put("key", value)
}
}
还有我的MapState
:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._
object State {
val mapState: MapStateDescriptor[String, Int] =
new MapStateDescriptor(
"State",
createTypeInformation[String],
createTypeInformation[Int]
)
}
还有我的 Main
:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object Broadcast {
def main(args: Array[String]): Unit = {
val numberElements = 100
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val broadcastStream = env.fromElements(2).broadcast(State.mapState)
val input = (1 to numberElements).toList
val inputStream = env.fromCollection(input)
val outputStream = inputStream
.connect(broadcastStream)
.process(new BroadcastProcess())
outputStream.print()
env.execute()
}
}
编辑:我使用 Flink 1.5,Broadcast State 文档是 here。
Flink 不会同步流的摄取,即流会尽快生成数据。这适用于常规和广播输入。 BroadcastProcess
不会等待第一个广播输入到达才接收常规输入。
当您将更多数字放入常规输入时,序列化、反序列化和提供输入所需的时间会更长,以便在第一个常规数字到达时广播输入已经存在。