Spring AMQP:默认启用重试,并针对指定的异常阻止重试
Spring AMQP: enable retry by default and prevent it for a specified exception
在异常A的情况下:重试有限次,最后当重试次数用完时,将消息写入死信队列
在异常 B 的情况下:简单地说,消息应该写入死信队列
我正在尝试实现与另一个问题中相同的用例,并且已按照其中被采纳的答案执行了所有步骤,但我仍然看到我的自定义异常被包装进 ListenerExecutionFailedException 中,因此无法阻止对自定义异常的重试。
我已按照以下答案中的步骤进行操作
和
/**
 * Builds the retry policy consumed by the listener's retry interceptor.
 *
 * <p>The policy is created with a maximum of 3 attempts and an explicit classification map:
 * {@code AmqpRejectAndDontRequeueException} is marked as not retriable and
 * {@code ListenerExecutionFailedException} as retriable. The trailing {@code true} is the
 * policy's cause-traversal flag.
 *
 * @return the configured {@link SimpleRetryPolicy}
 */
@Bean
public SimpleRetryPolicy rejectionRetryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionsMap = new HashMap<>();
    exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false); // not retriable
    exceptionsMap.put(ListenerExecutionFailedException.class, true); // retriable
    return new SimpleRetryPolicy(3, exceptionsMap, true);
}
/**
 * Creates the stateless retry interceptor for the work-message listener.
 *
 * <p>Retry decisions come from {@link #rejectionRetryPolicy()}. The recoverer is a
 * {@code RepublishMessageRecoverer} built from {@code defaultTemplate}, the DLQ exchange
 * and the DLQ routing key.
 *
 * @return the assembled {@link RetryOperationsInterceptor}
 */
@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
    RepublishMessageRecoverer deadLetterRecoverer =
            new RepublishMessageRecoverer(defaultTemplate, this.getDlqExchange(), this.getDlqroutingkey());
    return RetryInterceptorBuilder.stateless()
            .retryPolicy(rejectionRetryPolicy())
            // Back-off is commented out in this version: .backOffOptions(1000, 2, 10000)
            .recoverer(deadLetterRecoverer)
            .build();
}
/* My Rabbit MQ Error handler */
/**
 * Listener error handler that converts selected failures into
 * {@code AmqpRejectAndDontRequeueException}.
 *
 * <p>A failure is rejected without requeue when the message is flagged as redelivered, or
 * when the cause of the listener failure carries the message {@code "DontRetry"}. Every
 * other failure is rethrown unchanged.
 */
@Component
public class RabbitRetryHandler implements RabbitListenerErrorHandler {
    private static final Logger log = LoggerFactory.getLogger(RabbitRetryHandler.class);

    /** Exception message used by the listener code to mark a failure as non-retriable. */
    private static final String DONT_RETRY_MARKER = "DontRetry";

    /**
     * Decides how a listener failure is propagated.
     *
     * @param amqpMessage the raw AMQP message whose properties expose the redelivered flag
     * @param message the converted messaging message (not inspected here)
     * @param exception the wrapper raised around the listener's own exception
     * @return never returns normally; the method always throws
     * @throws Exception {@code AmqpRejectAndDontRequeueException} for redelivered or
     *     "DontRetry" failures, otherwise the original {@code exception}
     */
    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
            ListenerExecutionFailedException exception) throws Exception {
        Throwable cause = exception.getCause();
        // Boolean.TRUE.equals tolerates a missing (null) redelivered flag instead of failing on unboxing.
        boolean redelivered = Boolean.TRUE.equals(amqpMessage.getMessageProperties().isRedelivered());
        // Constant-first equals plus the null check avoids an NPE when there is no cause or no message.
        boolean markedDontRetry = cause != null && DONT_RETRY_MARKER.equals(cause.getMessage());
        if (redelivered || markedDontRetry) {
            throw new AmqpRejectAndDontRequeueException(cause);
        }
        throw exception;
    }
}
/* And finally my Listener */
/**
 * Consumes a message from the configured queue, logs its correlation id, body and source
 * queue, then delegates to {@link #performAction()}.
 *
 * <p>Exceptions thrown by {@code performAction()} propagate unchanged; the former
 * catch-and-rethrow blocks added nothing and were removed.
 *
 * @param incomingMsg the received AMQP message
 * @throws Exception whatever {@code performAction()} throws
 */
@Override
@RabbitListener(queues = "${queueconfig.queuename}",containerFactory = "sdRabbitListenerContainerFactory",errorHandler="rabbitRetryHandler")
public void processMessage(Message incomingMsg) throws Exception {
    // Decode with an explicit charset so the logged body does not depend on the JVM's platform default.
    // NOTE(review): assumes producers publish UTF-8 payloads; confirm against the publishers.
    String body = new String(incomingMsg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("{} - Correlation ID: {} Received message: {} from {} queue.", Thread.currentThread().getId(),
            incomingMsg.getMessageProperties().getCorrelationId(), body,
            incomingMsg.getMessageProperties().getConsumerQueue());
    performAction();
}
/**
 * Runs the message's work and translates failures into retry/no-retry signals.
 *
 * <p>An {@code HttpClientErrorException} with status {@code NOT_FOUND} or
 * {@code REQUEST_TIMEOUT} is rethrown wrapped in a {@code RuntimeException}. Every other
 * failure is wrapped in a {@code CustomDontRequeueException} whose message is
 * {@code "DontRetry"}, the marker that {@code RabbitRetryHandler} compares against.
 *
 * @throws Exception a {@code RuntimeException} or {@code CustomDontRequeueException} as above
 */
@Override
public void performAction() throws Exception {
    try {
        // (the actual work is elided in the original snippet)
    } catch (HttpClientErrorException ex) {
        if (ex.getStatusCode() == HttpStatus.NOT_FOUND || ex.getStatusCode() == HttpStatus.REQUEST_TIMEOUT) {
            throw new RuntimeException(ex);
        } else {
            throw new CustomDontRequeueException("DontRetry", ex);
        }
    } catch (Exception e) {
        // Carry the "DontRetry" marker here too; without it the error handler's message
        // check does not recognise this failure as non-retriable.
        throw new CustomDontRequeueException("DontRetry", e);
    }
}
预期结果:如果抛出 CustomDontRequeueException,则消息不应被重试或重新排队。
实际结果:无论抛出什么异常,消息都会被重试 n 次,然后被丢弃到 DLQ。
// Excerpt of the question's retry-policy map that the answer text refers to.
exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false);//not retriable
exceptionsMap.put(ListenerExecutionFailedException.class, true); //retriable
您尚未将 CustomDontRequeueException 配置为不重试;您配置的是 AmqpRejectAndDontRequeueException。
此外,您不应该显式配置 ListenerExecutionFailedException,因为它会最先被匹配到,从而阻止对异常原因(cause)的遍历。
我修改的代码:
/* Kept 2 different custom exceptions one for retry and one for not retry*/
/**
 * Builds the retry policy consumed by the listener's retry interceptor.
 *
 * <p>The policy is created with a maximum of 3 attempts. {@code CustomDontRequeueException}
 * is classified as not retriable and {@code CustomRequeueException} as retriable; the
 * trailing {@code true} is the policy's cause-traversal flag.
 *
 * @return the configured {@link SimpleRetryPolicy}
 */
@Bean
public SimpleRetryPolicy rejectionRetryPolicy() {
    Map<Class<? extends Throwable>, Boolean> retryableByType = new HashMap<>();
    retryableByType.put(CustomRequeueException.class, true); // retriable
    retryableByType.put(CustomDontRequeueException.class, false); // not retriable
    return new SimpleRetryPolicy(3, retryableByType, true);
}
/**
 * Creates the stateless retry interceptor for the work-message listener.
 *
 * <p>Retry decisions come from {@link #rejectionRetryPolicy()}, the delay between attempts
 * from {@code exponentialBackOffPolicy}. The recoverer is a {@code RepublishMessageRecoverer}
 * built from {@code defaultTemplate}, the DLQ exchange and the DLQ routing key.
 *
 * @return the assembled {@link RetryOperationsInterceptor}
 */
@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
    RepublishMessageRecoverer deadLetterRecoverer =
            new RepublishMessageRecoverer(defaultTemplate, this.getDlqExchange(), this.getDlqroutingkey());
    return RetryInterceptorBuilder.stateless()
            .retryPolicy(rejectionRetryPolicy())
            .backOffPolicy(exponentialBackOffPolicy)
            .recoverer(deadLetterRecoverer)
            .build();
}
/* Listener --- removed error handler*/
/**
 * Consumes a message from the configured queue and delegates to {@link #performAction()}.
 *
 * @param incomingMsg the received AMQP message (not inspected by this method)
 * @throws Exception whatever {@code performAction()} throws, propagated unchanged
 */
@Override
@RabbitListener(
        queues = "${queueconfig.signdocuments.queuename}",
        containerFactory = "asdrabbitListenerContainerFactory")
public void processMessage(Message incomingMsg) throws Exception {
    // No local handling: both custom and generic exceptions leave this method as thrown.
    performAction();
}
/* Action which throws custom exceptions depending on what exceptions they get*/
/**
 * Runs the message's work and maps failures onto the two custom exception types.
 *
 * <p>An {@code HttpClientErrorException} with status {@code NOT_FOUND} or
 * {@code REQUEST_TIMEOUT} becomes a {@code CustomRequeueException("Retry", ...)}; any other
 * failure becomes a {@code CustomDontRequeueException("DontRetry", ...)}.
 *
 * @throws Exception one of the two custom exceptions described above
 */
@Override
public void performAction() throws Exception {
    try {
        // (the actual work is elided in the original snippet)
    } catch (HttpClientErrorException httpError) {
        // Guard clause: everything except 404 / 408 is treated as non-retriable.
        if (httpError.getStatusCode() != HttpStatus.NOT_FOUND
                && httpError.getStatusCode() != HttpStatus.REQUEST_TIMEOUT) {
            throw new CustomDontRequeueException("DontRetry", httpError);
        }
        throw new CustomRequeueException("Retry", httpError);
    } catch (Exception failure) {
        throw new CustomDontRequeueException("DontRetry", failure);
    }
}
在异常A的情况下:重试有限次,最后当重试次数用完时,将消息写入死信队列
在异常 B 的情况下:简单地说,消息应该写入死信队列
我正在尝试实现与另一个问题中相同的用例,并且已按照其中被采纳的答案执行了所有步骤,但我仍然看到我的自定义异常被包装进 ListenerExecutionFailedException 中,因此无法阻止对自定义异常的重试。
我已按照以下答案中的步骤进行操作
和
/**
 * Builds the retry policy consumed by the listener's retry interceptor.
 *
 * <p>The policy is created with a maximum of 3 attempts and an explicit classification map:
 * {@code AmqpRejectAndDontRequeueException} is marked as not retriable and
 * {@code ListenerExecutionFailedException} as retriable. The trailing {@code true} is the
 * policy's cause-traversal flag.
 *
 * @return the configured {@link SimpleRetryPolicy}
 */
@Bean
public SimpleRetryPolicy rejectionRetryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionsMap = new HashMap<>();
    exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false); // not retriable
    exceptionsMap.put(ListenerExecutionFailedException.class, true); // retriable
    return new SimpleRetryPolicy(3, exceptionsMap, true);
}
/**
 * Creates the stateless retry interceptor for the work-message listener.
 *
 * <p>Retry decisions come from {@link #rejectionRetryPolicy()}. The recoverer is a
 * {@code RepublishMessageRecoverer} built from {@code defaultTemplate}, the DLQ exchange
 * and the DLQ routing key.
 *
 * @return the assembled {@link RetryOperationsInterceptor}
 */
@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
    RepublishMessageRecoverer deadLetterRecoverer =
            new RepublishMessageRecoverer(defaultTemplate, this.getDlqExchange(), this.getDlqroutingkey());
    return RetryInterceptorBuilder.stateless()
            .retryPolicy(rejectionRetryPolicy())
            // Back-off is commented out in this version: .backOffOptions(1000, 2, 10000)
            .recoverer(deadLetterRecoverer)
            .build();
}
/* My Rabbit MQ Error handler */
/**
 * Listener error handler that converts selected failures into
 * {@code AmqpRejectAndDontRequeueException}.
 *
 * <p>A failure is rejected without requeue when the message is flagged as redelivered, or
 * when the cause of the listener failure carries the message {@code "DontRetry"}. Every
 * other failure is rethrown unchanged.
 */
@Component
public class RabbitRetryHandler implements RabbitListenerErrorHandler {
    private static final Logger log = LoggerFactory.getLogger(RabbitRetryHandler.class);

    /** Exception message used by the listener code to mark a failure as non-retriable. */
    private static final String DONT_RETRY_MARKER = "DontRetry";

    /**
     * Decides how a listener failure is propagated.
     *
     * @param amqpMessage the raw AMQP message whose properties expose the redelivered flag
     * @param message the converted messaging message (not inspected here)
     * @param exception the wrapper raised around the listener's own exception
     * @return never returns normally; the method always throws
     * @throws Exception {@code AmqpRejectAndDontRequeueException} for redelivered or
     *     "DontRetry" failures, otherwise the original {@code exception}
     */
    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
            ListenerExecutionFailedException exception) throws Exception {
        Throwable cause = exception.getCause();
        // Boolean.TRUE.equals tolerates a missing (null) redelivered flag instead of failing on unboxing.
        boolean redelivered = Boolean.TRUE.equals(amqpMessage.getMessageProperties().isRedelivered());
        // Constant-first equals plus the null check avoids an NPE when there is no cause or no message.
        boolean markedDontRetry = cause != null && DONT_RETRY_MARKER.equals(cause.getMessage());
        if (redelivered || markedDontRetry) {
            throw new AmqpRejectAndDontRequeueException(cause);
        }
        throw exception;
    }
}
/* And finally my Listener */
/**
 * Consumes a message from the configured queue, logs its correlation id, body and source
 * queue, then delegates to {@link #performAction()}.
 *
 * <p>Exceptions thrown by {@code performAction()} propagate unchanged; the former
 * catch-and-rethrow blocks added nothing and were removed.
 *
 * @param incomingMsg the received AMQP message
 * @throws Exception whatever {@code performAction()} throws
 */
@Override
@RabbitListener(queues = "${queueconfig.queuename}",containerFactory = "sdRabbitListenerContainerFactory",errorHandler="rabbitRetryHandler")
public void processMessage(Message incomingMsg) throws Exception {
    // Decode with an explicit charset so the logged body does not depend on the JVM's platform default.
    // NOTE(review): assumes producers publish UTF-8 payloads; confirm against the publishers.
    String body = new String(incomingMsg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("{} - Correlation ID: {} Received message: {} from {} queue.", Thread.currentThread().getId(),
            incomingMsg.getMessageProperties().getCorrelationId(), body,
            incomingMsg.getMessageProperties().getConsumerQueue());
    performAction();
}
/**
 * Runs the message's work and translates failures into retry/no-retry signals.
 *
 * <p>An {@code HttpClientErrorException} with status {@code NOT_FOUND} or
 * {@code REQUEST_TIMEOUT} is rethrown wrapped in a {@code RuntimeException}. Every other
 * failure is wrapped in a {@code CustomDontRequeueException} whose message is
 * {@code "DontRetry"}, the marker that {@code RabbitRetryHandler} compares against.
 *
 * @throws Exception a {@code RuntimeException} or {@code CustomDontRequeueException} as above
 */
@Override
public void performAction() throws Exception {
    try {
        // (the actual work is elided in the original snippet)
    } catch (HttpClientErrorException ex) {
        if (ex.getStatusCode() == HttpStatus.NOT_FOUND || ex.getStatusCode() == HttpStatus.REQUEST_TIMEOUT) {
            throw new RuntimeException(ex);
        } else {
            throw new CustomDontRequeueException("DontRetry", ex);
        }
    } catch (Exception e) {
        // Carry the "DontRetry" marker here too; without it the error handler's message
        // check does not recognise this failure as non-retriable.
        throw new CustomDontRequeueException("DontRetry", e);
    }
}
预期结果:如果抛出 CustomDontRequeueException,则消息不应被重试或重新排队。
实际结果:无论抛出什么异常,消息都会被重试 n 次,然后被丢弃到 DLQ。
// Excerpt of the question's retry-policy map that the answer text refers to.
exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false);//not retriable
exceptionsMap.put(ListenerExecutionFailedException.class, true); //retriable
您尚未将 CustomDontRequeueException 配置为不重试;您配置的是 AmqpRejectAndDontRequeueException。
此外,您不应该显式配置 ListenerExecutionFailedException,因为它会最先被匹配到,从而阻止对异常原因(cause)的遍历。
我修改的代码:
/* Kept 2 different custom exceptions one for retry and one for not retry*/
/**
 * Builds the retry policy consumed by the listener's retry interceptor.
 *
 * <p>The policy is created with a maximum of 3 attempts. {@code CustomDontRequeueException}
 * is classified as not retriable and {@code CustomRequeueException} as retriable; the
 * trailing {@code true} is the policy's cause-traversal flag.
 *
 * @return the configured {@link SimpleRetryPolicy}
 */
@Bean
public SimpleRetryPolicy rejectionRetryPolicy() {
    Map<Class<? extends Throwable>, Boolean> retryableByType = new HashMap<>();
    retryableByType.put(CustomRequeueException.class, true); // retriable
    retryableByType.put(CustomDontRequeueException.class, false); // not retriable
    return new SimpleRetryPolicy(3, retryableByType, true);
}
/**
 * Creates the stateless retry interceptor for the work-message listener.
 *
 * <p>Retry decisions come from {@link #rejectionRetryPolicy()}, the delay between attempts
 * from {@code exponentialBackOffPolicy}. The recoverer is a {@code RepublishMessageRecoverer}
 * built from {@code defaultTemplate}, the DLQ exchange and the DLQ routing key.
 *
 * @return the assembled {@link RetryOperationsInterceptor}
 */
@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
    RepublishMessageRecoverer deadLetterRecoverer =
            new RepublishMessageRecoverer(defaultTemplate, this.getDlqExchange(), this.getDlqroutingkey());
    return RetryInterceptorBuilder.stateless()
            .retryPolicy(rejectionRetryPolicy())
            .backOffPolicy(exponentialBackOffPolicy)
            .recoverer(deadLetterRecoverer)
            .build();
}
/* Listener --- removed error handler*/
/**
 * Consumes a message from the configured queue and delegates to {@link #performAction()}.
 *
 * @param incomingMsg the received AMQP message (not inspected by this method)
 * @throws Exception whatever {@code performAction()} throws, propagated unchanged
 */
@Override
@RabbitListener(
        queues = "${queueconfig.signdocuments.queuename}",
        containerFactory = "asdrabbitListenerContainerFactory")
public void processMessage(Message incomingMsg) throws Exception {
    // No local handling: both custom and generic exceptions leave this method as thrown.
    performAction();
}
/* Action which throws custom exceptions depending on what exceptions they get*/
/**
 * Runs the message's work and maps failures onto the two custom exception types.
 *
 * <p>An {@code HttpClientErrorException} with status {@code NOT_FOUND} or
 * {@code REQUEST_TIMEOUT} becomes a {@code CustomRequeueException("Retry", ...)}; any other
 * failure becomes a {@code CustomDontRequeueException("DontRetry", ...)}.
 *
 * @throws Exception one of the two custom exceptions described above
 */
@Override
public void performAction() throws Exception {
    try {
        // (the actual work is elided in the original snippet)
    } catch (HttpClientErrorException httpError) {
        // Guard clause: everything except 404 / 408 is treated as non-retriable.
        if (httpError.getStatusCode() != HttpStatus.NOT_FOUND
                && httpError.getStatusCode() != HttpStatus.REQUEST_TIMEOUT) {
            throw new CustomDontRequeueException("DontRetry", httpError);
        }
        throw new CustomRequeueException("Retry", httpError);
    } catch (Exception failure) {
        throw new CustomDontRequeueException("DontRetry", failure);
    }
}