Spring AMQP rabbitmq 重试退避与恢复不使来自 DLQ 的消息出列

Spring AMQP rabbitmq retry backoff with recover do not dequeue messages from DLQ

我正在使用 spring-amqp consumerBatchEnabled 来批量处理消息。对于错误处理和重试,使用死信交换和spring-具有恢复功能的重试退避。

如果在处理主队列中的批次期间出现任何异常,我将拒绝整个批次,然后由死信队列侦听器单独处理消息。我已经为 DLQ 侦听器配置了 Advice 链以进行重试。想法是在丢弃消息之前在 DLQ 侦听器中进行固定次数的重试。

重试功能工作正常,恢复器也被调用,但消息在死信队列中仍处于未确认状态。根据我的理解,在 AmqpRejectAndDontRequeueException 的情况下,消息应该从队列中删除,但它似乎没有发生,我在这里遗漏了什么。 (如果我附加了 DLX,这也不起作用)

(使用 application.yml 中的默认侦听器容器工厂和重试属性,这按预期工作,但不适用于自定义侦听器容器工厂)

下面是测试代码:

配置Class

package com.example.demo;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitMQConfig {


    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange("person.exchange");
    }

    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable("person.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter").build();
    }

    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("person.key");
    }

    @Bean
    @Autowired
    public ConnectionFactory connectionFactory() {
         CachingConnectionFactory factory = new CachingConnectionFactory();

        String rabbitmq_host = "localhost";
        String rabbitmq_port = "5672";
        String rabbitmq_user = "guest";
        String rabbitmq_pwd = "guest";

        factory.setHost(rabbitmq_host);
        factory.setPort(Integer.parseInt(rabbitmq_port));
         factory.setUsername(rabbitmq_user);
        factory.setPassword(rabbitmq_pwd);

        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1000);
        factory.setBatchListener(true);
        factory.setBatchSize(2);
        factory.setConsumerBatchEnabled(true);
        factory.setReceiveTimeout(1000l);
        configurer.configure(factory, connectionFactory);
        return factory;
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitDlqListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1000);
        factory.setDefaultRequeueRejected(true);
        factory.setAdviceChain(RetryInterceptorBuilder.stateless().backOffOptions(
                2000,1,10000).maxAttempts(3).
                recoverer(new RejectAndDontRequeueRecoverer()).build());
       configurer.configure(factory, connectionFactory);
        return factory;
    }
}

听众

package com.example.demo;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Component
public class RabbitMQConsumer {

    List<String> list = new ArrayList<>();

    private int i = 0;

    @RabbitListener(id = "myRabbitListener", queues = "person.queue", containerFactory = "rabbitListenerContainerFactory")
    public void getMessage(List<Message<Person>> messages, Channel channel) throws InvalidPersonException, IOException {
        System.out.println(messages.size());
        long tag = (long)messages.get(messages.size()-1).getHeaders().get((AmqpHeaders.DELIVERY_TAG));
        channel.basicNack(tag, true, false);
        /* System.out.println(Thread.currentThread());
        if(message.getPayload().getName().equals("test")) {
            channel.basicNack(tag, false, false);
        }*/
    }
}
package com.example.demo;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.messaging.handler.annotation.Header;

import static java.util.concurrent.TimeUnit.SECONDS;

@Component
public class DlqConsumer {

    List<String> list = new ArrayList<>();

    private int i = 0;

    @RabbitListener(id = "myDlqListener", queues = "deadLetter.queue", containerFactory = "rabbitDlqListenerContainerFactory")
    public void getMessage(Message<Person> message, Channel channel) throws InvalidPersonException {
            System.out.println(message.getPayload().getName());
            long tag = (long)message.getHeaders().get((AmqpHeaders.DELIVERY_TAG));
            System.out.println(Thread.currentThread());
           throw new InvalidPersonException();
    }
}

异常Class:

package com.example.demo;

public class InvalidPersonException extends Exception{
    private static final long serialVersionUID = -3154618962130084535L;
}
package com.example.demo;

import java.io.Serializable;

public class Person implements Serializable {
    private Long Id;

    private String name;

    public Person(){

    }

    public Person(Long id, String name) {
        super();
        Id = id;
        this.name = name;
    }

    public Long getId() {
        return Id;
    }

    public void setId(Long id) {
        Id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}

build.gradle

buildscript {
    ext {
        springBootVersion = '2.2.10.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.test'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    implementation 'junit:junit:4.12'
    compile('org.springframework.boot:spring-boot-starter-amqp:2.4.8')
    compile('org.springframework.boot:spring-boot-starter-web:2.2.10.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    compile "io.springfox:springfox-swagger2:2.7.0"
    compile "io.springfox:springfox-swagger-ui:2.7.0"
    testCompile "org.springframework.amqp:spring-rabbit-test:2.3.6"
}

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

当您进行手动确认时,您对所有Acks/Nacks负责。

使用默认模式 (AUTO)。