Flink KeyedCoProcessFunction 使用状态
Flink KeyedCoProcessFunction working with state
我使用 KeyedCoProcessFunction 函数,用来自另一个流的数据来丰富(enrich)主数据流。
代码:
/**
 * Two-input keyed function that emits an [[AssetData]] record for every [[PacketData]]
 * element for which an asset id has previously been stored by an [[AssetCommandState]]
 * element.
 *
 * The asset id is held in Flink keyed `ValueState`, so it is scoped to the key of the element
 * currently being processed: a value written in `processElement2` is only visible to
 * `processElement1` calls whose element has the same key. The relative arrival order of the
 * two inputs is not controlled here, so packets that arrive before the matching command find
 * no state.
 */
class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  /** State payload: the asset id associated with the current key, if any. */
  // NOTE(review): as a class nested in the function, instances may capture a reference to the
  // enclosing AssetDataEnrichment instance when serialized — confirm, and consider moving it
  // to a companion object.
  case class AssetStateDoc(assetId: Option[String])

  // Assigned in open(); Flink state handles can only be obtained from the runtime context.
  private var associatedDevices: ValueState[AssetStateDoc] = _

  /** Registers the keyed state named "associatedDevices". */
  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  /**
   * Handles an element of the first input.
   *
   * Emits an [[AssetData]] when an asset id is stored for the current key and the packet
   * carries an external id; otherwise only logs.
   *
   * @param packet element of the first input
   * @param ctx    context of the current invocation (unused)
   * @param out    collector receiving the enriched record
   */
  override def processElement1(
      packet: PacketData,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    // ValueState.value returns null when nothing has been stored for the current key.
    val state = Option(associatedDevices.value).getOrElse(AssetStateDoc(None))
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
        // Do not call .get on the external id: a packet without one must not fail the job.
        packet.tag.externalId match {
          case Some(externalId) =>
            out.collect(AssetData(assetId, externalId, packet.toString))
          case None =>
            logger.warn(s"Packet has no externalId, skipping enrichment for asset $assetId")
        }
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      // Defensive fallback, e.g. a null assetId in restored state.
      case _ => logger.debug("Smth went wrong")
    }
  }

  /**
   * Handles an element of the second input.
   *
   * On a CREATE command, stores the command's asset id in the state of the current key;
   * any other command is logged as an error and ignored.
   *
   * @param value element of the second input
   * @param ctx   context of the current invocation (unused)
   * @param out   collector (unused, this input never emits)
   */
  override def processElement2(
      value: AssetCommandState,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}
processElement2() 工作正常:它能接收数据并更新状态。
但是在 processElement1() 中,我总是进入 case None => logger.debug(s"No state for a packet ${packet.tag.externalId}") 这个分支,
尽管我期望此时状态中已经有在 processElement2 函数中设置的值。
作为示例,我使用了本指南 - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
processElement1 和 processElement2 确实共享状态,但请记住这是按键分区(key-partitioned)的状态。这意味着:在 processElement2 中处理某个值 v2 时设置的状态,只有在稍后以与 v2 具有相同键的值 v1 调用 processElement1 时,才能在 processElement1 中看到。
另请记住,您无法控制进入 processElement1
和 processElement2
的两个流之间的竞争条件。
Apache Flink 官方培训中的 RidesAndFares 练习正是为了学习如何使用这部分 API。相应教程的首页是:https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/
我使用 KeyedCoProcessFunction 函数,用来自另一个流的数据来丰富(enrich)主数据流。
代码:
/**
 * Two-input keyed function that emits an [[AssetData]] record for every [[PacketData]]
 * element for which an asset id has previously been stored by an [[AssetCommandState]]
 * element.
 *
 * The asset id is held in Flink keyed `ValueState`, so it is scoped to the key of the element
 * currently being processed: a value written in `processElement2` is only visible to
 * `processElement1` calls whose element has the same key. The relative arrival order of the
 * two inputs is not controlled here, so packets that arrive before the matching command find
 * no state.
 */
class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  /** State payload: the asset id associated with the current key, if any. */
  // NOTE(review): as a class nested in the function, instances may capture a reference to the
  // enclosing AssetDataEnrichment instance when serialized — confirm, and consider moving it
  // to a companion object.
  case class AssetStateDoc(assetId: Option[String])

  // Assigned in open(); Flink state handles can only be obtained from the runtime context.
  private var associatedDevices: ValueState[AssetStateDoc] = _

  /** Registers the keyed state named "associatedDevices". */
  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  /**
   * Handles an element of the first input.
   *
   * Emits an [[AssetData]] when an asset id is stored for the current key and the packet
   * carries an external id; otherwise only logs.
   *
   * @param packet element of the first input
   * @param ctx    context of the current invocation (unused)
   * @param out    collector receiving the enriched record
   */
  override def processElement1(
      packet: PacketData,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    // ValueState.value returns null when nothing has been stored for the current key.
    val state = Option(associatedDevices.value).getOrElse(AssetStateDoc(None))
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
        // Do not call .get on the external id: a packet without one must not fail the job.
        packet.tag.externalId match {
          case Some(externalId) =>
            out.collect(AssetData(assetId, externalId, packet.toString))
          case None =>
            logger.warn(s"Packet has no externalId, skipping enrichment for asset $assetId")
        }
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      // Defensive fallback, e.g. a null assetId in restored state.
      case _ => logger.debug("Smth went wrong")
    }
  }

  /**
   * Handles an element of the second input.
   *
   * On a CREATE command, stores the command's asset id in the state of the current key;
   * any other command is logged as an error and ignored.
   *
   * @param value element of the second input
   * @param ctx   context of the current invocation (unused)
   * @param out   collector (unused, this input never emits)
   */
  override def processElement2(
      value: AssetCommandState,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}
processElement2() 工作正常:它能接收数据并更新状态。
但是在 processElement1() 中,我总是进入 case None => logger.debug(s"No state for a packet ${packet.tag.externalId}") 这个分支,
尽管我期望此时状态中已经有在 processElement2 函数中设置的值。
作为示例,我使用了本指南 - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
processElement1 和 processElement2 确实共享状态,但请记住这是按键分区(key-partitioned)的状态。这意味着:在 processElement2 中处理某个值 v2 时设置的状态,只有在稍后以与 v2 具有相同键的值 v1 调用 processElement1 时,才能在 processElement1 中看到。
另请记住,您无法控制进入 processElement1
和 processElement2
的两个流之间的竞争条件。
Apache Flink 官方培训中的 RidesAndFares 练习正是为了学习如何使用这部分 API。相应教程的首页是:https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/