Kafka Streams - 不可预测的聚合结果
Kafka Streams - Unpredictable result of aggregation
最近在学习Apache Kafka Streams,玩玩world countexamples.Below是我的代码
public class StreamsStarterApp {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> wordCountPipe = streamsBuilder.stream("word-count-in");
wordCountPipe.filter((key, value) -> StringUtils.isNoneBlank(value))
.mapValues(value -> value.toLowerCase())
.flatMapValues(value -> Splitter.on(",").trimResults().split(value))
.groupBy((key,value)-> value)
.count(Named.as("count"))
.toStream()
.to("word-count-out", Produced.with(Serdes.String(),Serdes.Long()));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
}
我的一个有趣观察是,如果我评论“.mapValues(value -> value.toLowerCase())”,结果会有所不同,这让我很困惑,代码中的任何更改都会导致不可预测结果变化
向主题发送你好,你好'word-count-int'
结果会显示
你好 2
如果我评论 '.mapValues(value -> value.toLowerCase())' 并再次发送 hello,world
结果会显示
你好 1
世界 1
怎么会这样?这与 Kafka 流中的状态存储有关吗
修改 KafkaStreams 应用程序(即删除或添加运算符)可能会导致不兼容。一般来说,如果你想改变程序,你经常需要重置应用程序(即删除它的所有状态)(cf https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html)。
对于您的特定情况,问题是运算符名称。名称是使用内部计数器自动生成的,以避免命名冲突。如果删除一个操作员,下游操作员的名称会发生变化。因此,count()
运算符找不到它的旧状态(每个统计存储也有一个名称,存储的名称也会更改),因此在删除 mapValues
后,您从一个空状态开始.
您可以通过 Topology#describe()
检查命名。这允许您比较更改代码前后的拓扑。
为了兼容升级,DSL 允许您明确指定名称(参见 https://docs.confluent.io/current/streams/developer-guide/dsl-topology-naming.html)。这样,命名就不会改变。对于 word-count 示例,您可以通过以下方式指定名称:
.count(Materialized.as("myName"))
最近在学习Apache Kafka Streams,玩玩world countexamples.Below是我的代码
public class StreamsStarterApp {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> wordCountPipe = streamsBuilder.stream("word-count-in");
wordCountPipe.filter((key, value) -> StringUtils.isNoneBlank(value))
.mapValues(value -> value.toLowerCase())
.flatMapValues(value -> Splitter.on(",").trimResults().split(value))
.groupBy((key,value)-> value)
.count(Named.as("count"))
.toStream()
.to("word-count-out", Produced.with(Serdes.String(),Serdes.Long()));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
}
我的一个有趣观察是,如果我评论“.mapValues(value -> value.toLowerCase())”,结果会有所不同,这让我很困惑,代码中的任何更改都会导致不可预测结果变化
向主题发送你好,你好'word-count-int'
结果会显示 你好 2
如果我评论 '.mapValues(value -> value.toLowerCase())' 并再次发送 hello,world 结果会显示 你好 1 世界 1
怎么会这样?这与 Kafka 流中的状态存储有关吗
修改 KafkaStreams 应用程序(即删除或添加运算符)可能会导致不兼容。一般来说,如果你想改变程序,你经常需要重置应用程序(即删除它的所有状态)(cf https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html)。
对于您的特定情况,问题是运算符名称。名称是使用内部计数器自动生成的,以避免命名冲突。如果删除一个操作员,下游操作员的名称会发生变化。因此,count()
运算符找不到它的旧状态(每个统计存储也有一个名称,存储的名称也会更改),因此在删除 mapValues
后,您从一个空状态开始.
您可以通过 Topology#describe()
检查命名。这允许您比较更改代码前后的拓扑。
为了兼容升级,DSL 允许您明确指定名称(参见 https://docs.confluent.io/current/streams/developer-guide/dsl-topology-naming.html)。这样,命名就不会改变。对于 word-count 示例,您可以通过以下方式指定名称:
.count(Materialized.as("myName"))