Spring AMQP - 发布者在发布到不存在的队列时确认 returns 确认
Spring AMQP - Publisher confirms returns ack-ed when publish to non-existent queue
如果我在 RabbitMQ 上使用发布者确认和 returns 回调,我永远不会收到 returned 消息。
来自 Spring AMQP 文档:
- Publish to an exchange but there is no matching destination queue.
- Publish to a non-existent exchange.
The first case is covered by publisher returns, as described in Publisher Confirms and Returns.
所以我认为如果我发布到存在的交换但不存在的队列,我会收到 returned 消息。但是 return 回调从未调用过。
我需要设置其他东西吗?
我正在使用 RabbitMQ 3.8.0 和 Spring Boot 2.2.1
application.yml
spring:
rabbitmq:
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
制作人
@Service
public class PublisherConfirmProducer {
private static final Logger log = LoggerFactory.getLogger(PublisherConfirmProducer.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void postConstruct() {
this.rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
if (correlation != null) {
log.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
}
});
this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("Returned: " + message + "\nreplyCode: " + replyCode + "\nreplyText: " + replyText
+ "\nexchange/rk: " + exchange + "/" + routingKey);
});
}
// Careful : will be silently dropped, since the exchange is exists, but no
// route to queue, but ack-ed. How to know that I publish to non-existing queue?
public void sendMessage_ValidExchange_InvalidQueue(DummyMessage message) {
CorrelationData correlationData = new CorrelationData("Correlation for message " + message.getContent());
this.rabbitTemplate.convertAndSend("x.test", "not-valid-routing-key", message, correlationData);
}
}
主应用程序
@SpringBootApplication
public class RabbitmqProducerTwoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerTwoApplication.class, args);
}
@Autowired
private PublisherConfirmProducer producer;
@Override
public void run(String... args) throws Exception {
var dummyMessage_2 = new DummyMessage("Message 2", 2);
producer.sendMessage_ValidExchange_InvalidQueue(dummyMessage_2);
}
}
记录结果
2019-11-29 04:45:23.796 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : Starting RabbitmqProducerTwoApplication on timpamungkas with PID 8352 (D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two\bin\main started by USER in D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two)
2019-11-29 04:45:23.800 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : No active profile set, falling back to default profiles: default
2019-11-29 04:45:24.952 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : Started RabbitmqProducerTwoApplication in 1.696 seconds (JVM running for 3.539)
2019-11-29 04:45:24.990 INFO 8352 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-11-29 04:45:25.024 INFO 8352 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#599f571f:0/SimpleConnection@86733 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50688]
2019-11-29 04:45:25.058 INFO 8352 --- [nectionFactory1] c.c.r.producer.PublisherConfirmProducer : Received ack for correlation: CorrelationData [id=Correlation for message Message 2]
为加里编辑
RabbitMqConfig.java
@Configuration
public class RabbitmqConfig {
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(converter);
return template;
}
}
您将在回复消息后收到肯定的确认。
在我看来,你所看到的是正确的。它与 this sample 非常相似,因此应该以相同的方式工作。
x.test
是什么类型的交换?它绑定了哪些队列,以及哪些路由键?
如果您在查看该示例后仍然无法正常工作,post 某个地方的项目,我会看一看。
实现看起来是正确的,只需实现您自己的私有回调 class 并扩展 ConfirmCallback 以注册回调,在初始化 RabbitTemplate 时设置 ConfirmCallback。
private static class PublisherCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("Confirm message returned " + correlationData.toString() + " Ack " + ack + " cause " + cause);
}
}
rabbitTemplate.setConfirmCallback(new PublisherCallback());
我使用 Springboot 2.5.2 和 RabbitMQ 3.8.18,当我使用无效路由键将队列绑定到主题交换时,我的 ReturnsCallback
被调用
@Bean(name = "binding")
Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("aaa"+ Constants.VALID_ROUTING_KEY);
}
//configuration
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
rabbitTemplate.setMandatory(true); //seems this config is mandatory
//logs
2021-07-08 15:08:05,078 ERROR [connectionFactory1] com..publisher.config.ReturnCallbackService: returnedMessage =>
如果我在 RabbitMQ 上使用发布者确认和 returns 回调,我永远不会收到 returned 消息。 来自 Spring AMQP 文档:
- Publish to an exchange but there is no matching destination queue.
- Publish to a non-existent exchange.
The first case is covered by publisher returns, as described in Publisher Confirms and Returns.
所以我认为如果我发布到存在的交换但不存在的队列,我会收到 returned 消息。但是 return 回调从未调用过。
我需要设置其他东西吗?
我正在使用 RabbitMQ 3.8.0 和 Spring Boot 2.2.1
application.yml
spring:
rabbitmq:
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
制作人
@Service
public class PublisherConfirmProducer {
private static final Logger log = LoggerFactory.getLogger(PublisherConfirmProducer.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void postConstruct() {
this.rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
if (correlation != null) {
log.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
}
});
this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("Returned: " + message + "\nreplyCode: " + replyCode + "\nreplyText: " + replyText
+ "\nexchange/rk: " + exchange + "/" + routingKey);
});
}
// Careful : will be silently dropped, since the exchange is exists, but no
// route to queue, but ack-ed. How to know that I publish to non-existing queue?
public void sendMessage_ValidExchange_InvalidQueue(DummyMessage message) {
CorrelationData correlationData = new CorrelationData("Correlation for message " + message.getContent());
this.rabbitTemplate.convertAndSend("x.test", "not-valid-routing-key", message, correlationData);
}
}
主应用程序
@SpringBootApplication
public class RabbitmqProducerTwoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerTwoApplication.class, args);
}
@Autowired
private PublisherConfirmProducer producer;
@Override
public void run(String... args) throws Exception {
var dummyMessage_2 = new DummyMessage("Message 2", 2);
producer.sendMessage_ValidExchange_InvalidQueue(dummyMessage_2);
}
}
记录结果
2019-11-29 04:45:23.796 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : Starting RabbitmqProducerTwoApplication on timpamungkas with PID 8352 (D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two\bin\main started by USER in D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two)
2019-11-29 04:45:23.800 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : No active profile set, falling back to default profiles: default
2019-11-29 04:45:24.952 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : Started RabbitmqProducerTwoApplication in 1.696 seconds (JVM running for 3.539)
2019-11-29 04:45:24.990 INFO 8352 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-11-29 04:45:25.024 INFO 8352 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#599f571f:0/SimpleConnection@86733 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50688]
2019-11-29 04:45:25.058 INFO 8352 --- [nectionFactory1] c.c.r.producer.PublisherConfirmProducer : Received ack for correlation: CorrelationData [id=Correlation for message Message 2]
为加里编辑
RabbitMqConfig.java
@Configuration
public class RabbitmqConfig {
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(converter);
return template;
}
}
您将在回复消息后收到肯定的确认。
在我看来,你所看到的是正确的。它与 this sample 非常相似,因此应该以相同的方式工作。
x.test
是什么类型的交换?它绑定了哪些队列,以及哪些路由键?
如果您在查看该示例后仍然无法正常工作,post 某个地方的项目,我会看一看。
实现看起来是正确的,只需实现您自己的私有回调 class 并扩展 ConfirmCallback 以注册回调,在初始化 RabbitTemplate 时设置 ConfirmCallback。
private static class PublisherCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("Confirm message returned " + correlationData.toString() + " Ack " + ack + " cause " + cause);
}
}
rabbitTemplate.setConfirmCallback(new PublisherCallback());
我使用 Springboot 2.5.2 和 RabbitMQ 3.8.18,当我使用无效路由键将队列绑定到主题交换时,我的 ReturnsCallback
被调用
@Bean(name = "binding")
Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("aaa"+ Constants.VALID_ROUTING_KEY);
}
//configuration
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
rabbitTemplate.setMandatory(true); //seems this config is mandatory
//logs
2021-07-08 15:08:05,078 ERROR [connectionFactory1] com..publisher.config.ReturnCallbackService: returnedMessage =>