Kafka 流式传输字数统计中的前 N ​​个字

Kafka streams Top N words in word count

我已经实现了使用 kafka 流计算单词的著名示例:

KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines    
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
    .groupBy((key, value) -> value)    
    .count();

现在我想导出计数最高的前N个词到一个新主题。 最好的方法是什么?

您可以过滤所有高于某个阈值的计数。

textLines.toStream().filter((key, value) -> value > N).to("new-topic")

或者您可以使用 Interactive Queries 查询状态存储,找到存储中条目的总数,然后获取“前 N 个”。

随时查看 Confluent 示例存储库,其中有一些 TopN 实现示例,例如 this one