spring-amqp RabbitMQ 动态更改发布者确认

spring-amqp RabbitMQ dynamically change publisher-confirms

有没有办法更改每条消息的发布者确认?我们有一个接收消息并发布到 RabbitMQ 的休息层。我们会根据某些消息属性来决定是否需要发布者确认。

有没有办法在发送消息时覆盖发布者确认?

否;我们必须添加一堆脚手架来支持 returns。此外,通道会被缓存,一旦设置,就无法关闭对通道的确认。我们必须保留 2 个不同的缓存。

如果您希望使用条件确认,您可以定义两个连接工厂(和模板),一个启用确认,一个不启用,select在运行时使用哪个模板。

编辑

@SpringBootApplication
public class So41131612Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So41131612Application.class, args);
        context.getBean("normalTemplate", RabbitTemplate.class).convertAndSend("foo", "foo");
        context.getBean("confirmingTemplate", RabbitTemplate.class).convertAndSend("", "foo", "foo",
                new CorrelationData("foo"));
        Thread.sleep(2000);
        context.getBean(RabbitAdmin.class).deleteQueue("foo");
        context.close();
    }

    @Bean
    public Queue foo() {
        return new Queue("foo");
    }

    @Bean
    @Primary
    public CachingConnectionFactory rabbitConnectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public CachingConnectionFactory confirmingCf() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        cf.setPublisherConfirms(true);
        return cf;
    }

    @Bean
    public AmqpTemplate normalTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory normalCf) {
        return new RabbitTemplate(normalCf);
    }

    @Bean
    public AmqpTemplate confirmingTemplate(@Qualifier("confirmingCf") CachingConnectionFactory confirmingCf) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(confirmingCf);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((cd, ack, cause) -> {
            System.out.println("Correlation:" + cd + " ack: " + ack);
        });
        return rabbitTemplate;
    }

}