如何在kafka流上实现分组转换
How to materialize grouping transformations on kafka streams
在您的 kafka 流上应用 groupBy() 或 groupByKey() 时,您会得到一个 KGroupedStream 对象。是否有可能实现该对象,也许在一段时间内-window,并将分组数据写入主题?
您可以编写自定义聚合器以将 groupedStream 实现为 KTable。它将具有列表格式的分组记录。稍后可以发布到kafka主题。
KTable<K, ArrayList<V>> groupedTable = streamObject.groupByKey().aggregate(
// Custom Initializer
ArrayList::new,
// aggregator
(key, value, list) -> {
list.add(value);
return list;
}, new ArrayListSerde<V>(serdeType), storageName);
groupedTable.through(newTopic);
// Or convert into stream and publish
groupedTable.toStream().to(newTopic);
在您的 kafka 流上应用 groupBy() 或 groupByKey() 时,您会得到一个 KGroupedStream 对象。是否有可能实现该对象,也许在一段时间内-window,并将分组数据写入主题?
您可以编写自定义聚合器以将 groupedStream 实现为 KTable。它将具有列表格式的分组记录。稍后可以发布到kafka主题。
KTable<K, ArrayList<V>> groupedTable = streamObject.groupByKey().aggregate(
// Custom Initializer
ArrayList::new,
// aggregator
(key, value, list) -> {
list.add(value);
return list;
}, new ArrayListSerde<V>(serdeType), storageName);
groupedTable.through(newTopic);
// Or convert into stream and publish
groupedTable.toStream().to(newTopic);