Which way of using Flink's broadcast state is better?
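I'm using Flink 1.13.1.
The code has been simplified.
I use broadcast state in my project; the broadcast source emits some configuration every 5 minutes. Because a process function can only be connected to one broadcast source, I defined a case class to carry three kinds of configuration.
The case class: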
case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
userSegmentInfo: Map[String, (String, String)],
lacciRegRel: Map[String, Set[String]])
And the broadcast state code:
val orderConfBroadcast = env.addSource(new OrderConfSource(dbConfig, serverConfig.smsRuleRedis))
.name("order_conf_load")
.uid("order_conf_load")
.setParallelism(1)
.broadcast(new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean]))
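I'd like to know which of the following two ways of using broadcast state in a process function is correct, or which has better performance and lower memory usage, and why.
First approach: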
class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
var orderInfo: List[OrderInfoBean],
redisConf: String,
var lacciRegRel: Map[String, Set[String]]) extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {
override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
userSegmentInfo.get("xxx")
orderInfo.map(xxx)
}
override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
if (value.orderRuleConfig.nonEmpty) {
orderInfo = value.orderRuleConfig
}
if (value.userSegmentInfo.nonEmpty) {
userSegmentInfo = value.userSegmentInfo
}
if (value.lacciRegRel.nonEmpty) {
lacciRegRel = value.lacciRegRel
}
}
}
Second approach:
class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
var orderInfo: List[OrderInfoBean],
redisConf: String,
var lacciRegRel: Map[String, Set[String]]) extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {
val stateDescriptor = new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean])
override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
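// Use the descriptor declared above; get returns null until the first broadcast arrives
val state = ctx.getBroadcastState(stateDescriptor)
Option(state.get("order_state")).flatMap(_.userSegmentInfo.get("xxx")).orElse(userSegmentInfo.get("xxx"))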
}
override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
ctx.getBroadcastState(stateDescriptor).put("order_state", value);
}
}
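The biggest difference between the two implementations is that the first stores the data received from the broadcast stream in plain variables, which are lost when the job fails, whereas the second uses broadcast state, which is checkpointed and restored on recovery.
The second version has some overhead. You would have to measure it to know how much, but in both cases the data is held in memory, so the difference should not be large.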
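One thing to watch for if you switch to the second approach: the first approach only overwrites a field when the incoming value is non-empty, while a plain put replaces the whole bean. Below is a rough sketch of how the two could be combined, keeping the "ignore empty fields" behaviour but storing the result in broadcast state so it survives a restart. The bean definitions OrderInfoBean, RegLacciBean and OrderResultBean, the OrderConf helper object and the lookup by userId are all placeholders I made up for illustration; they are not from the question.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical placeholder types; the real definitions are not shown in the question.
case class OrderInfoBean(id: String)
case class RegLacciBean(userId: String, lacci: String)
case class OrderResultBean(userId: String, segment: String)

case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                  userSegmentInfo: Map[String, (String, String)],
                                  lacciRegRel: Map[String, Set[String]])

// Hypothetical helper object, introduced only for this sketch.
object OrderConf {
  val StateKey = "order_state"

  // Should match the descriptor passed to .broadcast(...): same name and types.
  val descriptor = new MapStateDescriptor[String, OrderConfBroadcastBean](
    "order_conf_broadcast",
    createTypeInformation[String],
    createTypeInformation[OrderConfBroadcastBean])

  // Keep the previous value of any field that arrives empty,
  // mirroring the nonEmpty checks of the first approach.
  def merge(old: Option[OrderConfBroadcastBean],
            update: OrderConfBroadcastBean): OrderConfBroadcastBean =
    old match {
      case None => update
      case Some(prev) =>
        OrderConfBroadcastBean(
          if (update.orderRuleConfig.nonEmpty) update.orderRuleConfig else prev.orderRuleConfig,
          if (update.userSegmentInfo.nonEmpty) update.userSegmentInfo else prev.userSegmentInfo,
          if (update.lacciRegRel.nonEmpty) update.lacciRegRel else prev.lacciRegRel)
    }
}

class OrderFilterProcess
  extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  override def processElement(
      regLacci: RegLacciBean,
      ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext,
      out: Collector[OrderResultBean]): Unit = {
    // get returns null until the first configuration has been broadcast
    val conf = Option(ctx.getBroadcastState(OrderConf.descriptor).get(OrderConf.StateKey))
    // Illustrative lookup only: emit a result when the user has a segment entry
    conf.flatMap(_.userSegmentInfo.get(regLacci.userId)).foreach {
      case (segment, _) => out.collect(OrderResultBean(regLacci.userId, segment))
    }
  }

  override def processBroadcastElement(
      value: OrderConfBroadcastBean,
      ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context,
      out: Collector[OrderResultBean]): Unit = {
    val state = ctx.getBroadcastState(OrderConf.descriptor)
    // Merge with whatever is already stored, then write back to broadcast state
    state.put(OrderConf.StateKey, OrderConf.merge(Option(state.get(OrderConf.StateKey)), value))
  }
}
```

Note that state restored from a checkpoint only covers what had already been broadcast; until the first configuration arrives on a fresh start, the lookup in processElement finds nothing, so you may still want some initial values as in your constructor.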