Spring Kafka Listener 收到重复消息
Spring Kafka Listener receiving duplicate messages
我的 Spring Kafka 侦听器正在接收重复的消息,我可以看到消息是从相同的分区和偏移量以及相同的时间戳轮询的。在我的代码中,我跟踪每条传入消息并识别重复项,但在这种情况下,我什至不能拒绝处理这两条消息——原始消息和重复消息几乎同时收到,而且第一条记录甚至没有提交数据库跟踪 table。
1.Please 建议我应该如何避免轮询重复的消息,我不明白为什么它被轮询两次 - 仅在负载下。
2. 如果正在处理消息 1 元数据但未在
中提交,我如何在跟踪 table 中处理此问题
the tracking table, message 2 comes and is not able to find that record in tracking table and proceeds with processing the duplicate message again.
config of the listener based on my use case:
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
两个消费者需要具有相同的 group.id
属性 以便分区分布在他们之间。
如果他们在不同的消费组,他们都会得到所有的记录。
我的 Spring Kafka 侦听器正在接收重复的消息,我可以看到消息是从相同的分区和偏移量以及相同的时间戳轮询的。在我的代码中,我跟踪每条传入消息并识别重复项,但在这种情况下,我什至不能拒绝处理这两条消息——原始消息和重复消息几乎同时收到,而且第一条记录甚至没有提交数据库跟踪 table。 1.Please 建议我应该如何避免轮询重复的消息,我不明白为什么它被轮询两次 - 仅在负载下。 2. 如果正在处理消息 1 元数据但未在
中提交,我如何在跟踪 table 中处理此问题the tracking table, message 2 comes and is not able to find that record in tracking table and proceeds with processing the duplicate message again.
config of the listener based on my use case:
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
两个消费者需要具有相同的 group.id
属性 以便分区分布在他们之间。
如果他们在不同的消费组,他们都会得到所有的记录。