Kstreams 中有状态和无状态转换有什么区别?

what is the difference between stateful and stateless transformation in Kstreams?

我是 Kstreams 的初学者,我仔细阅读了文档,但我似乎无法理解这两者之间的区别,非常感谢您通过示例进行简单的解释。

聚合和连接需要状态 - 通过拓扑进行的初始累加器或分组

过滤、分支、映射或迭代流不需要状态 - 一条消息进来,零条或一条消息出来

值得指出的是 groupBy 函数被认为是无状态的

OneCricketer 所说的是正确的。我发布这个答案只是为了用现在的例子来解释。简而言之,stateful 操作依赖于流的先前事件,而 stateless 操作则不是。因此,以计数事件为例。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = stream
      .peek((key, value) -> log.info("received key: {}, value: {}", key, value))
      .filter((key, value) -> /* filter events with value is ____ */)
      .groupByKey()
      .aggregate(new Initializer<Count>() {
                    @Override
                    public Count apply() {
                        return new Count("", 0);
                    }
                }, new Aggregator<String, String, Count>() {
                    @Override
                    public Count apply(String k, String v, Count aggKeyCount) {
                        Integer currentCount = aggKeyCount.getCount();
                        return new Count(k, currentCount + 1);
                    }
                });
                
aggregate.toStream()
         .map((k,v) -> new KeyValue<>(k, v.getCount()))
         .peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
         .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

操作 groupByKeyaggregatefiltermap 正在以某种方式转换事件(peek 不转换事件) .转换 groupByKeyfiltermapstateless,因为它们可能会修改它们正在处理的当前事件(它们不关心以前的事件)。 aggregate 转换正在计算事件的数量,因此它正在对它们求和。因此,这取决于以前的事件。并不是说它有 Count("", 0)Initializer,而是根据前一个计数器 aggKeyCount.

Count(k, currentCount + 1) 上一个接一个地聚合事件

而且statelessstateful的概念在Kafka的KStream上是没有的。它在所有处理引擎上,例如 Hadoop MapReduce,Apache Spark, Apache Flink, Apache Storm. And it is also present on any processing pipeline, such as Java Stream (e.g.: map, reduce, flatmap, filter), , .