如何检测kafka主题中的重复消息?
How to detect duplicate messages in a kafka topic?
您好,我的架构类似于下图所示。
我有两个 kafka 生产者,它们会向 kafka 主题发送消息,并经常重复消息。
有没有一种方法可以让我轻松处理这种情况,比如服务总线主题。
感谢您的帮助。
假设您实际上有 多个不同的生产者 编写相同的消息,我可以看到这两个选项:
1) 将所有重复项写入单个 Kafka 主题,然后使用类似 Kafka Streams(或任何其他流处理器,如 Flink、Spark Streaming 等)对消息进行去重并将去重结果写入新主题。
这是一个使用状态存储的 Kafka Streams 示例:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
2) 确保重复的消息具有相同的消息密钥。之后,您需要启用 log compaction,Kafka 将 最终 删除重复项。这种方法不太可靠,但如果你适当地调整压缩设置,它可能会给你你想要的。
现在,Apache Kafka 支持精确一次交付:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
您好,我的架构类似于下图所示。
我有两个 kafka 生产者,它们会向 kafka 主题发送消息,并经常重复消息。
有没有一种方法可以让我轻松处理这种情况,比如服务总线主题。
感谢您的帮助。
假设您实际上有 多个不同的生产者 编写相同的消息,我可以看到这两个选项:
1) 将所有重复项写入单个 Kafka 主题,然后使用类似 Kafka Streams(或任何其他流处理器,如 Flink、Spark Streaming 等)对消息进行去重并将去重结果写入新主题。
这是一个使用状态存储的 Kafka Streams 示例:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
2) 确保重复的消息具有相同的消息密钥。之后,您需要启用 log compaction,Kafka 将 最终 删除重复项。这种方法不太可靠,但如果你适当地调整压缩设置,它可能会给你你想要的。
现在,Apache Kafka 支持精确一次交付:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/