Spring 启动 rabbitmq 消息未重新排队
Spring boot rabbitmq message not getting requeued
您好,如果抛出特定异常,我正在尝试重新排队某些消息,但对于任何验证失败,我希望它们直接进入死信队列。我启用了相关队列和死信队列。我发现我的验证失败到达了 dlq,但其他失败一直处于 unack 状态并不断重试,超出了我设置的最大尝试次数和乘数,知道这是为什么吗?下面的代码我正在使用 Spring boot 2.0.4 版本
@RabbitListener(queues = "${queuename}")
public void consume(final @Valid @Payload MyRequest myRequest) {
if (method.fail()) {
throw new RuntimeException("");
}
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(jackson2Converter());
factory.setValidator(amqpValidator());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public Validator amqpValidator() {
return new OptionalValidatorFactoryBean();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory());
listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
new MyErrorPayload()));
listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
return listenerContainerFactory;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitQueueHost);
connectionFactory.setUsername(rabbitQueueUsername);
connectionFactory.setPassword(rabbitQueuePassword);
connectionFactory.setVirtualHost(rabbitQueueVirtualHost);
return connectionFactory;
}
public class MyErrorPayload implements FatalExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException &&
(t.getCause() instanceof MessageConversionException ||
t.getCause() instanceof MethodArgumentNotValidException )
) {
return true;
}
return false;
}
}
application.yml (属性)
spring:
rabbitmq:
host: localhost
username: uu
password: pp
virtual-host: /
listener:
simple:
default-requeue-rejected: false
retry:
enabled: true
initial-interval: 2000
multiplier: 1.5
max-interval: 10000
max-attempts: 3
这是因为您没有为容器工厂使用 Boot 的自动配置。所以忽略重试配置。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory());
listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
new MyErrorPayload()));
listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
return listenerContainerFactory;
}
@Barath 在他的评论中引用的示例也是如此。
将配置器注入您的工厂方法并调用它;例如,对于那个样本...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}
如果只有一个消息转换器 Bean,配置器也会添加它。
我已经更新了示例。
编辑
选择性异常的自定义重试策略;以下禁用 ValidationException
的重试,但重试所有其他。 (同样,对于示例应用程序)...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer, RabbitProperties properties) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
ListenerRetry retryConfig = properties.getListener().getSimple().getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful());
RetryTemplate retryTemplate = new RetryTemplate();
Map<Class<? extends Throwable>, Boolean> retryableExceptions = Collections
.singletonMap(ValidationException.class, false);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retryConfig.getMaxAttempts(),
retryableExceptions, true, true); // retry all exceptions except Validation
retryTemplate.setRetryPolicy(retryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(retryConfig.getInitialInterval().toMillis());
backOffPolicy.setMaxInterval(retryConfig.getMaxInterval().toMillis());
backOffPolicy.setMultiplier(retryConfig.getMultiplier());
retryTemplate.setBackOffPolicy(backOffPolicy);
builder.retryOperations(retryTemplate);
builder.recoverer(new RejectAndDontRequeueRecoverer());
factory.setAdviceChain(builder.build());
}
return factory;
}
自从您 default-requeue-rejected: false
.
以来,没有消息被重新排队
您好,如果抛出特定异常,我正在尝试重新排队某些消息,但对于任何验证失败,我希望它们直接进入死信队列。我启用了相关队列和死信队列。我发现我的验证失败到达了 dlq,但其他失败一直处于 unack 状态并不断重试,超出了我设置的最大尝试次数和乘数,知道这是为什么吗?下面的代码我正在使用 Spring boot 2.0.4 版本
@RabbitListener(queues = "${queuename}")
public void consume(final @Valid @Payload MyRequest myRequest) {
if (method.fail()) {
throw new RuntimeException("");
}
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(jackson2Converter());
factory.setValidator(amqpValidator());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public Validator amqpValidator() {
return new OptionalValidatorFactoryBean();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory());
listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
new MyErrorPayload()));
listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
return listenerContainerFactory;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitQueueHost);
connectionFactory.setUsername(rabbitQueueUsername);
connectionFactory.setPassword(rabbitQueuePassword);
connectionFactory.setVirtualHost(rabbitQueueVirtualHost);
return connectionFactory;
}
public class MyErrorPayload implements FatalExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException &&
(t.getCause() instanceof MessageConversionException ||
t.getCause() instanceof MethodArgumentNotValidException )
) {
return true;
}
return false;
}
}
application.yml (属性)
spring:
rabbitmq:
host: localhost
username: uu
password: pp
virtual-host: /
listener:
simple:
default-requeue-rejected: false
retry:
enabled: true
initial-interval: 2000
multiplier: 1.5
max-interval: 10000
max-attempts: 3
这是因为您没有为容器工厂使用 Boot 的自动配置。所以忽略重试配置。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory());
listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
new MyErrorPayload()));
listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
return listenerContainerFactory;
}
@Barath 在他的评论中引用的示例也是如此。
将配置器注入您的工厂方法并调用它;例如,对于那个样本...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}
如果只有一个消息转换器 Bean,配置器也会添加它。
我已经更新了示例。
编辑
选择性异常的自定义重试策略;以下禁用 ValidationException
的重试,但重试所有其他。 (同样,对于示例应用程序)...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer, RabbitProperties properties) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
ListenerRetry retryConfig = properties.getListener().getSimple().getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful());
RetryTemplate retryTemplate = new RetryTemplate();
Map<Class<? extends Throwable>, Boolean> retryableExceptions = Collections
.singletonMap(ValidationException.class, false);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retryConfig.getMaxAttempts(),
retryableExceptions, true, true); // retry all exceptions except Validation
retryTemplate.setRetryPolicy(retryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(retryConfig.getInitialInterval().toMillis());
backOffPolicy.setMaxInterval(retryConfig.getMaxInterval().toMillis());
backOffPolicy.setMultiplier(retryConfig.getMultiplier());
retryTemplate.setBackOffPolicy(backOffPolicy);
builder.retryOperations(retryTemplate);
builder.recoverer(new RejectAndDontRequeueRecoverer());
factory.setAdviceChain(builder.build());
}
return factory;
}
自从您 default-requeue-rejected: false
.