我可以在 Kafka Stream Topology 中多次使用主题吗?

Can I use topic more than once in Kafka Stream Topology?

我们假设 groupby 函数在 kafka 流中不可用。我可以执行以下操作来获取字数并在其上构建 KTable 吗?请注意,我在拓扑中使用了两次 "word-count-topic"。我有一个用例,我想迭代地构建一些东西,对于下一个流事件,我想查找以前的值并根据事件更新它。我想在我构建 Ktable 的同一主题中保留最新值。

KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));

KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));

KStream<String,String> msgStream = wordsStream
                                   .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\W+")))
                                   .selectKey((k,v) -> v);

msgStream.leftJoin(kTable, (word,count) -> {
                                             if( count == null) return new WordCount(word, Long.valueOf(1));
                                             else return new WordCount(word, count + 1);
                                           })
            .mapValues((k,v)-> v.getCount())
            .to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

streams = new KafkaStreams(builder.build(), props);
streams.start();

应该可以。为什么不只是 运行 代码?