消费者多次消费同一条消息
Consumer is consuming same message multiple time
我有一个 Kafka 消费者那个消费者消息。我在 KafkaListenerContainerFactory 中设置了重试模板。但是不知道为什么它两次消费同一条消息。当应用程序通过使用指定计数调用的异常重试模板时,Kafka 也会使用相同计数使用相同消息。
@Bean
RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean("KafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
log.error("Maximum retry policy has been reached");
return null;
});
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
return factory;
}
消费者
@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload) throws Exception {
KafkaSegmentTrigger kafkaSegmentTrigger;
kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
try {
processMessage(kafkaSegmentTrigger);
} catch (Exception e) {
retryTemplate.execute(arg0 -> {
processMessage(kafkaSegmentTrigger);
return null;
});
}finally {
}
}
processMessage 抛出异常
您嵌套了 RetryTemplate
s - 一个在侦听器外部,一个在侦听器内部。如果您在两个地方都使用相同的模板,您将获得 12 次尝试(3 次由侦听器内部的侦听器适配器 x4 驱动)。
使用其中之一。
我有一个 Kafka 消费者那个消费者消息。我在 KafkaListenerContainerFactory 中设置了重试模板。但是不知道为什么它两次消费同一条消息。当应用程序通过使用指定计数调用的异常重试模板时,Kafka 也会使用相同计数使用相同消息。
@Bean
RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean("KafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
log.error("Maximum retry policy has been reached");
return null;
});
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
return factory;
}
消费者
@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload) throws Exception {
KafkaSegmentTrigger kafkaSegmentTrigger;
kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
try {
processMessage(kafkaSegmentTrigger);
} catch (Exception e) {
retryTemplate.execute(arg0 -> {
processMessage(kafkaSegmentTrigger);
return null;
});
}finally {
}
}
processMessage 抛出异常
您嵌套了 RetryTemplate
s - 一个在侦听器外部,一个在侦听器内部。如果您在两个地方都使用相同的模板,您将获得 12 次尝试(3 次由侦听器内部的侦听器适配器 x4 驱动)。
使用其中之一。