Spring cloud Kafka 失败时无限重试
Spring cloud Kafka does infinite retry when it fails
目前,我遇到一个问题,其中一个消费者函数抛出错误,导致 Kafka 一次又一次地重试记录。
@Bean
public Consumer<List<RuleEngineSubject>> processCohort() {
return personDtoList -> {
for(RuleEngineSubject subject : personDtoList)
processSubject(subject);
};
}
这是消费者 processSubject
抛出导致其失败的自定义错误。
processCohort-in-0:
destination: internal-process-cohort
consumer:
max-attempts: 1
batch-mode: true
concurrency: 10
group: process-cohort-group
以上是我的Kafka活页夹。
目前,我正在尝试重试 2 次,然后发送到死信队列,但我一直没有成功,不确定采用哪种方法是正确的。
我已经尝试实现一个自定义处理程序,该处理程序将在失败时处理错误但不会再次重试,而且我不确定如何发送到死信队列
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
if (group.equals("process-cohort-group")) {
container.setBatchErrorHandler(new BatchErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
System.out.println(data.records(dest).iterator().);
data.records(dest).forEach(r -> {
System.out.println(r.value());
});
System.out.println("failed payload='{}'" + thrownException.getLocalizedMessage());
}
});
}
};
}
这会停止无限重试,但不会发送死信队列。我能否获得有关如何重试两次然后发送死信队列的建议。据我了解,批处理侦听器在出现错误时无法恢复,有人可以帮助阐明这一点
您需要在监听器容器中配置合适的错误处理器;您可以在绑定中禁用重试和 dlq 并改用 DeadLetterPublishingRecoverer
。查看答案
重试15次后扔到topicname.DLT话题
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()), kafkaBackOffPolicy()));
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
@Bean
public ExponentialBackOffWithMaxRetries kafkaBackOffPolicy() {
var exponentialBackOff = new ExponentialBackOffWithMaxRetries(15);
exponentialBackOff.setInitialInterval(Duration.ofMillis(500).toMillis());
exponentialBackOff.setMultiplier(2);
exponentialBackOff.setMaxInterval(Duration.ofSeconds(2).toMillis());
return exponentialBackOff;
}
目前,我遇到一个问题,其中一个消费者函数抛出错误,导致 Kafka 一次又一次地重试记录。
@Bean
public Consumer<List<RuleEngineSubject>> processCohort() {
return personDtoList -> {
for(RuleEngineSubject subject : personDtoList)
processSubject(subject);
};
}
这是消费者 processSubject
抛出导致其失败的自定义错误。
processCohort-in-0:
destination: internal-process-cohort
consumer:
max-attempts: 1
batch-mode: true
concurrency: 10
group: process-cohort-group
以上是我的Kafka活页夹。
目前,我正在尝试重试 2 次,然后发送到死信队列,但我一直没有成功,不确定采用哪种方法是正确的。
我已经尝试实现一个自定义处理程序,该处理程序将在失败时处理错误但不会再次重试,而且我不确定如何发送到死信队列
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
if (group.equals("process-cohort-group")) {
container.setBatchErrorHandler(new BatchErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
System.out.println(data.records(dest).iterator().);
data.records(dest).forEach(r -> {
System.out.println(r.value());
});
System.out.println("failed payload='{}'" + thrownException.getLocalizedMessage());
}
});
}
};
}
这会停止无限重试,但不会发送死信队列。我能否获得有关如何重试两次然后发送死信队列的建议。据我了解,批处理侦听器在出现错误时无法恢复,有人可以帮助阐明这一点
您需要在监听器容器中配置合适的错误处理器;您可以在绑定中禁用重试和 dlq 并改用 DeadLetterPublishingRecoverer
。查看答案
重试15次后扔到topicname.DLT话题
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()), kafkaBackOffPolicy()));
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
@Bean
public ExponentialBackOffWithMaxRetries kafkaBackOffPolicy() {
var exponentialBackOff = new ExponentialBackOffWithMaxRetries(15);
exponentialBackOff.setInitialInterval(Duration.ofMillis(500).toMillis());
exponentialBackOff.setMultiplier(2);
exponentialBackOff.setMaxInterval(Duration.ofSeconds(2).toMillis());
return exponentialBackOff;
}