如何使用 Spring 重试在耗尽重试时包装异常
How to wrap exception on exhausted retries with Spring Retry
上下文:
我正在使用 spring-retry 重试 restTemplate 调用。
restTemplate 调用是从 kafka 侦听器调用的。
kafka 侦听器还配置为在错误时重试(如果在此过程中抛出任何异常,不仅是 restTemplate 调用)。
目标:
当错误来自已耗尽的重试模板时,我想阻止 kafka 重试。
实际行为:
当retryTemplate用完所有重试时,抛出原来的异常。因此阻止我识别错误是否被重试模板重试。
期望的行为:
当 retryTemplate 耗尽所有重试时,将原始异常包装在 RetryExhaustedException 中,这将允许我将其从 kafka 重试中列入黑名单。
问题:
我怎样才能做这样的事情?
谢谢
编辑
重试模板配置:
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(FunctionalException.class, false);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions, true, true);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setThrowLastExceptionOnExhausted(false);
卡夫卡错误处理器
public class DefaultErrorHandler implements ErrorHandler {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
Throwable exception = Optional.ofNullable(thrownException.getCause()).orElse(thrownException);
// TODO if exception as been retried in a RetryTemplate, stop it to prevent rollback and send it to a DLQ
// else rethrow exception, it will be rollback and handled by AfterRollbackProcessor to be retried
throw new KafkaException("Could not handle exception", thrownException);
}
}
监听卡夫卡:
@KafkaListener
public void onMessage(ConsumerRecord<String, String> record) {
retryTemplate.execute((args) -> {
throw new RuntimeException("Should be catched by ErrorHandler to prevent rollback");
}
throw new RuntimeException("Should be retried by afterRollbackProcessor");
}
只需使用配置为将 RetryExhaustedException
分类为不可重试的 SimplyRetryPolicy
配置侦听器重试模板。
请务必将 traverseCauses
属性 设置为 true,因为容器将所有侦听器异常包装在 ListenerExecutionFailedException
.
中
/**
* Create a {@link SimpleRetryPolicy} with the specified number of retry
* attempts. If traverseCauses is true, the exception causes will be traversed until
* a match is found. The default value indicates whether to retry or not for exceptions
* (or super classes) are not found in the map.
*
* @param maxAttempts the maximum number of attempts
* @param retryableExceptions the map of exceptions that are retryable based on the
* map value (true/false).
* @param traverseCauses is this clause traversable
* @param defaultValue the default action.
*/
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
boolean traverseCauses, boolean defaultValue) {
编辑
使用
template.execute((args) -> {...}, (context) -> throw new Blah(context.getLastThrowable()));
上下文:
我正在使用 spring-retry 重试 restTemplate 调用。
restTemplate 调用是从 kafka 侦听器调用的。 kafka 侦听器还配置为在错误时重试(如果在此过程中抛出任何异常,不仅是 restTemplate 调用)。
目标:
当错误来自已耗尽的重试模板时,我想阻止 kafka 重试。
实际行为:
当retryTemplate用完所有重试时,抛出原来的异常。因此阻止我识别错误是否被重试模板重试。
期望的行为:
当 retryTemplate 耗尽所有重试时,将原始异常包装在 RetryExhaustedException 中,这将允许我将其从 kafka 重试中列入黑名单。
问题:
我怎样才能做这样的事情?
谢谢
编辑
重试模板配置:
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(FunctionalException.class, false);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions, true, true);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setThrowLastExceptionOnExhausted(false);
卡夫卡错误处理器
public class DefaultErrorHandler implements ErrorHandler {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
Throwable exception = Optional.ofNullable(thrownException.getCause()).orElse(thrownException);
// TODO if exception as been retried in a RetryTemplate, stop it to prevent rollback and send it to a DLQ
// else rethrow exception, it will be rollback and handled by AfterRollbackProcessor to be retried
throw new KafkaException("Could not handle exception", thrownException);
}
}
监听卡夫卡:
@KafkaListener
public void onMessage(ConsumerRecord<String, String> record) {
retryTemplate.execute((args) -> {
throw new RuntimeException("Should be catched by ErrorHandler to prevent rollback");
}
throw new RuntimeException("Should be retried by afterRollbackProcessor");
}
只需使用配置为将 RetryExhaustedException
分类为不可重试的 SimplyRetryPolicy
配置侦听器重试模板。
请务必将 traverseCauses
属性 设置为 true,因为容器将所有侦听器异常包装在 ListenerExecutionFailedException
.
/**
* Create a {@link SimpleRetryPolicy} with the specified number of retry
* attempts. If traverseCauses is true, the exception causes will be traversed until
* a match is found. The default value indicates whether to retry or not for exceptions
* (or super classes) are not found in the map.
*
* @param maxAttempts the maximum number of attempts
* @param retryableExceptions the map of exceptions that are retryable based on the
* map value (true/false).
* @param traverseCauses is this clause traversable
* @param defaultValue the default action.
*/
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
boolean traverseCauses, boolean defaultValue) {
编辑
使用
template.execute((args) -> {...}, (context) -> throw new Blah(context.getLastThrowable()));