使用 DeadLetterPublishingRecoverer 的状态重试导致 RetryCacheCapacityExceededException
Stateful-Retry with DeadLetterPublishingRecoverer causing RetryCacheCapacityExceededException
我的容器工厂有一个 SeekToCurrentErrorHandler,它使用 DeadLetterPublishingRecoverer 发布到 DLT,某些 'NotRetryableException' 类型的异常,并无限次地为其他类型的异常寻找相同的偏移量。使用此设置,在导致不可重试异常的一定数量的有效负载之后,存储重试上下文的映射 - MapRetryContextCache (spring-retry) 溢出并抛出 RetryCacheCapacityExceededException。从最初的外观来看,DLT 恢复器要处理的消息的重试上下文不会从 MapRetryContextCache 中删除。或者我的配置不正确。
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),-1);
eh.addNotRetryableException(SomeNonRetryableException.class);
eh.setCommitRecovered(true);
ConcurrentKafkaListenerContainerFactory<String, String> factory
= getContainerFactory();
factory.setErrorHandler(eh);
factory.setRetryTemplate(retryTemplate);
factory.setStatefulRetry(true);
为了清除缓存,您必须在重试模板中进行恢复,而不是在错误处理程序中。
@SpringBootApplication
public class So56846940Application {
public static void main(String[] args) {
SpringApplication.run(So56846940Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicDLT() {
return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
DeadLetterPublishingRecoverer recoverer) {
factory.setRetryTemplate(new RetryTemplate());
factory.setStatefulRetry(true);
factory.setRecoveryCallback(context -> {
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
(Exception) context.getLastThrowable());
return null;
});
return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
}
@KafkaListener(id = "so56846940", topics = "so56846940")
public void listen(String in) {
System.out.println(in);
throw new RuntimeException();
}
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
return new DeadLetterPublishingRecoverer(template);
}
@Bean
public SeekToCurrentErrorHandler eh() {
return new SeekToCurrentErrorHandler(4);
}
}
错误处理程序的重试次数必须至少与重试模板一样多,以便重试次数耗尽并清除缓存。
您还应该使用与错误处理程序相同的不可重试异常来配置 RetryTemplate。
我们会在参考手册中说明。
我的容器工厂有一个 SeekToCurrentErrorHandler,它使用 DeadLetterPublishingRecoverer 发布到 DLT,某些 'NotRetryableException' 类型的异常,并无限次地为其他类型的异常寻找相同的偏移量。使用此设置,在导致不可重试异常的一定数量的有效负载之后,存储重试上下文的映射 - MapRetryContextCache (spring-retry) 溢出并抛出 RetryCacheCapacityExceededException。从最初的外观来看,DLT 恢复器要处理的消息的重试上下文不会从 MapRetryContextCache 中删除。或者我的配置不正确。
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),-1);
eh.addNotRetryableException(SomeNonRetryableException.class);
eh.setCommitRecovered(true);
ConcurrentKafkaListenerContainerFactory<String, String> factory
= getContainerFactory();
factory.setErrorHandler(eh);
factory.setRetryTemplate(retryTemplate);
factory.setStatefulRetry(true);
为了清除缓存,您必须在重试模板中进行恢复,而不是在错误处理程序中。
@SpringBootApplication
public class So56846940Application {
public static void main(String[] args) {
SpringApplication.run(So56846940Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicDLT() {
return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
DeadLetterPublishingRecoverer recoverer) {
factory.setRetryTemplate(new RetryTemplate());
factory.setStatefulRetry(true);
factory.setRecoveryCallback(context -> {
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
(Exception) context.getLastThrowable());
return null;
});
return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
}
@KafkaListener(id = "so56846940", topics = "so56846940")
public void listen(String in) {
System.out.println(in);
throw new RuntimeException();
}
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
return new DeadLetterPublishingRecoverer(template);
}
@Bean
public SeekToCurrentErrorHandler eh() {
return new SeekToCurrentErrorHandler(4);
}
}
错误处理程序的重试次数必须至少与重试模板一样多,以便重试次数耗尽并清除缓存。
您还应该使用与错误处理程序相同的不可重试异常来配置 RetryTemplate。
我们会在参考手册中说明。