如何在kafka流上实现分组转换

How to materialize grouping transformations on kafka streams

在您的 kafka 流上应用 groupBy() 或 groupByKey() 时,您会得到一个 KGroupedStream 对象。是否有可能实现该对象,也许在一段时间内-window,并将分组数据写入主题?

您可以编写自定义聚合器以将 groupedStream 实现为 KTable。它将具有列表格式的分组记录。稍后可以发布到kafka主题。

KTable<K, ArrayList<V>> groupedTable = streamObject.groupByKey().aggregate(
            // Custom Initializer
            ArrayList::new,
            // aggregator
            (key, value, list) -> {
                list.add(value);
                return list;
            }, new ArrayListSerde<V>(serdeType), storageName);
groupedTable.through(newTopic);
// Or convert into stream and publish
groupedTable.toStream().to(newTopic);