Which way of using Flink's broadcast state is better?

Flink version: 1.13.1

The code has been simplified.

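I use broadcast state in my project to send some configuration every 5 minutes. Because a process function can only be connected to one broadcast stream, I defined a case class that carries all three kinds of configuration.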

Case class:

case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                  userSegmentInfo: Map[String, (String, String)],
                                  lacciRegRel: Map[String, Set[String]])

And the broadcast stream code:

    val orderConfBroadcast = env.addSource(new OrderConfSource(dbConfig, serverConfig.smsRuleRedis))
      .name("order_conf_load")
      .uid("order_conf_load")
      .setParallelism(1)
      .broadcast(new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean]))

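Below are two ways of using the broadcast data in the process function. I would like to know which one is correct, or which one has better performance and lower memory usage, and why.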

First approach:

class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
                         var orderInfo: List[OrderInfoBean],
                         redisConf: String,
                         var lacciRegRel: Map[String, Set[String]]) extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
    userSegmentInfo.get("xxx")
    orderInfo.map(xxx)
  }

  override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
    if (value.orderRuleConfig.nonEmpty) {
      orderInfo = value.orderRuleConfig
    }
    if (value.userSegmentInfo.nonEmpty) {
      userSegmentInfo = value.userSegmentInfo
    }
    if (value.lacciRegRel.nonEmpty) {
      lacciRegRel = value.lacciRegRel
    }
  }
}

Second approach:

class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
                         var orderInfo: List[OrderInfoBean],
                         redisConf: String,
                         var lacciRegRel: Map[String, Set[String]]) extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  val stateDescriptor = new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean])

  override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
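    // Latest broadcast config; fall back to the constructor value before the first broadcast arrives
    val state = ctx.getBroadcastState(stateDescriptor)
    Option(state.get("order_state")).flatMap(_.userSegmentInfo.get("xxx")).orElse(userSegmentInfo.get("xxx"))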
  }

  override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
    ctx.getBroadcastState(stateDescriptor).put("order_state", value)
  }
}

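The biggest difference between the two implementations is that the first stores the data received from the broadcast stream in plain fields, which are lost if the job fails, while the second keeps it in broadcast state, which is checkpointed and restored on recovery. With the first version, after a restart every parallel instance falls back to the values passed to the constructor until the next configuration broadcast arrives.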

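The second version has some overhead; you would have to measure it to know how much. In both cases, however, the data is held in memory (Flink keeps broadcast state on the heap at runtime, even with the RocksDB state backend), so the difference should not be large. The extra costs are mainly the state lookup on each element and the fact that every parallel instance writes its own copy of the broadcast state into each checkpoint.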
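One more difference is visible in the code: the first version only replaces a field when the incoming part is non-empty, while the second version replaces the whole stored bean on every broadcast. If the source ever sends partial updates, the second version could keep the first version's semantics by merging before calling `put`. Below is a minimal sketch of that merge as a pure Scala function. `OrderInfoBean` here is a hypothetical stand-in, because its real definition is not shown, and `ConfMerge` is an illustrative name.

```scala
// Hypothetical stand-in for the real OrderInfoBean (its fields are not shown in the question).
final case class OrderInfoBean(ruleId: String)

final case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                        userSegmentInfo: Map[String, (String, String)],
                                        lacciRegRel: Map[String, Set[String]])

object ConfMerge {
  // Same rule as the first approach: an empty part means "keep the previous value".
  // `current` may be null when the broadcast state has no entry for the key yet.
  def merge(current: OrderConfBroadcastBean, incoming: OrderConfBroadcastBean): OrderConfBroadcastBean =
    Option(current) match {
      case None => incoming
      case Some(old) =>
        OrderConfBroadcastBean(
          orderRuleConfig =
            if (incoming.orderRuleConfig.nonEmpty) incoming.orderRuleConfig else old.orderRuleConfig,
          userSegmentInfo =
            if (incoming.userSegmentInfo.nonEmpty) incoming.userSegmentInfo else old.userSegmentInfo,
          lacciRegRel =
            if (incoming.lacciRegRel.nonEmpty) incoming.lacciRegRel else old.lacciRegRel)
    }
}
```

Inside `processBroadcastElement` it would be used roughly as `val state = ctx.getBroadcastState(stateDescriptor)` followed by `state.put("order_state", ConfMerge.merge(state.get("order_state"), value))`, relying on the broadcast state's `get` returning `null` for a missing key.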