Kafka Stream Reducer 没有减少记录

Kafka Stream Reducer is not reducing records

下面是一些示例代码,我们试图根据某些记录值(在本例中为 id)删除重复项。当我发布 2 条具有相同 ID 的记录时,我收到了两份打印报表。我期待 1. 我是流的新手,所以我确定我误解了一些东西。

 kstream.groupBy((key, value) -> value.getId())
                    .reduce((aggValue, newValue) -> aggValue)
                    .toStream()
                    .foreach((key,value) -> {
                        System.out.println(value);
                    })

基于定义的加法器,reduce 运算结果为 ktable。

KTable<Interger, String> aggregatedStream = kstream.groupBy((key, value) -> value.getId())
                    .reduce((aggValue, newValue) -> aggValue)

SO kTable 将具有任何 ID 的最新值(来自 value.getID() 的键)。

Kafka 流支持逐个事件处理。如果记录缓存被禁用,它将触发对每个事件的操作。因此,为 2 条记录调用该方法两次,打印语句将 return 该记录的最新值。

聚合计算输入的记录值的总和,按键分组,returns 一个 KTable。

示例:

没有缓存: 为键 A 发出一系列输出记录,表示结果聚合中的变化 table。括号(())表示变化,左边的数字是新的聚合值,右边的数字是旧的聚合值:<A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.

使用缓存: 为键 A 发出单个输出记录,该记录可能会在缓存中压缩,从而导致 <A, (321, null)> 的单个输出记录。该记录被写入聚合的内部状态存储并转发给任何下游操作。

缓存大小通过 cache.max.bytes.buffering 参数指定,这是每个处理拓扑的全局设置。您可以这样设置 属性:

// Enable record cache of size 10 MB.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);