Kafka 流式传输字数统计中的前 N 个字
Kafka streams Top N words in word count
我已经实现了使用 kafka 流计算单词的著名示例:
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.groupBy((key, value) -> value)
.count();
现在我想导出计数最高的前N个词到一个新主题。
最好的方法是什么?
您可以过滤所有高于某个阈值的计数。
textLines.toStream().filter((key, value) -> value > N).to("new-topic")
或者您可以使用 Interactive Queries 查询状态存储,找到存储中条目的总数,然后获取“前 N 个”。
随时查看 Confluent 示例存储库,其中有一些 TopN 实现示例,例如 this one。
我已经实现了使用 kafka 流计算单词的著名示例:
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.groupBy((key, value) -> value)
.count();
现在我想导出计数最高的前N个词到一个新主题。 最好的方法是什么?
您可以过滤所有高于某个阈值的计数。
textLines.toStream().filter((key, value) -> value > N).to("new-topic")
或者您可以使用 Interactive Queries 查询状态存储,找到存储中条目的总数,然后获取“前 N 个”。
随时查看 Confluent 示例存储库,其中有一些 TopN 实现示例,例如 this one。