Kafka Streams Do Not Restart with Aggregated Values
I am aggregating values from a stream like this:
private KTable<String, StringAggregator> aggregate(KStream<String, String> inputStream) {
    return inputStream
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .aggregate(
            StringAggregator::new,
            (k, v, a) -> {
                a.add(v);
                return a;
            },
            Materialized.<String, StringAggregator>as(Stores.persistentKeyValueStore("STATE_STORE"))
                .withKeySerde(Serdes.String())
                .withValueSerde(getValueSerde(StringAggregator.class)));
}
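StringAggregator and getValueSerde(...) are not shown in the post. For context, here is a minimal sketch of what StringAggregator might look like, inferred only from the add(...) and size() calls in the snippets (the field and the extra accessor are assumptions):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: the real class is not shown in the post.
public class StringAggregator {

    private final List<String> values = new ArrayList<>();

    // Invoked once per incoming record by the aggregate/join logic.
    public void add(String value) {
        values.add(value);
    }

    // Used by the peek() logging further down.
    public int size() {
        return values.size();
    }

    // Assumed accessor so a serde (e.g. a JSON serde) can read the state back out.
    public List<String> getValues() {
        return values;
    }
}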
Normally this works great. However, when the application restarts, the aggregated values for the keys are lost. It is also possible that the entire server will be killed and a new server (with a new version of the streams application) will come online. How do I ensure the aggregated values persist?
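For reference, a minimal sketch of how such a topology is typically started (the names below are placeholders, not from the post). Kafka Streams backs a persistent store like "STATE_STORE" with a changelog topic derived from the application.id, and replays it on restart as long as that id stays the same, so this configuration is relevant to the restart behavior described above:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// Placeholder configuration; adjust application id, brokers and state dir as needed.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "string-aggregator-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
streams.start();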
I ended up creating aggregation logic that uses aggregation results persisted in a Kafka topic. The logic is as follows:
private KStream<String, StringAggregator> getAggregator(String topicName,
                                                        KStream<String, String> input,
                                                        KTable<String, StringAggregator> aggregator) {
    return input
        .leftJoin(aggregator, (inputMessage, aggregatorMessage) -> {
            if (aggregatorMessage == null) {
                aggregatorMessage = new StringAggregator();
            }
            aggregatorMessage.add(inputMessage);
            return aggregatorMessage;
        })
        .peek((k, v) -> logger.info("Aggregated a join input for {}: {}, {} aggregated.", topicName, k, v.size()));
}
Here is the logic that actually builds the stream.
String topicName = "input";
KStream<String, String> input = streamsBuilder.stream(topicName);
KTable<String, StringAggregator> aggregator = streamsBuilder.table("aggregate");
getAggregator(topicName, input, aggregator).to("aggregate");