Kafka Streams - 不可预测的聚合结果

Kafka Streams - Unpredictable result of aggregation

最近在学习Apache Kafka Streams,玩玩world countexamples.Below是我的代码

public class StreamsStarterApp {
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    KStream<String, String> wordCountPipe = streamsBuilder.stream("word-count-in");

    wordCountPipe.filter((key, value) -> StringUtils.isNoneBlank(value))
            .mapValues(value -> value.toLowerCase())
            .flatMapValues(value -> Splitter.on(",").trimResults().split(value))
            .groupBy((key,value)-> value)
            .count(Named.as("count"))
            .toStream()
            .to("word-count-out", Produced.with(Serdes.String(),Serdes.Long()));

    KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
    kafkaStreams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}

}

我的一个有趣观察是,如果我评论“.mapValues(value -> value.toLowerCase())”,结果会有所不同,这让我很困惑,代码中的任何更改都会导致不可预测结果变化

  1. 向主题发送你好,你好'word-count-int'

  2. 结果会显示 你好 2

  3. 如果我评论 '.mapValues(value -> value.toLowerCase())' 并再次发送 hello,world 结果会显示 你好 1 世界 1

怎么会这样?这与 Kafka 流中的状态存储有关吗

修改 KafkaStreams 应用程序(即删除或添加运算符)可能会导致不兼容。一般来说,如果你想改变程序,你经常需要重置应用程序(即删除它的所有状态)(cf https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html)。

对于您的特定情况,问题是运算符名称。名称是使用内部计数器自动生成的,以避免命名冲突。如果删除一个操作员,下游操作员的名称会发生​​变化。因此,count() 运算符找不到它的旧状态(每个统计存储也有一个名称,存储的名称也会更改),因此在删除 mapValues 后,您从一个空状态开始.

您可以通过 Topology#describe() 检查命名。这允许您比较更改代码前后的拓扑。

为了兼容升级,DSL 允许您明确指定名称(参见 https://docs.confluent.io/current/streams/developer-guide/dsl-topology-naming.html)。这样,命名就不会改变。对于 word-count 示例,您可以通过以下方式指定名称:

.count(Materialized.as("myName"))