Apache Flink RocksDB 状态管理
Apache Flink RocksDB state management
我正在同一个 flink 作业中阅读 2 个 kafka 主题。
Stream1
: 来自第一个主题的消息被保存到rocksdb,然后与stream2合并。
Stream2
: 来自第二个topic的消息被stream1保存的state丰富,然后与stream1联合。
主题 1 和主题 2 是不同的来源,但基本上两个来源的输出是相同的。我必须用来自 topic1 的数据来丰富来自 topic2 的数据。
这里是流量;
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)
这是问题;
- 那个流量好吗?
stream2
可以访问 stream1
为同一个 memberId
保存的状态吗?
看来您应该能够通过使用 KeyedCoProcessFunction
实现您想要的。这或多或少会喜欢:
stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())
这样您可以将状态保持在单个状态 KeyedCoProcessFunction
,因此您可以同时访问 stream1
和 stream2
。
因此,对于 processElement1
,您可以在 map
中为 stream1
和在 processElement2
中做同样的事情在 map
内 for stream2.
我正在同一个 flink 作业中阅读 2 个 kafka 主题。
Stream1
: 来自第一个主题的消息被保存到rocksdb,然后与stream2合并。Stream2
: 来自第二个topic的消息被stream1保存的state丰富,然后与stream1联合。
主题 1 和主题 2 是不同的来源,但基本上两个来源的输出是相同的。我必须用来自 topic1 的数据来丰富来自 topic2 的数据。
这里是流量;
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)
这是问题;
- 那个流量好吗?
stream2
可以访问stream1
为同一个memberId
保存的状态吗?
看来您应该能够通过使用 KeyedCoProcessFunction
实现您想要的。这或多或少会喜欢:
stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())
这样您可以将状态保持在单个状态 KeyedCoProcessFunction
,因此您可以同时访问 stream1
和 stream2
。
因此,对于 processElement1
,您可以在 map
中为 stream1
和在 processElement2
中做同样的事情在 map
内 for stream2.