Spring AMQP rabbitmq 重试退避与恢复不使来自 DLQ 的消息出列
Spring AMQP rabbitmq retry backoff with recover do not dequeue messages from DLQ
我正在使用 spring-amqp consumerBatchEnabled 来批量处理消息。对于错误处理和重试,使用死信交换和spring-具有恢复功能的重试退避。
如果在处理主队列中的批次期间出现任何异常,我将拒绝整个批次,然后由死信队列侦听器单独处理消息。我已经为 DLQ 侦听器配置了 Advice 链以进行重试。想法是在丢弃消息之前在 DLQ 侦听器中进行固定次数的重试。
重试功能工作正常,恢复器也被调用,但消息在死信队列中仍处于未确认状态。根据我的理解,在 AmqpRejectAndDontRequeueException 的情况下,消息应该从队列中删除,但它似乎没有发生,我在这里遗漏了什么。 (如果我附加了 DLX,这也不起作用)
(使用 application.yml 中的默认侦听器容器工厂和重试属性,这按预期工作,但不适用于自定义侦听器容器工厂)
下面是测试代码:
配置Class
package com.example.demo;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableRabbit
@Configuration
public class RabbitMQConfig {
@Bean
DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}
@Bean
DirectExchange exchange() {
return new DirectExchange("person.exchange");
}
@Bean
Queue dlq() {
return QueueBuilder.durable("deadLetter.queue").build();
}
@Bean
Queue queue() {
return QueueBuilder.durable("person.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
.withArgument("x-dead-letter-routing-key", "deadLetter").build();
}
@Bean
Binding DLQbinding() {
return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("person.key");
}
@Bean
@Autowired
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
String rabbitmq_host = "localhost";
String rabbitmq_port = "5672";
String rabbitmq_user = "guest";
String rabbitmq_pwd = "guest";
factory.setHost(rabbitmq_host);
factory.setPort(Integer.parseInt(rabbitmq_port));
factory.setUsername(rabbitmq_user);
factory.setPassword(rabbitmq_pwd);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1000);
factory.setBatchListener(true);
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(1000l);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitDlqListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1000);
factory.setDefaultRequeueRejected(true);
factory.setAdviceChain(RetryInterceptorBuilder.stateless().backOffOptions(
2000,1,10000).maxAttempts(3).
recoverer(new RejectAndDontRequeueRecoverer()).build());
configurer.configure(factory, connectionFactory);
return factory;
}
}
听众
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Component
public class RabbitMQConsumer {
List<String> list = new ArrayList<>();
private int i = 0;
@RabbitListener(id = "myRabbitListener", queues = "person.queue", containerFactory = "rabbitListenerContainerFactory")
public void getMessage(List<Message<Person>> messages, Channel channel) throws InvalidPersonException, IOException {
System.out.println(messages.size());
long tag = (long)messages.get(messages.size()-1).getHeaders().get((AmqpHeaders.DELIVERY_TAG));
channel.basicNack(tag, true, false);
/* System.out.println(Thread.currentThread());
if(message.getPayload().getName().equals("test")) {
channel.basicNack(tag, false, false);
}*/
}
}
package com.example.demo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.messaging.handler.annotation.Header;
import static java.util.concurrent.TimeUnit.SECONDS;
@Component
public class DlqConsumer {
List<String> list = new ArrayList<>();
private int i = 0;
@RabbitListener(id = "myDlqListener", queues = "deadLetter.queue", containerFactory = "rabbitDlqListenerContainerFactory")
public void getMessage(Message<Person> message, Channel channel) throws InvalidPersonException {
System.out.println(message.getPayload().getName());
long tag = (long)message.getHeaders().get((AmqpHeaders.DELIVERY_TAG));
System.out.println(Thread.currentThread());
throw new InvalidPersonException();
}
}
异常Class:
package com.example.demo;
public class InvalidPersonException extends Exception{
private static final long serialVersionUID = -3154618962130084535L;
}
package com.example.demo;
import java.io.Serializable;
public class Person implements Serializable {
private Long Id;
private String name;
public Person(){
}
public Person(Long id, String name) {
super();
Id = id;
this.name = name;
}
public Long getId() {
return Id;
}
public void setId(Long id) {
Id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
build.gradle
buildscript {
ext {
springBootVersion = '2.2.10.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.test'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
implementation 'junit:junit:4.12'
compile('org.springframework.boot:spring-boot-starter-amqp:2.4.8')
compile('org.springframework.boot:spring-boot-starter-web:2.2.10.RELEASE')
testCompile('org.springframework.boot:spring-boot-starter-test')
compile "io.springfox:springfox-swagger2:2.7.0"
compile "io.springfox:springfox-swagger-ui:2.7.0"
testCompile "org.springframework.amqp:spring-rabbit-test:2.3.6"
}
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
当您进行手动确认时,您对所有Acks/Nacks负责。
使用默认模式 (AUTO
)。
我正在使用 spring-amqp consumerBatchEnabled 来批量处理消息。对于错误处理和重试,使用死信交换和spring-具有恢复功能的重试退避。
如果在处理主队列中的批次期间出现任何异常,我将拒绝整个批次,然后由死信队列侦听器单独处理消息。我已经为 DLQ 侦听器配置了 Advice 链以进行重试。想法是在丢弃消息之前在 DLQ 侦听器中进行固定次数的重试。
重试功能工作正常,恢复器也被调用,但消息在死信队列中仍处于未确认状态。根据我的理解,在 AmqpRejectAndDontRequeueException 的情况下,消息应该从队列中删除,但它似乎没有发生,我在这里遗漏了什么。 (如果我附加了 DLX,这也不起作用)
(使用 application.yml 中的默认侦听器容器工厂和重试属性,这按预期工作,但不适用于自定义侦听器容器工厂)
下面是测试代码:
配置Class
package com.example.demo;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableRabbit
@Configuration
public class RabbitMQConfig {
@Bean
DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}
@Bean
DirectExchange exchange() {
return new DirectExchange("person.exchange");
}
@Bean
Queue dlq() {
return QueueBuilder.durable("deadLetter.queue").build();
}
@Bean
Queue queue() {
return QueueBuilder.durable("person.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
.withArgument("x-dead-letter-routing-key", "deadLetter").build();
}
@Bean
Binding DLQbinding() {
return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("person.key");
}
@Bean
@Autowired
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
String rabbitmq_host = "localhost";
String rabbitmq_port = "5672";
String rabbitmq_user = "guest";
String rabbitmq_pwd = "guest";
factory.setHost(rabbitmq_host);
factory.setPort(Integer.parseInt(rabbitmq_port));
factory.setUsername(rabbitmq_user);
factory.setPassword(rabbitmq_pwd);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1000);
factory.setBatchListener(true);
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(1000l);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitDlqListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1000);
factory.setDefaultRequeueRejected(true);
factory.setAdviceChain(RetryInterceptorBuilder.stateless().backOffOptions(
2000,1,10000).maxAttempts(3).
recoverer(new RejectAndDontRequeueRecoverer()).build());
configurer.configure(factory, connectionFactory);
return factory;
}
}
听众
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Component
public class RabbitMQConsumer {
List<String> list = new ArrayList<>();
private int i = 0;
@RabbitListener(id = "myRabbitListener", queues = "person.queue", containerFactory = "rabbitListenerContainerFactory")
public void getMessage(List<Message<Person>> messages, Channel channel) throws InvalidPersonException, IOException {
System.out.println(messages.size());
long tag = (long)messages.get(messages.size()-1).getHeaders().get((AmqpHeaders.DELIVERY_TAG));
channel.basicNack(tag, true, false);
/* System.out.println(Thread.currentThread());
if(message.getPayload().getName().equals("test")) {
channel.basicNack(tag, false, false);
}*/
}
}
package com.example.demo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.messaging.handler.annotation.Header;
import static java.util.concurrent.TimeUnit.SECONDS;
@Component
public class DlqConsumer {
List<String> list = new ArrayList<>();
private int i = 0;
@RabbitListener(id = "myDlqListener", queues = "deadLetter.queue", containerFactory = "rabbitDlqListenerContainerFactory")
public void getMessage(Message<Person> message, Channel channel) throws InvalidPersonException {
System.out.println(message.getPayload().getName());
long tag = (long)message.getHeaders().get((AmqpHeaders.DELIVERY_TAG));
System.out.println(Thread.currentThread());
throw new InvalidPersonException();
}
}
异常Class:
package com.example.demo;
public class InvalidPersonException extends Exception{
private static final long serialVersionUID = -3154618962130084535L;
}
package com.example.demo;
import java.io.Serializable;
public class Person implements Serializable {
private Long Id;
private String name;
public Person(){
}
public Person(Long id, String name) {
super();
Id = id;
this.name = name;
}
public Long getId() {
return Id;
}
public void setId(Long id) {
Id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
build.gradle
buildscript {
ext {
springBootVersion = '2.2.10.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.test'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
implementation 'junit:junit:4.12'
compile('org.springframework.boot:spring-boot-starter-amqp:2.4.8')
compile('org.springframework.boot:spring-boot-starter-web:2.2.10.RELEASE')
testCompile('org.springframework.boot:spring-boot-starter-test')
compile "io.springfox:springfox-swagger2:2.7.0"
compile "io.springfox:springfox-swagger-ui:2.7.0"
testCompile "org.springframework.amqp:spring-rabbit-test:2.3.6"
}
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
当您进行手动确认时,您对所有Acks/Nacks负责。
使用默认模式 (AUTO
)。