Flink KeyedCoProcessFunction working with state

I'm using a KeyedCoProcessFunction to enrich the main data stream with data coming from another stream.

Code:

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
        out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      case _ => logger.debug("Smth went wrong")
    }
  }

  override def processElement2(
                                value: AssetCommandState,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}

processElement2() works fine: it receives the data and updates the state.
But in processElement1() I always hit case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"), even though I expect a value to have been set by processElement2().

As an example, I followed this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/

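processElement1 and processElement2 do share state, but keep in mind that this is key-partitioned state. This means that a value set in processElement2 while processing a given value v2 will only be seen in processElement1 when it is later called with a value v1 that has the same key as v2.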
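To make the key scoping concrete, here is a minimal plain-Scala model of key-partitioned state. It does not use Flink at all: `KeyedValueState`, `setCurrentKey`, `KeyedStateDemo` and the sample keys are hypothetical names invented for this illustration. In a real job the Flink runtime selects the current key from the key selectors of the two connected streams before each processElement1/processElement2 call.

```scala
import scala.collection.mutable

// Toy stand-in for keyed ValueState: one independent slot per key.
// Hypothetical class, for illustration only (not Flink's API).
final class KeyedValueState[V] {
  private val slots = mutable.Map.empty[String, V]
  private var currentKey: String = ""

  // In Flink the runtime does this from the key selector before each call.
  def setCurrentKey(key: String): Unit = currentKey = key

  // Reads and writes only ever touch the slot of the current key.
  def value: Option[V] = slots.get(currentKey)
  def update(v: V): Unit = slots.update(currentKey, v)
}

object KeyedStateDemo {
  private val state = new KeyedValueState[String]

  // Models processElement2: a CREATE command stores the asset id under its key.
  def onCommand(key: String, assetId: String): Unit = {
    state.setCurrentKey(key)
    state.update(assetId)
  }

  // Models processElement1: a packet looks up state under its own key.
  def onPacket(key: String): Option[String] = {
    state.setCurrentKey(key)
    state.value
  }
}
```

After `KeyedStateDemo.onCommand("tag-1", "asset-A")`, a packet keyed by `"tag-1"` sees `Some("asset-A")`, while a packet keyed by `"tag-2"` sees `None`. If processElement1 never sees the state, one thing worth checking is whether the two key selectors really produce equal keys for the records that are supposed to match.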

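Also keep in mind that you have no control over the race between the two streams feeding processElement1 and processElement2: for a given key, a packet may arrive before the command that sets its state.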
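One possible way to cope with that ordering problem is to buffer packets that arrive before their state exists and release them once the command shows up. The sketch below models the idea for a single key in plain Scala, without Flink. `EnrichmentForKey`, its methods, and the use of strings for packets are assumptions made for illustration, not part of the original code.

```scala
import scala.collection.mutable

// Plain-Scala model of the per-key logic; all names are hypothetical.
final class EnrichmentForKey {
  private var assetId: Option[String] = None
  private val pending = mutable.ListBuffer.empty[String]

  // Models processElement1: emit immediately if the mapping is known,
  // otherwise hold the packet back until a command arrives.
  def onPacket(packet: String): List[(String, String)] = assetId match {
    case Some(id) =>
      List((id, packet))
    case None =>
      pending += packet
      Nil
  }

  // Models processElement2: store the mapping, then flush buffered packets.
  def onCommand(id: String): List[(String, String)] = {
    assetId = Some(id)
    val flushed = pending.toList.map(p => (id, p))
    pending.clear()
    flushed
  }
}
```

With this model, a packet that arrives first produces no output, and the later command emits it paired with the asset id. In an actual KeyedCoProcessFunction the buffer would need to live in keyed state (for example a ListState) rather than in a plain field, and unmatched packets would probably need some expiry policy so the buffer cannot grow without bound.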

The RidesAndFares exercise from the official Apache Flink training is all about learning to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home page for the corresponding tutorial.