Spring AMQP - Publisher confirms return an ack when publishing to a non-existent queue

If I use publisher confirms and the returns callback with RabbitMQ, I never receive the returned message. From the Spring AMQP documentation:

- Publish to an exchange but there is no matching destination queue.
- Publish to a non-existent exchange.
The first case is covered by publisher returns, as described in Publisher Confirms and Returns.

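So I expected that publishing to an existing exchange with no matching queue would give me a returned message, but the return callback is never called. Do I need to set something else?
I am using RabbitMQ 3.8.0 and Spring Boot 2.2.1.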

application.yml

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true

Producer

@Service
public class PublisherConfirmProducer {

    private static final Logger log = LoggerFactory.getLogger(PublisherConfirmProducer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void postConstruct() {
        this.rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
            if (correlation != null) {
                log.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
            }
        });

        this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("Returned: " + message + "\nreplyCode: " + replyCode + "\nreplyText: " + replyText
                    + "\nexchange/rk: " + exchange + "/" + routingKey);
        });
    }

    // Careful: the message is silently dropped, because the exchange exists but has no
    // route to a queue, yet it is still ack-ed. How can I tell that I published to a non-existent queue?
    public void sendMessage_ValidExchange_InvalidQueue(DummyMessage message) {
        CorrelationData correlationData = new CorrelationData("Correlation for message " + message.getContent());
        this.rabbitTemplate.convertAndSend("x.test", "not-valid-routing-key", message, correlationData);
    }

}

Main application

@SpringBootApplication
public class RabbitmqProducerTwoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerTwoApplication.class, args);
    }

    @Autowired
    private PublisherConfirmProducer producer;

    @Override
    public void run(String... args) throws Exception {
        var dummyMessage_2 = new DummyMessage("Message 2", 2);
        producer.sendMessage_ValidExchange_InvalidQueue(dummyMessage_2);
    }

}

Log output

2019-11-29 04:45:23.796  INFO 8352 --- [           main] c.c.r.RabbitmqProducerTwoApplication     : Starting RabbitmqProducerTwoApplication on timpamungkas with PID 8352 (D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two\bin\main started by USER in D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two)
2019-11-29 04:45:23.800  INFO 8352 --- [           main] c.c.r.RabbitmqProducerTwoApplication     : No active profile set, falling back to default profiles: default
2019-11-29 04:45:24.952  INFO 8352 --- [           main] c.c.r.RabbitmqProducerTwoApplication     : Started RabbitmqProducerTwoApplication in 1.696 seconds (JVM running for 3.539)
2019-11-29 04:45:24.990  INFO 8352 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2019-11-29 04:45:25.024  INFO 8352 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#599f571f:0/SimpleConnection@86733 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50688]
2019-11-29 04:45:25.058  INFO 8352 --- [nectionFactory1] c.c.r.producer.PublisherConfirmProducer  : Received  ack for correlation: CorrelationData [id=Correlation for message Message 2]

Edit for Gary

RabbitmqConfig.java

@Configuration
public class RabbitmqConfig {

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        return template;
    }

}

You will get a positive ack after the returned message.

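What you have looks correct to me. It is very similar to this sample, so it should work the same way.

What type of exchange is x.test? Which queues are bound to it, and with which routing keys?

If you still can't get it working after looking at the sample, post the project somewhere and I'll take a look.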
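The ordering described above (returned message first, then the positive ack) and the question's log (ack only, no return) can both be reproduced with a small toy model. This is a sketch in plain Java, not the RabbitMQ client or the Spring AMQP API: `ToyDirectExchange` and its methods are hypothetical names. One possible explanation for the question's log, assuming the custom `RabbitTemplate` bean in `RabbitmqConfig` replaces the auto-configured one so that `spring.rabbitmq.template.mandatory` is never applied to it, is that the message is published without the mandatory flag, which corresponds to the first publish below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only; it does not talk to a broker. */
public class MandatoryFlagSketch {

    /** Hypothetical stand-in for a direct exchange with publisher confirms enabled. */
    static class ToyDirectExchange {
        private final Map<String, List<String>> queuesByRoutingKey = new HashMap<>();
        /** Events the publisher would observe, in order. */
        final List<String> events = new ArrayList<>();

        /** Binds a new, empty queue to the given routing key. */
        void bind(String routingKey) {
            queuesByRoutingKey.put(routingKey, new ArrayList<>());
        }

        void publish(String routingKey, String body, boolean mandatory) {
            List<String> queue = queuesByRoutingKey.get(routingKey);
            if (queue != null) {
                queue.add(body);               // routed to the bound queue
            } else if (mandatory) {
                events.add("return:" + body);  // unroutable and mandatory: returned first
            }
            // Unroutable and not mandatory: dropped without any return.
            // In every case the exchange accepted the message, so it is confirmed.
            events.add("ack:" + body);
        }
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("valid-routing-key");

        exchange.publish("not-valid-routing-key", "m1", false); // ack only
        exchange.publish("not-valid-routing-key", "m2", true);  // return, then ack

        System.out.println(exchange.events); // [ack:m1, return:m2, ack:m2]
    }
}
```

If that assumption holds for the real application, calling `template.setMandatory(true)` on the hand-built `RabbitTemplate` would be the thing to try.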

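The implementation looks correct. Just write your own private callback class that implements ConfirmCallback, and register it by calling setConfirmCallback when you initialize the RabbitTemplate.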

private static class PublisherCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without CorrelationData,
        // so rely on string concatenation instead of calling toString() on it
        System.out.println("Confirm received for " + correlationData + ", ack: " + ack + ", cause: " + cause);
    }

}

rabbitTemplate.setConfirmCallback(new PublisherCallback());

I use Spring Boot 2.5.2 and RabbitMQ 3.8.18, and my ReturnsCallback is invoked when I bind the queue to a topic exchange with an invalid routing key:

@Bean(name = "binding")
Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("aaa" + Constants.VALID_ROUTING_KEY);
}

//configuration
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnsCallback(returnCallbackService);
rabbitTemplate.setMandatory(true); // this setting seems to be required for returns


//logs
2021-07-08 15:08:05,078 ERROR [connectionFactory1] com..publisher.config.ReturnCallbackService: returnedMessage =>
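Why the binding above leads to a return can be sketched with a simplified topic matcher. This is an illustration in plain Java, not RabbitMQ's implementation: `TopicMatchSketch` and `matches` are hypothetical names, the value `orders.created` is made up because the real `Constants.VALID_ROUTING_KEY` is not shown, and it assumes the message was published with `Constants.VALID_ROUTING_KEY` as its routing key. It follows the usual topic rules: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified topic-exchange matching; for illustration only. */
public class TopicMatchSketch {

    /** Returns true if the routing key satisfies the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(Arrays.asList(bindingKey.split("\\.")),
                       Arrays.asList(routingKey.split("\\.")));
    }

    private static boolean matches(List<String> pattern, List<String> words) {
        if (pattern.isEmpty()) {
            return words.isEmpty();
        }
        String head = pattern.get(0);
        List<String> rest = pattern.subList(1, pattern.size());
        if (head.equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int skip = 0; skip <= words.size(); skip++) {
                if (matches(rest, words.subList(skip, words.size()))) {
                    return true;
                }
            }
            return false;
        }
        if (words.isEmpty()) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal
        boolean headMatches = head.equals("*") || head.equals(words.get(0));
        return headMatches && matches(rest, words.subList(1, words.size()));
    }

    public static void main(String[] args) {
        String validRoutingKey = "orders.created";   // hypothetical value
        String bindingKey = "aaa" + validRoutingKey; // "aaaorders.created", as in the binding

        System.out.println(matches(bindingKey, validRoutingKey)); // false
        System.out.println(matches("orders.*", validRoutingKey)); // true
        System.out.println(matches("orders.#", "orders"));        // true
    }
}
```

Under those assumptions no queue matches the published key, so with the mandatory flag set the message comes back through the ReturnsCallback, as the log line shows.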
