在 Spring RabbitMQ 中,我抛出 AmqpRejectAndDontRequeueException 但消息仍然重新排队
In Spring RabbitMQ I throw AmqpRejectAndDontRequeueException but message still requeue
我的服务监听 RabbitMQ 队列。我在消费者端配置重试策略。当我抛出异常时,所有死信消息都会重新排队。但是取决于我的业务逻辑,在抛出 StopRequeueException(除 SmsException 之外的每个异常)之后,我想停止重试此消息。但是消息仍然重新排队。
这是我的配置
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-attempts: 10
max-interval: 12s
multiplier: 2
missing-queues-fatal: false
if (!checkMobileService.isMobileNumberAdmitted(mobileNumber())) {
throw new StopRequeueException("SMS_BIMTEK.MOBILE_NUMBER_IS_NOT_ADMITTED");
}
我的错误处理程序:
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (!(t.getCause() instanceof SmsException)) {
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
}
}
}
调用错误处理程序不在重试范围内;重试次数耗尽后调用。
您需要在重试级别分类哪些异常是可重试的,并在恢复器中进行转换。
这是一个例子:
@SpringBootApplication
public class So67406799Application {
public static void main(String[] args) {
SpringApplication.run(So67406799Application.class, args);
}
@Bean
public RabbitRetryTemplateCustomizer customizer(
@Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int attempts) {
return (target, template) -> template.setRetryPolicy(new SimpleRetryPolicy(attempts,
Map.of(StopRequeueException.class, false), true, true));
}
@Bean
MessageRecoverer recoverer() {
return (msg, cause) -> {
throw new AmqpRejectAndDontRequeueException("Stop requeue after " +
RetrySynchronizationManager.getContext().getRetryCount() + " attempts");
};
}
@RabbitListener(queues = "so67406799")
void listen(String in) {
System.out.println(in);
if (in.equals("dontRetry")) {
throw new StopRequeueException("test");
}
throw new RuntimeException("test");
}
@Bean
Queue queue() {
return new Queue("so67406799");
}
}
@SuppressWarnings("serial")
class StopRequeueException extends NestedRuntimeException {
public StopRequeueException(String msg) {
super(msg);
}
}
编辑
自定义程序被Spring Boot 调用一次;在设置重试策略和退避策略后调用它。参见 RetryTemplateFactory
。
在这种情况下,定制器将重试策略替换为带有异常分类器的新策略(这就是我们需要在此处注入最大尝试次数的原因)。
参见 SimpleRetryPolicy
构造函数。
/**
* Create a {@link SimpleRetryPolicy} with the specified number of retry attempts. If
* traverseCauses is true, the exception causes will be traversed until a match is
* found. The default value indicates whether to retry or not for exceptions (or super
* classes) are not found in the map.
* @param maxAttempts the maximum number of attempts
* @param retryableExceptions the map of exceptions that are retryable based on the
* map value (true/false).
* @param traverseCauses true to traverse the exception cause chain until a classified
* exception is found or the root cause is reached.
* @param defaultValue the default action.
*/
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
boolean traverseCauses, boolean defaultValue) {
上面配置中的最后一个布尔值 (true) 是默认行为(重试映射中不存在的异常),第三个 (true) 告诉策略遵循原因链来查找异常(比如您在错误处理程序中的 getCause()
)。地图 <Exception, Boolean>
表示不要重试这个。
您也可以以相反的方式配置它(映射值中的默认值 false
和 true
),明确说明您想要重试哪些异常,而不要重试所有其他异常。
为所有异常调用 MessageRecoverer
,或者立即为分类异常调用,或者当其他异常用尽重试时调用。
我的服务监听 RabbitMQ 队列。我在消费者端配置重试策略。当我抛出异常时,所有死信消息都会重新排队。但是取决于我的业务逻辑,在抛出 StopRequeueException(除 SmsException 之外的每个异常)之后,我想停止重试此消息。但是消息仍然重新排队。 这是我的配置
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-attempts: 10
max-interval: 12s
multiplier: 2
missing-queues-fatal: false
if (!checkMobileService.isMobileNumberAdmitted(mobileNumber())) {
throw new StopRequeueException("SMS_BIMTEK.MOBILE_NUMBER_IS_NOT_ADMITTED");
}
我的错误处理程序:
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (!(t.getCause() instanceof SmsException)) {
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
}
}
}
调用错误处理程序不在重试范围内;重试次数耗尽后调用。
您需要在重试级别分类哪些异常是可重试的,并在恢复器中进行转换。
这是一个例子:
@SpringBootApplication
public class So67406799Application {
public static void main(String[] args) {
SpringApplication.run(So67406799Application.class, args);
}
@Bean
public RabbitRetryTemplateCustomizer customizer(
@Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int attempts) {
return (target, template) -> template.setRetryPolicy(new SimpleRetryPolicy(attempts,
Map.of(StopRequeueException.class, false), true, true));
}
@Bean
MessageRecoverer recoverer() {
return (msg, cause) -> {
throw new AmqpRejectAndDontRequeueException("Stop requeue after " +
RetrySynchronizationManager.getContext().getRetryCount() + " attempts");
};
}
@RabbitListener(queues = "so67406799")
void listen(String in) {
System.out.println(in);
if (in.equals("dontRetry")) {
throw new StopRequeueException("test");
}
throw new RuntimeException("test");
}
@Bean
Queue queue() {
return new Queue("so67406799");
}
}
@SuppressWarnings("serial")
class StopRequeueException extends NestedRuntimeException {
public StopRequeueException(String msg) {
super(msg);
}
}
编辑
自定义程序被Spring Boot 调用一次;在设置重试策略和退避策略后调用它。参见 RetryTemplateFactory
。
在这种情况下,定制器将重试策略替换为带有异常分类器的新策略(这就是我们需要在此处注入最大尝试次数的原因)。
参见 SimpleRetryPolicy
构造函数。
/**
* Create a {@link SimpleRetryPolicy} with the specified number of retry attempts. If
* traverseCauses is true, the exception causes will be traversed until a match is
* found. The default value indicates whether to retry or not for exceptions (or super
* classes) are not found in the map.
* @param maxAttempts the maximum number of attempts
* @param retryableExceptions the map of exceptions that are retryable based on the
* map value (true/false).
* @param traverseCauses true to traverse the exception cause chain until a classified
* exception is found or the root cause is reached.
* @param defaultValue the default action.
*/
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
boolean traverseCauses, boolean defaultValue) {
上面配置中的最后一个布尔值 (true) 是默认行为(重试映射中不存在的异常),第三个 (true) 告诉策略遵循原因链来查找异常(比如您在错误处理程序中的 getCause()
)。地图 <Exception, Boolean>
表示不要重试这个。
您也可以以相反的方式配置它(映射值中的默认值 false
和 true
),明确说明您想要重试哪些异常,而不要重试所有其他异常。
为所有异常调用 MessageRecoverer
,或者立即为分类异常调用,或者当其他异常用尽重试时调用。