RabbitMQ 确保消息到达队列

RabbitMQ be sure message reaches a queue

我想确定邮件正在到达队列。 否则,我要例外。

我试过发布者 returns,但这不是我需要的,因为它在不同的线程上,我认为在发送消息的线程上以某种方式等待它会很棘手。

没有交易通道,convertAndSend方法在没有交换的情况下成功返回,现在交易通道抛出异常。

在没有基于路由键的路由时,我需要的也是一样

@SpringBootApplication
public class DemoApplication {

    private static final Logger log = Logger.getGlobal();

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(replyCode + "," + replyText));
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    CommandLineRunner commandLineRunner(RabbitTemplate rabbitTemplate) {
        return args -> {
            rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message");
            log.info("Send is done.");
        };
    }
}

仅 属性: spring.rabbitmq.publisher-returns=true

Spring开机版本:2.1.7.RELEASE

实际:

no exchange -> convertAndSend throws exception

no route at exchange -> method returns

预期

no exchange -> convertAndSend throws exception

no route at exchange -> convertAndSend throws exception

您需要使用发布者确认和关联数据:

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
@SpringBootApplication
public class So57464212Application {

    public static void main(String[] args) {
        SpringApplication.run(So57464212Application.class, args);
    }


    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.err.println("Returned: " + replyText);
        });
        template.setConfirmCallback((correlationData, ack, cause) -> {
            System.err.println("ack:" + ack);
        });
        return args -> {
            CorrelationData correlationData = new CorrelationData("foo");
            template.convertAndSend("", "NOQUEUE", "bar", correlationData);
            correlationData.getFuture().get(10, TimeUnit.SECONDS);
            if (correlationData.getReturnedMessage() != null) {
                throw new RuntimeException("Message was returned");
            }
        };
    }

}