Kafka Stream Reducer 没有减少记录
Kafka Stream Reducer is not reducing records
下面是一些示例代码,我们试图根据某些记录值(在本例中为 id)删除重复项。当我发布 2 条具有相同 ID 的记录时,我收到了两份打印报表。我期待 1. 我是流的新手,所以我确定我误解了一些东西。
kstream.groupBy((key, value) -> value.getId())
.reduce((aggValue, newValue) -> aggValue)
.toStream()
.foreach((key,value) -> {
System.out.println(value);
})
基于定义的加法器,reduce 运算结果为 ktable。
KTable<Interger, String> aggregatedStream = kstream.groupBy((key, value) -> value.getId())
.reduce((aggValue, newValue) -> aggValue)
SO kTable 将具有任何 ID 的最新值(来自 value.getID() 的键)。
Kafka 流支持逐个事件处理。如果记录缓存被禁用,它将触发对每个事件的操作。因此,为 2 条记录调用该方法两次,打印语句将 return 该记录的最新值。
聚合计算输入的记录值的总和,按键分组,returns 一个 KTable。
示例:
没有缓存: 为键 A 发出一系列输出记录,表示结果聚合中的变化 table。括号(())表示变化,左边的数字是新的聚合值,右边的数字是旧的聚合值:<A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.
使用缓存: 为键 A 发出单个输出记录,该记录可能会在缓存中压缩,从而导致 <A, (321, null)>
的单个输出记录。该记录被写入聚合的内部状态存储并转发给任何下游操作。
缓存大小通过 cache.max.bytes.buffering
参数指定,这是每个处理拓扑的全局设置。您可以这样设置 属性:
// Enable record cache of size 10 MB.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
下面是一些示例代码,我们试图根据某些记录值(在本例中为 id)删除重复项。当我发布 2 条具有相同 ID 的记录时,我收到了两份打印报表。我期待 1. 我是流的新手,所以我确定我误解了一些东西。
kstream.groupBy((key, value) -> value.getId())
.reduce((aggValue, newValue) -> aggValue)
.toStream()
.foreach((key,value) -> {
System.out.println(value);
})
基于定义的加法器,reduce 运算结果为 ktable。
KTable<Interger, String> aggregatedStream = kstream.groupBy((key, value) -> value.getId())
.reduce((aggValue, newValue) -> aggValue)
SO kTable 将具有任何 ID 的最新值(来自 value.getID() 的键)。
Kafka 流支持逐个事件处理。如果记录缓存被禁用,它将触发对每个事件的操作。因此,为 2 条记录调用该方法两次,打印语句将 return 该记录的最新值。
聚合计算输入的记录值的总和,按键分组,returns 一个 KTable。
示例:
没有缓存: 为键 A 发出一系列输出记录,表示结果聚合中的变化 table。括号(())表示变化,左边的数字是新的聚合值,右边的数字是旧的聚合值:<A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.
使用缓存: 为键 A 发出单个输出记录,该记录可能会在缓存中压缩,从而导致 <A, (321, null)>
的单个输出记录。该记录被写入聚合的内部状态存储并转发给任何下游操作。
缓存大小通过 cache.max.bytes.buffering
参数指定,这是每个处理拓扑的全局设置。您可以这样设置 属性:
// Enable record cache of size 10 MB.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);