Spring AMQP - 死信实现和业务逻辑失败时的最大重试
Spring AMQP - dead letter implementation and max retry if business logic fails
使用 Spring AMQP - spring-amqp with Spring Boot
这里我尝试实现死信交换。我向队列发送消息,如果发生业务异常,消息应该被发送到 "dlq" 队列并在那里等待 5 秒,然后重新回到原队列再次处理……尝试 5 次之后,它应该被移出容器(不再重试)。
请看下面的配置:
application.yml
# RabbitMQ broker connection used by Spring Boot's AMQP auto-configuration.
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
# HTTP port of the embedded web server (the /publish endpoint is served here).
server:
  port: 8081
MQ 配置
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
// public static final String OUTGOING_QUEUE = "outgoing.example";
// public static final String INCOMING_QUEUE = "incoming.example";
@Autowired
private ConnectionFactory cachingConnectionFactory;
// Setting the annotation listeners to use the jackson2JsonMessageConverter
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
return factory;
}
// Standardize on a single objectMapper for all message queue items
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue outgoingQueue() {
Map<String, Object> args = new HashMap<String, Object>();
// The default exchange
args.put("x-dead-letter-exchange", "dlx");
// Route to the incoming queue when the TTL occurs
// args.put("x-dead-letter-routing-key", "q.with.dlx");
// TTL 5 seconds
args.put("x-message-ttl", 5000);
return new Queue("q.with.dlx", false, false, false, args);
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public Queue incomingQueue() {
return new Queue("dlq");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("dlx") ;
}
@Bean
public Binding binding(Queue incomingQueue, DirectExchange directExchange) {
return BindingBuilder.bind(incomingQueue()).to(directExchange()).with("q.with.dlx");
}
发布者
import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.spring.amqp.domain.ExampleObject;
/**
 * REST endpoint that publishes one {@link ExampleObject} to RabbitMQ each time
 * {@code GET /publish} is requested.
 */
@RestController
@RequestMapping("/publish")
public class Publisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds an {@link ExampleObject} stamped with the current date and sends it with routing
     * key {@code q.with.dlx}.
     */
    @GetMapping
    public void sender() {
        ExampleObject ex = new ExampleObject();
        ex.setDate(new Date());
        // Two-argument overload: the String is the routing key; the template's configured
        // exchange is used.
        rabbitTemplate.convertAndSend("q.with.dlx", ex);
    }
}
消费者
package com.spring.amqp.service;
import java.util.List;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.spring.amqp.domain.ExampleObject;
/**
 * Listener for queue {@code q.with.dlx} that always fails, to exercise dead-lettering.
 */
@Component
public class Consumer {

    /**
     * Logs the received object and its {@code x-death} header, then simulates a business
     * failure and rejects the message without requeueing.
     *
     * @param exampleObject the converted message payload
     * @param xDeath        value of the {@code x-death} header, or {@code null} when the header
     *                      is absent.
     *                      NOTE(review): the broker documents x-death as a list of tables, so
     *                      the elements are presumably maps rather than strings — confirm
     *                      before reading individual entries.
     * @throws AmqpRejectAndDontRequeueException always, wrapping the simulated failure
     */
    @RabbitListener(queues = "q.with.dlx")
    public void handleMessage(ExampleObject exampleObject,
            @Header(required = false, name = "x-death") List<String> xDeath) {
        System.out.println("Message" + ":" + (xDeath == null ? "" : xDeath));
        System.out.println("Received incoming object at " + exampleObject.getDate());
        try {
            // Deliberate division by zero standing in for a failing business operation.
            int a = 5 / 0;
            System.out.println(a);
        } catch (Exception ex) {
            // Keep the original exception as the cause so the real failure is not lost.
            throw new AmqpRejectAndDontRequeueException("amqp exception", ex);
        }
    }
}
**在 x-death 消息头中我什么也没得到。**
现在,当我访问 localhost:8081/publish 时,它会向 q.with.dlx 发送一条消息,我抛出 AmqpRejectAndDontRequeueException 异常,然后该消息进入名为 "dlq" 的队列。之后什么也没有发生。我有一个名为 ExampleObject 的域对象,我将其从发布者发送给消费者。
请交叉检查我所有的配置,如果可能的话,有人可以运行一下并告诉我错误是什么吗?提前致谢。
Gary Russell,感谢这个很棒的消息传递框架。
您在错误的队列上设置了 TTL。
您需要在您的 dlq 上配置生存时间死信:
/**
 * Retry/wait queue {@code dlq}: holds each message for 5 seconds, then dead-letters it back
 * to the original queue {@code q.with.dlx} for another attempt.
 *
 * <p>If a {@code dlq} queue with different arguments already exists on the broker, it must be
 * deleted first; a queue cannot be redeclared with different arguments.
 */
@Bean
public Queue incomingQueue() {
    Map<String, Object> args = new HashMap<>();
    // Messages expire after waiting 5 seconds in dlq...
    args.put("x-message-ttl", 5000);
    // ...and expired messages are dead-lettered through the default exchange (""), which
    // routes by queue name...
    args.put("x-dead-letter-exchange", "");
    // ...back to the original queue.
    args.put("x-dead-letter-routing-key", "q.with.dlx");
    // durable = true matches the single-argument Queue constructor used previously.
    return new Queue("dlq", true, false, false, args);
}
添加参数 x-message-ttl = 5000
和死信配置以将过期消息路由回原始队列。
您的队列 bean 名称有点混乱;你通常会有类似...
someExchange -> mainQueue (with dead-lettering to DLX)
DLX -> dlq (with TTL and dead-lettering to someExchange)
使用 Spring AMQP - spring-amqp with Spring Boot
这里我尝试实现死信交换。我向队列发送消息,如果发生业务异常,消息应该被发送到 "dlq" 队列并在那里等待 5 秒,然后重新回到原队列再次处理……尝试 5 次之后,它应该被移出容器(不再重试)。
请看下面的配置:
application.yml
# RabbitMQ broker connection used by Spring Boot's AMQP auto-configuration.
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
# HTTP port of the embedded web server (the /publish endpoint is served here).
server:
  port: 8081
MQ 配置
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
// public static final String OUTGOING_QUEUE = "outgoing.example";
// public static final String INCOMING_QUEUE = "incoming.example";
@Autowired
private ConnectionFactory cachingConnectionFactory;
// Setting the annotation listeners to use the jackson2JsonMessageConverter
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
return factory;
}
// Standardize on a single objectMapper for all message queue items
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue outgoingQueue() {
Map<String, Object> args = new HashMap<String, Object>();
// The default exchange
args.put("x-dead-letter-exchange", "dlx");
// Route to the incoming queue when the TTL occurs
// args.put("x-dead-letter-routing-key", "q.with.dlx");
// TTL 5 seconds
args.put("x-message-ttl", 5000);
return new Queue("q.with.dlx", false, false, false, args);
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public Queue incomingQueue() {
return new Queue("dlq");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("dlx") ;
}
@Bean
public Binding binding(Queue incomingQueue, DirectExchange directExchange) {
return BindingBuilder.bind(incomingQueue()).to(directExchange()).with("q.with.dlx");
}
发布者
import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.spring.amqp.domain.ExampleObject;
/**
 * REST endpoint that publishes one {@link ExampleObject} to RabbitMQ each time
 * {@code GET /publish} is requested.
 */
@RestController
@RequestMapping("/publish")
public class Publisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds an {@link ExampleObject} stamped with the current date and sends it with routing
     * key {@code q.with.dlx}.
     */
    @GetMapping
    public void sender() {
        ExampleObject ex = new ExampleObject();
        ex.setDate(new Date());
        // Two-argument overload: the String is the routing key; the template's configured
        // exchange is used.
        rabbitTemplate.convertAndSend("q.with.dlx", ex);
    }
}
消费者
package com.spring.amqp.service;
import java.util.List;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.spring.amqp.domain.ExampleObject;
/**
 * Listener for queue {@code q.with.dlx} that always fails, to exercise dead-lettering.
 */
@Component
public class Consumer {

    /**
     * Logs the received object and its {@code x-death} header, then simulates a business
     * failure and rejects the message without requeueing.
     *
     * @param exampleObject the converted message payload
     * @param xDeath        value of the {@code x-death} header, or {@code null} when the header
     *                      is absent.
     *                      NOTE(review): the broker documents x-death as a list of tables, so
     *                      the elements are presumably maps rather than strings — confirm
     *                      before reading individual entries.
     * @throws AmqpRejectAndDontRequeueException always, wrapping the simulated failure
     */
    @RabbitListener(queues = "q.with.dlx")
    public void handleMessage(ExampleObject exampleObject,
            @Header(required = false, name = "x-death") List<String> xDeath) {
        System.out.println("Message" + ":" + (xDeath == null ? "" : xDeath));
        System.out.println("Received incoming object at " + exampleObject.getDate());
        try {
            // Deliberate division by zero standing in for a failing business operation.
            int a = 5 / 0;
            System.out.println(a);
        } catch (Exception ex) {
            // Keep the original exception as the cause so the real failure is not lost.
            throw new AmqpRejectAndDontRequeueException("amqp exception", ex);
        }
    }
}
**在 x-death 消息头中我什么也没得到。** 现在,当我访问 localhost:8081/publish 时,它会向 q.with.dlx 发送一条消息,我抛出 AmqpRejectAndDontRequeueException 异常,然后该消息进入名为 "dlq" 的队列。之后什么也没有发生。我有一个名为 ExampleObject 的域对象,我将其从发布者发送给消费者。
请交叉检查我所有的配置,如果可能的话,有人可以运行一下并告诉我错误是什么吗?提前致谢。
Gary Russell,感谢这个很棒的消息传递框架。
您在错误的队列上设置了 TTL。
您需要在您的 dlq 上配置生存时间死信:
/**
 * Retry/wait queue {@code dlq}: holds each message for 5 seconds, then dead-letters it back
 * to the original queue {@code q.with.dlx} for another attempt.
 *
 * <p>If a {@code dlq} queue with different arguments already exists on the broker, it must be
 * deleted first; a queue cannot be redeclared with different arguments.
 */
@Bean
public Queue incomingQueue() {
    Map<String, Object> args = new HashMap<>();
    // Messages expire after waiting 5 seconds in dlq...
    args.put("x-message-ttl", 5000);
    // ...and expired messages are dead-lettered through the default exchange (""), which
    // routes by queue name...
    args.put("x-dead-letter-exchange", "");
    // ...back to the original queue.
    args.put("x-dead-letter-routing-key", "q.with.dlx");
    // durable = true matches the single-argument Queue constructor used previously.
    return new Queue("dlq", true, false, false, args);
}
添加参数 x-message-ttl = 5000
和死信配置以将过期消息路由回原始队列。
您的队列 bean 名称有点混乱;你通常会有类似...
someExchange -> mainQueue (with dead-lettering to DLX)
DLX -> dlq (with TTL and dead-lettering to someExchange)