KTable 不对具有相同键的传入记录进行重复数据删除

KTable not deduplicating the incoming records with same keys

我正在尝试使用输入主题作为 KTable 对记录进行重复数据删除,并将它们下沉到输出主题。但是KTable还在将重复的记录下沉到输出topic中。不确定我哪里出错了。

这是我的 application.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          process-in-0: input.topic
          process-out-0: output.topic
        definition: process
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                materializedAs: incoming-store
          binder:
            application-id: spring-cloud-uppercase-app
            brokers: localhost:9092
            configuration:
              commit:
                interval:
                  ms: 1000
                state.dir: state-store
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde

根据spring cloud stream kafka stream documentation about state store,我添加了上面的物化视图作为incoming-store

process() bean 函数将输入主题作为 KTable 并将其下沉到输出主题


    @Bean
    public Function<KTable<String, String>, KStream<String, String>> process(){
        return table -> table
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }

对于给定的 4 条记录输入

key=111, value="a"
key=111, value="a"
key=222, value="b"
key=111, value="a"

我预计只有 2 条记录

key=111, value="a"
key=222, value="b"

但是正在获取所有 4 条记录。任何帮助将不胜感激!

我认为你要解决的问题,在这里通过压缩主题会得到很好的解决。 一旦您将具有相同密钥的数据传送到压缩主题并且在代理级别启用了压缩(默认情况下启用),每个代理将启动一个压缩管理器线程和许多压缩线程。这些负责执行压缩任务。 压缩只是保留每个键的最新值并清理旧的(脏的)条目。

有关详细信息,请参阅 this Kafka 文档。

您可以按键分组并聚合事件。虽然在聚合过程中您没有连接字符串,但 aggregate 转换将仅用于发出您按键 111222 分组的值。您的用例只是一个独特的聚合。每次汇总时,您都会收到 (key, value, aggregate),然后您只保留 value,这将是最新值。

@Slf4j
@Configuration
@EnableAutoConfiguration
public class KafkaAggFunctionalService {

    @Bean
    public Function<KTable<String, String>, KStream<String, String>> aggregate() {
        return table -> table
                .toStream()
                .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(() -> "", (key, value, aggregate) ->
                                value,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
                )
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }
}

这个git repo has a lot of examples. The one that looks very similar to yours is this.