SimpleRabbitListenerContainerFactory and defaultRequeueRejected
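According to the documentation, the default value of defaultRequeueRejected is true, but looking at the code it appears to be false. I'm not sure whether I'm missing something, or whether we have to change it in SimpleRabbitListenerContainerFactory.java.
EDIT
Sample code: after putting a message on the test queue, I expect it to stay in the queue after a failure, but it is being dropped. I want the message to be retried, so I configured retry in the container factory; if the message still fails after the retries, I want it to go back to the queue. I'm sure I'm missing something here.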
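import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.retry.backoff.FixedBackOffPolicy;

@SpringBootApplication
public class MsgRequeExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(MsgRequeExampleApplication.class, args);
    }

    @Bean(name = "myContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setMissingQueuesFatal(false);
        // Wait 500 ms between attempts
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(500);
        // Stateless retry with at most 2 attempts; no recoverer is configured here
        factory.setAdviceChain(new Advice[] { RetryInterceptorBuilder.stateless()
                .maxAttempts(2).backOffPolicy(backOffPolicy).build() });
        return factory;
    }

    @RabbitListener(queues = "test", containerFactory = "myContainerFactory")
    public void processAdvisory(Message message) throws MyBusinessException {
        try {
            // Simulating exception while processing message
            String nullString = null;
            nullString.length();
        } catch (Exception ex) {
            throw new MyBusinessException(ex.getMessage());
        }
    }

    public static class MyBusinessException extends Exception {
        public MyBusinessException(String msg) {
            super(msg);
        }
    }
}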
The SimpleMessageListenerContainer JavaDocs describe this well:
/**
* Set the default behavior when a message is rejected, for example because the listener
* threw an exception. When true, messages will be requeued, when false, they will not. For
* versions of Rabbit that support dead-lettering, the message must not be requeued in order
* to be sent to the dead letter exchange. Setting to false causes all rejections to not
* be requeued. When true, the default can be overridden by the listener throwing an
* {@link AmqpRejectAndDontRequeueException}. Default true.
* @param defaultRequeueRejected true to reject by default.
*/
public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
this.defaultRequeueRejected = defaultRequeueRejected;
}
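Does that make sense to you?
UPDATE
To requeue after the retries are exhausted, you need to configure a custom MessageRecoverer on the RetryInterceptorBuilder, like this:
.recoverer((message, cause) -> {
    ReflectionUtils.rethrowRuntimeException(cause);
})
That way the exception is thrown to the listener container, and the message is requeued or not according to the container's defaultRequeueRejected.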
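To make the flow concrete, here is a minimal plain-Java sketch of the decision described above. It is a simplified model, not Spring AMQP's actual code: the class RequeueSketch, the Outcome enum, and the retry and deliver methods are all hypothetical names invented for illustration, and the "swallow" recoverer is only an assumed stand-in for a recoverer that does not rethrow.

```java
import java.util.function.BiConsumer;

// Hypothetical model of "retry, then recover, then let the container decide".
final class RequeueSketch {

    /** What the (modelled) container reports to the broker for one delivery. */
    enum Outcome { ACKED, REJECTED_REQUEUED, REJECTED_NOT_REQUEUED }

    /** Stand-in for a stateless retry interceptor wrapped around the listener. */
    static void retry(Runnable listener, int maxAttempts,
                      BiConsumer<String, Throwable> recoverer, String message) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.run();
                return; // listener succeeded, nothing to recover
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        // Retries exhausted: hand the message and the last failure to the recoverer.
        recoverer.accept(message, last);
    }

    /** Stand-in for the container: an exception escaping the advice means "reject". */
    static Outcome deliver(Runnable listener, int maxAttempts,
                           BiConsumer<String, Throwable> recoverer,
                           boolean defaultRequeueRejected) {
        try {
            retry(listener, maxAttempts, recoverer, "payload");
            return Outcome.ACKED; // no exception reached the container
        } catch (RuntimeException ex) {
            return defaultRequeueRejected
                    ? Outcome.REJECTED_REQUEUED
                    : Outcome.REJECTED_NOT_REQUEUED;
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("simulated failure"); };
        // Assumed recoverer that swallows the failure: the container never sees it.
        BiConsumer<String, Throwable> swallow = (m, cause) -> { };
        // Recoverer that rethrows, as in the answer's snippet.
        BiConsumer<String, Throwable> rethrow = (m, cause) -> { throw new RuntimeException(cause); };

        System.out.println(deliver(failing, 2, swallow, true));  // ACKED
        System.out.println(deliver(failing, 2, rethrow, true));  // REJECTED_REQUEUED
        System.out.println(deliver(failing, 2, rethrow, false)); // REJECTED_NOT_REQUEUED
    }
}
```

In this model, defaultRequeueRejected only matters when the exception actually reaches the container, which is why the rethrowing recoverer changes the outcome. Note that a requeued message is delivered again, so a message that always fails will keep cycling through the retries until something else stops it.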