Kafka Streams 以复杂的条件按键加入
Kafka Streams join by key with complex condition
我正在尝试通过按键将 KStream
与 GlobalKTable
连接起来,但具有特定的逻辑。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});
例如,如果键 = "ABC",则:
- 首先,通过完整的密钥加入 - 即 "ABC" = "ABC"
- 然后,如果没有加入,则通过前两个符号加入(删除一个符号) - 即 "AB" = "AB"
- 最后,尝试仅通过一个符号加入 - 即 "A" = "A"
此外,还需要知道执行连接的条件 - 例如,3 个字母/2 个字母/1 个字母。
问题是,是否完全可行,或者我应该寻找解决方法?例如,使用相应的键(table 与 "ABC" 键,一个与 "AB" 键和一个与 "A" 键复制 GlobalKTable 并执行 3 个单独的连接?或者还有其他建议吗?
提前致谢!
可以对多个表使用一系列左联接(如果您知道经常想尝试联接)。如果连接成功,您将跳过下一个连接。使用 leftJoin()
和 branch()
的组合应该允许您在每次加入后将流拆分为 "joined" 和 "retry"。最后,如果需要,您可以 merge()
将不同的结果流放在一起。
我正在尝试通过按键将 KStream
与 GlobalKTable
连接起来,但具有特定的逻辑。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});
例如,如果键 = "ABC",则:
- 首先,通过完整的密钥加入 - 即 "ABC" = "ABC"
- 然后,如果没有加入,则通过前两个符号加入(删除一个符号) - 即 "AB" = "AB"
- 最后,尝试仅通过一个符号加入 - 即 "A" = "A"
此外,还需要知道执行连接的条件 - 例如,3 个字母/2 个字母/1 个字母。
问题是,是否完全可行,或者我应该寻找解决方法?例如,使用相应的键(table 与 "ABC" 键,一个与 "AB" 键和一个与 "A" 键复制 GlobalKTable 并执行 3 个单独的连接?或者还有其他建议吗?
提前致谢!
可以对多个表使用一系列左联接(如果您知道经常想尝试联接)。如果连接成功,您将跳过下一个连接。使用 leftJoin()
和 branch()
的组合应该允许您在每次加入后将流拆分为 "joined" 和 "retry"。最后,如果需要,您可以 merge()
将不同的结果流放在一起。