Apache Flink RocksDB 状态管理

Apache Flink RocksDB state management

我正在同一个 flink 作业中阅读 2 个 kafka 主题。

主题 1 和主题 2 是不同的来源,但基本上两个来源的输出是相同的。我必须用来自 topic1 的数据来丰富来自 topic2 的数据。

这里是流量;

val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)

这是问题;

  1. 那个流量好吗?
  2. stream2 可以访问 stream1 为同一个 memberId 保存的状态吗?

看来您应该能够通过使用 KeyedCoProcessFunction 实现您想要的。这或多或少会喜欢:

stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())

这样您可以将状态保持在单个状态 KeyedCoProcessFunction,因此您可以同时访问 stream1stream2

因此,对于 processElement1,您可以在 map 中为 stream1 和在 processElement2 中做同样的事情在 map 内 for stream2.