Kstreams 中有状态和无状态转换有什么区别?
what is the difference between stateful and stateless transformation in Kstreams?
我是 Kstreams 的初学者,我仔细阅读了文档,但我似乎无法理解这两者之间的区别,非常感谢您通过示例进行简单的解释。
聚合和连接需要状态 - 通过拓扑进行的初始累加器或分组
过滤、分支、映射或迭代流不需要状态 - 一条消息进来,零条或一条消息出来
值得指出的是 groupBy 函数被认为是无状态的
OneCricketer 所说的是正确的。我发布这个答案只是为了用现在的例子来解释。简而言之,stateful
操作依赖于流的先前事件,而 stateless
操作则不是。因此,以计数事件为例。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = stream
.peek((key, value) -> log.info("received key: {}, value: {}", key, value))
.filter((key, value) -> /* filter events with value is ____ */)
.groupByKey()
.aggregate(new Initializer<Count>() {
@Override
public Count apply() {
return new Count("", 0);
}
}, new Aggregator<String, String, Count>() {
@Override
public Count apply(String k, String v, Count aggKeyCount) {
Integer currentCount = aggKeyCount.getCount();
return new Count(k, currentCount + 1);
}
});
aggregate.toStream()
.map((k,v) -> new KeyValue<>(k, v.getCount()))
.peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
.to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
操作 groupByKey
、aggregate
、filter
、map
正在以某种方式转换事件(peek
不转换事件) .转换 groupByKey
、filter
和 map
是 stateless
,因为它们可能会修改它们正在处理的当前事件(它们不关心以前的事件)。 aggregate
转换正在计算事件的数量,因此它正在对它们求和。因此,这取决于以前的事件。并不是说它有 Count("", 0)
的 Initializer
,而是根据前一个计数器 aggKeyCount
.
在 Count(k, currentCount + 1)
上一个接一个地聚合事件
而且stateless
和stateful
的概念在Kafka的KStream
上是没有的。它在所有处理引擎上,例如 Hadoop MapReduce,Apache Spark, Apache Flink, Apache Storm. And it is also present on any processing pipeline, such as Java Stream (e.g.: map
, reduce
, flatmap
, filter
), , .
我是 Kstreams 的初学者,我仔细阅读了文档,但我似乎无法理解这两者之间的区别,非常感谢您通过示例进行简单的解释。
聚合和连接需要状态 - 通过拓扑进行的初始累加器或分组
过滤、分支、映射或迭代流不需要状态 - 一条消息进来,零条或一条消息出来
值得指出的是 groupBy 函数被认为是无状态的
OneCricketer 所说的是正确的。我发布这个答案只是为了用现在的例子来解释。简而言之,stateful
操作依赖于流的先前事件,而 stateless
操作则不是。因此,以计数事件为例。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = stream
.peek((key, value) -> log.info("received key: {}, value: {}", key, value))
.filter((key, value) -> /* filter events with value is ____ */)
.groupByKey()
.aggregate(new Initializer<Count>() {
@Override
public Count apply() {
return new Count("", 0);
}
}, new Aggregator<String, String, Count>() {
@Override
public Count apply(String k, String v, Count aggKeyCount) {
Integer currentCount = aggKeyCount.getCount();
return new Count(k, currentCount + 1);
}
});
aggregate.toStream()
.map((k,v) -> new KeyValue<>(k, v.getCount()))
.peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
.to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
操作 groupByKey
、aggregate
、filter
、map
正在以某种方式转换事件(peek
不转换事件) .转换 groupByKey
、filter
和 map
是 stateless
,因为它们可能会修改它们正在处理的当前事件(它们不关心以前的事件)。 aggregate
转换正在计算事件的数量,因此它正在对它们求和。因此,这取决于以前的事件。并不是说它有 Count("", 0)
的 Initializer
,而是根据前一个计数器 aggKeyCount
.
Count(k, currentCount + 1)
上一个接一个地聚合事件
而且stateless
和stateful
的概念在Kafka的KStream
上是没有的。它在所有处理引擎上,例如 Hadoop MapReduce,Apache Spark, Apache Flink, Apache Storm. And it is also present on any processing pipeline, such as Java Stream (e.g.: map
, reduce
, flatmap
, filter
),