有没有办法在 Kafka 流中的计数函数上应用过滤器?

Is there a way to apply filters on a count function in Kafka streams?

我的用例如下 - 我有一个包含特定 ID 消息的主题。 我创建了一个 Kafka Streams 应用程序,它聚合具有相同 ID 的消息并对它们进行计数(类似于 https://kafka.apache.org/10/documentation/streams/tutorial 中的示例 WordCount 实现)

我希望 Kafka 流仅在超过某个阈值时才向输出主题发送消息。例如,如果我将阈值定义为 10,我希望在流处理 10 条具有相同 ID 的消息后,将一条消息发送到输出主题。

我知道这可以通过有一个额外的主题和另一个处理该主题的流来完成,但是有没有办法在一个流中做到这一点?

使用count()聚合函数将KStream转为Stream后,可以过滤count值,转为Stream发送到特定主题:

.selectKey((k, v) -> v)
        .groupByKey()
        .count()
        .filter((key, count) -> count > 3)
        .toStream()
        .filter((key, count) -> count != null)
        .to("output", Produced.with(Serdes.String(), Serdes.String()));