Spring AMQP - 死信实现和业务逻辑失败时的最大重试

Spring AMQP - dead letter implementation and max retry if business logic fails

使用 Spring AMQP - spring-amqp with Spring Boot

这里我尝试实现死信交换。我正在向队列发送消息,如果发生业务异常,则它应该将消息发送到 "dlq" 队列并在那里等待 5 秒,然后它应该再次进入队列进行处理..... 5 次尝试后它应该从容器中出来。

配置如下：

application.yml

# Connection settings for the local RabbitMQ broker
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
# HTTP port of the embedded web server (the publisher endpoint is called on this port)
server:
  port: 8081

MQ 配置

import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
 import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MQConfig {

// public static final String OUTGOING_QUEUE = "outgoing.example";

  // public static final String INCOMING_QUEUE = "incoming.example";

  @Autowired
  private ConnectionFactory cachingConnectionFactory;

  // Setting the annotation listeners to use the jackson2JsonMessageConverter
  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    factory.setMessageConverter(jackson2JsonMessageConverter());
    return factory;
  }

  // Standardize on a single objectMapper for all message queue items
  @Bean
  public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  public Queue outgoingQueue() {
    Map<String, Object> args = new HashMap<String, Object>();
    // The default exchange
    args.put("x-dead-letter-exchange", "dlx");
    // Route to the incoming queue when the TTL occurs
    // args.put("x-dead-letter-routing-key", "q.with.dlx");
    // TTL 5 seconds
    args.put("x-message-ttl", 5000);
    return new Queue("q.with.dlx", false, false, false, args);
  }

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
  }

  @Bean
  public Queue incomingQueue() {
    return new Queue("dlq");
  }

  @Bean
  public DirectExchange directExchange() {
      return new DirectExchange("dlx") ; 
  }

  @Bean
  public Binding binding(Queue incomingQueue, DirectExchange directExchange) {
      return BindingBuilder.bind(incomingQueue()).to(directExchange()).with("q.with.dlx");
  }

发布者

import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.spring.amqp.domain.ExampleObject;

/**
 * REST endpoint that publishes a test message each time {@code GET /publish} is called.
 *
 * <p>{@code @RestController} already registers the class as a Spring bean, so no additional
 * stereotype annotation is needed.
 */
@RestController
@RequestMapping("/publish")
public class Publisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds an {@link ExampleObject} stamped with the current time and sends it with routing key
     * {@code q.with.dlx}. No exchange is named in the call, so the template's configured exchange
     * is used.
     */
    @GetMapping()
    public void sender() {
        ExampleObject payload = new ExampleObject();
        payload.setDate(new Date());
        rabbitTemplate.convertAndSend("q.with.dlx", payload);
    }
}

消费者

package com.spring.amqp.service;
import java.util.List;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.spring.amqp.domain.ExampleObject;

/**
 * Listener for the work queue that always fails, to demonstrate dead-letter based retries
 * with an upper bound on the number of attempts.
 */
@Component
public class Consumer {

    /** Queue this listener consumes from. */
    private static final String WORK_QUEUE = "q.with.dlx";

    /** Total number of deliveries (the first one included) before a message is given up on. */
    private static final int MAX_ATTEMPTS = 5;

    /**
     * Handles one delivery from the work queue.
     *
     * <p>On failure the message is rejected without requeue so the broker dead-letters it,
     * unless this delivery was the last permitted attempt; in that case the method returns
     * normally and the message is not retried again.
     *
     * @param exampleObject the converted message payload
     * @param xDeath the broker-maintained {@code x-death} header: one map per (queue, reason)
     *     pair with a {@code count}; {@code null} on the first delivery, because the message
     *     has not been dead-lettered yet
     * @throws AmqpRejectAndDontRequeueException when processing fails and attempts remain
     */
    @RabbitListener(queues = WORK_QUEUE)
    public void handleMessage(ExampleObject exampleObject,
            @Header(required = false, name = "x-death") List<java.util.Map<String, ?>> xDeath) {
        System.out.println("Message" + ":" + (xDeath == null ? "" : xDeath));
        System.out.println("Received incoming object at " + exampleObject.getDate());

        long failedSoFar = countRejections(xDeath);
        try {
            // Stand-in for business logic that fails
            int a = 5 / 0;
            System.out.println(a);
        } catch (Exception ex) {
            if (failedSoFar + 1 >= MAX_ATTEMPTS) {
                // Last attempt: return normally instead of rejecting, so the message is not
                // dead-lettered again (assumes the container's default acknowledge mode).
                System.out.println("Giving up after " + MAX_ATTEMPTS + " attempts");
                return;
            }
            // Keep the original exception as the cause so the failure stays diagnosable
            throw new AmqpRejectAndDontRequeueException("amqp exception", ex);
        }
    }

    /**
     * Returns how many times the message was already rejected from the work queue, according
     * to the {@code x-death} header; 0 when the header or the matching entry is absent.
     */
    private static long countRejections(List<java.util.Map<String, ?>> xDeath) {
        if (xDeath == null) {
            return 0L;
        }
        for (java.util.Map<String, ?> entry : xDeath) {
            // String.valueOf tolerates header values that are not plain String instances
            boolean fromWorkQueue = WORK_QUEUE.equals(String.valueOf(entry.get("queue")));
            boolean rejected = "rejected".equals(String.valueOf(entry.get("reason")));
            if (fromWorkQueue && rejected) {
                Object count = entry.get("count");
                return count instanceof Number ? ((Number) count).longValue() : 0L;
            }
        }
        return 0L;
    }
}

**在 x-death 头中我什么也没得到。** 现在，当我访问 localhost:8081/publish 时，它会向 q.with.dlx 发送一条消息，我抛出 AmqpRejectAndDontRequeueException 异常，然后该消息进入名为 "dlq" 的队列。之后什么也没有发生。我有一个名为 ExampleObject 的域对象，我将其从发布者发送给消费者。

请帮我检查所有的配置，如果可能的话，有人可以运行一下并告诉我错误在哪里吗？提前致谢。

Gary Russell 感谢这个很棒的消息传递框架。

您在错误的队列上设置了 TTL。

您需要在 dlq 队列上配置 TTL（生存时间）和死信：

  /**
   * Retry queue: holds each dead-lettered message for 5 seconds, then dead-letters it back
   * to the original queue so it is delivered again.
   */
  @Bean
  public Queue incomingQueue() {
      Map<String, Object> args = new HashMap<String, Object>();
      // Messages expire after waiting 5 seconds in this queue
      args.put("x-message-ttl", 5000);
      // "" is the default exchange, which routes by queue name
      args.put("x-dead-letter-exchange", "");
      // Route the expired message back to the original queue
      args.put("x-dead-letter-routing-key", "q.with.dlx");
      // durable, non-exclusive, not auto-deleted: the same flags new Queue("dlq") used
      return new Queue("dlq", true, false, false, args);
  }

添加参数 x-message-ttl = 5000 和死信配置以将过期消息路由回原始队列。

您的队列 bean 名称有点混乱;你通常会有类似...

someExchange -> mainQueue (with dead-lettering to DLX)

DLX -> dlq (with TTL and dead-lettering to someExchange)