Kafka Streams Does Not Restart with Aggregated Values

I am aggregating the values in a stream like this:

private KTable<String, StringAggregator> aggregate(KStream<String, String> inputStream) {
    return inputStream
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .aggregate(
                    StringAggregator::new,
                    (k, v, a) -> {
                        a.add(v);
                        return a;
                    },
                    // Materialized.as(KeyValueBytesStoreSupplier) needs three type
                    // arguments; the store type is KeyValueStore<Bytes, byte[]>.
                    Materialized.<String, StringAggregator, KeyValueStore<Bytes, byte[]>>as(
                                    Stores.persistentKeyValueStore("STATE_STORE"))
                            .withKeySerde(Serdes.String())
                            .withValueSerde(getValueSerde(StringAggregator.class)));
}

Normally this works very well. However, when the application restarts, the aggregated values for the keys are lost. It is also possible that the whole server will be terminated and a new server (with a new version of the streams application) will come online. How can I make sure the aggregated values persist?
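For a persistent store to be restored after a restart, the instance has to come back with the same identity. A minimal configuration sketch (the property names are standard Kafka Streams config keys; the values shown here are placeholders, not taken from the original post):

```java
import java.util.Properties;

public class StreamsConfigSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // A stable application.id lets a restarted (or replacement) instance
        // locate its internal changelog topics and restore the state store.
        props.put("application.id", "string-aggregator-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Local RocksDB state lives under state.dir; if the directory is
        // wiped (e.g. a new server), state is rebuilt from the changelog.
        props.put("state.dir", "/var/lib/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("application.id"));
    }
}
```

If the `application.id` changes between deployments, the new instance starts with empty state, which matches the symptom described above.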

I ended up creating aggregation logic that persists the aggregated results on a Kafka topic. The logic looks like this:

private KStream<String, StringAggregator> getAggregator(String topicName, 
                                                        KStream<String, String> input,
                                                        KTable<String, StringAggregator> aggregator) {

    return input
            .leftJoin(aggregator, (inputMessage, aggregatorMessage) -> { 
                if (aggregatorMessage == null) { 
                    aggregatorMessage = new StringAggregator(); 
                }
                aggregatorMessage.add(inputMessage);
                return aggregatorMessage; 
            }).peek((k, v) -> logger.info("Aggregated a join input for {}: {}, {} aggregated.", topicName, k, v.size()));
}

Here is the logic that actually builds the stream:

String topicName = "input";
KStream<String, String> input = streamsBuilder.stream(topicName);
// Explicit serdes are needed here; the default serdes cannot
// (de)serialize StringAggregator.
KTable<String, StringAggregator> aggregator = streamsBuilder.table("aggregate",
        Consumed.with(Serdes.String(), getValueSerde(StringAggregator.class)));
getAggregator(topicName, input, aggregator)
        .to("aggregate", Produced.with(Serdes.String(), getValueSerde(StringAggregator.class)));
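The `StringAggregator` class itself is not shown above. A minimal sketch of what it might look like: only `add(String)`, `size()`, and the no-arg constructor (required by the `StringAggregator::new` initializer) are implied by the snippets; everything else here is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical implementation of the aggregator referenced above;
// it simply collects the joined string values.
public class StringAggregator {
    private final List<String> values = new ArrayList<>();

    // Called from the aggregate() adder and the leftJoin() value joiner.
    public void add(String value) {
        values.add(value);
    }

    // Used by the peek() log statement.
    public int size() {
        return values.size();
    }

    public List<String> getValues() {
        return values;
    }
}
```

Whatever the real class looks like, it must round-trip through the serde returned by `getValueSerde(StringAggregator.class)` so the aggregated state survives being written to and read back from the "aggregate" topic.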