如何将错误消息移动到rabbitmq死信队列
How to move error message to rabbitmq dead letter queue
我读了很多 documentation/Whosebug 但在将消息移动到死信队列时发生异常时我仍然遇到问题。我正在使用 spring-boot 这是我的配置:
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
RetryOperationsInterceptor interceptor() {
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
return RetryInterceptorBuilder
.stateless()
.recoverer(recoverer)
.build();
}
死信队列:
Features
x-dead-letter-routing-key: error_key
x-dead-letter-exchange: error_exchange
durable: true
Policy DLX
队列名称:错误
我的交流:
name:error_exchange
绑定:到:错误,routing_key:error_key
这是我的消费者:
@RabbitListener(queues = "${rss_reader_chat_queue}")
public void consumeMessage(Message message) {
try {
List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
List<ChatMessage> save = chatMessageRepository.save(chatMessages);
sendMessagesToChat(save);
}
catch(Exception ex) {
throw new AmqpRejectAndDontRequeueException(ex);
}
}
因此,当我发送一条无效消息并发生某些异常时,它会发生一次(这很好,因为之前的消息被一遍又一遍地发送)但该消息不会进入我的死信队列。你能帮我解决这个问题吗?
您需要显示您的其余配置 - 启动属性、队列 @Bean
s 等。您似乎也对使用重新发布恢复器与死信队列之间存在一些混淆;它们是实现相似结果的不同方法。您通常不会同时使用两者。
这是一个简单的启动应用程序,演示了如何使用 DLX/DLQ...
@SpringBootApplication
public class So43694619Application implements CommandLineRunner {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
context.close();
}
@Autowired
RabbitTemplate template;
@Autowired
AmqpAdmin admin;
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run(String... arg0) throws Exception {
this.template.convertAndSend("so43694619main", "foo");
this.latch.await(10, TimeUnit.SECONDS);
this.admin.deleteExchange("so43694619dlx");
this.admin.deleteQueue("so43694619main");
this.admin.deleteQueue("so43694619dlx");
}
@Bean
public Queue main() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "so43694619dlx");
args.put("x-dead-letter-routing-key", "so43694619dlxRK");
return new Queue("so43694619main", true, false, false, args);
}
@Bean
public Queue dlq() {
return new Queue("so43694619dlq");
}
@Bean
public DirectExchange dlx() {
return new DirectExchange("so43694619dlx");
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
}
@RabbitListener(queues = "so43694619main")
public void listenMain(String in) {
throw new AmqpRejectAndDontRequeueException("failed");
}
@RabbitListener(queues = "so43694619dlq")
public void listenDlq(String in) {
System.out.println("ReceivedFromDLQ: " + in);
this.latch.countDown();
}
}
结果:
ReceivedFromDLQ: foo
我读了很多 documentation/Whosebug 但在将消息移动到死信队列时发生异常时我仍然遇到问题。我正在使用 spring-boot 这是我的配置:
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
RetryOperationsInterceptor interceptor() {
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
return RetryInterceptorBuilder
.stateless()
.recoverer(recoverer)
.build();
}
死信队列:
Features
x-dead-letter-routing-key: error_key
x-dead-letter-exchange: error_exchange
durable: true
Policy DLX
队列名称:错误
我的交流: name:error_exchange 绑定:到:错误,routing_key:error_key
这是我的消费者:
@RabbitListener(queues = "${rss_reader_chat_queue}")
public void consumeMessage(Message message) {
try {
List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
List<ChatMessage> save = chatMessageRepository.save(chatMessages);
sendMessagesToChat(save);
}
catch(Exception ex) {
throw new AmqpRejectAndDontRequeueException(ex);
}
}
因此,当我发送一条无效消息并发生某些异常时,它会发生一次(这很好,因为之前的消息被一遍又一遍地发送)但该消息不会进入我的死信队列。你能帮我解决这个问题吗?
您需要显示您的其余配置 - 启动属性、队列 @Bean
s 等。您似乎也对使用重新发布恢复器与死信队列之间存在一些混淆;它们是实现相似结果的不同方法。您通常不会同时使用两者。
这是一个简单的启动应用程序,演示了如何使用 DLX/DLQ...
@SpringBootApplication
public class So43694619Application implements CommandLineRunner {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
context.close();
}
@Autowired
RabbitTemplate template;
@Autowired
AmqpAdmin admin;
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run(String... arg0) throws Exception {
this.template.convertAndSend("so43694619main", "foo");
this.latch.await(10, TimeUnit.SECONDS);
this.admin.deleteExchange("so43694619dlx");
this.admin.deleteQueue("so43694619main");
this.admin.deleteQueue("so43694619dlx");
}
@Bean
public Queue main() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "so43694619dlx");
args.put("x-dead-letter-routing-key", "so43694619dlxRK");
return new Queue("so43694619main", true, false, false, args);
}
@Bean
public Queue dlq() {
return new Queue("so43694619dlq");
}
@Bean
public DirectExchange dlx() {
return new DirectExchange("so43694619dlx");
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
}
@RabbitListener(queues = "so43694619main")
public void listenMain(String in) {
throw new AmqpRejectAndDontRequeueException("failed");
}
@RabbitListener(queues = "so43694619dlq")
public void listenDlq(String in) {
System.out.println("ReceivedFromDLQ: " + in);
this.latch.countDown();
}
}
结果:
ReceivedFromDLQ: foo