Spring AMQP setConfirmCallback problems

I configure the rabbitTemplate like this:

@Autowired
public Sender(RabbitTemplate rabbitTemplate) {
    // callback reporting whether the message reached the exchange
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            log.info("sender not send message to the right exchange" + " correlationData=" + correlationData + " ack=" + ack + " cause=" + cause);
        } else {
            log.info("sender send message to the right exchange" + " correlationData=" + correlationData + " ack=" + ack + " cause=" + cause);
        }
    });
    // callback reporting whether the message reached a queue; if it could not be routed, the message is returned
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
        log.info("Sender send message failed: " + message + " " + replyCode + " " + replyText + " " + tmpExchange + " " + tmpRoutingKey);
        //try to resend msg
    });

    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    rabbitTemplate.setRetryTemplate(retryTemplate);
    rabbitTemplate.setMandatory(true);
    this.rabbitTemplate = rabbitTemplate;

}

and the send method:

  public void send() {
        System.out.println("sender is sending message");
        String uuid1 = UUID.randomUUID().toString();
        String uuid2 = UUID.randomUUID().toString();
        String uuid3 = UUID.randomUUID().toString();
        System.out.println("UUID="+uuid1+"---"+uuid2+"---"+uuid3);
        // the right exchange name and routing key
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "aaa.orange.bbb", "hello,world1 2", new CorrelationData(uuid1));
        // wrong exchange name
        rabbitTemplate.convertAndSend("测试交换机名", "aaa.orange.ccc", "测试错误的交换机名", new CorrelationData(uuid2));
        // wrong exchange name
        rabbitTemplate.convertAndSend("测试交换机名", "1111111", "测试错误的队列名", new CorrelationData(uuid3));

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

My problem is that when I only send

 rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "aaa.orange.bbb", "hello,world1 2", new CorrelationData(uuid1));

and comment out the other two lines

rabbitTemplate.convertAndSend("测试交换机名", "aaa.orange.ccc", "测试错误的交换机名", new CorrelationData(uuid2));
// wrong exchange name
rabbitTemplate.convertAndSend("测试交换机名", "1111111", "测试错误的队列名", new CorrelationData(uuid3));

the confirmCallback log is "sender send message to the right exchange".

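But if I send all three messages at once, the confirmCallback logs "sender not send message to the right exchange" three times. When I check the queue, the correct message has been delivered to it. How can I solve this problem?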

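Your question is not clear; if you mean you are sending to a non-existent exchange, that is fatal to the channel, so any pending confirms will be lost.

Since Spring AMQP caches channels for reuse, downstream operations can cause the channel to be closed and the confirms to be lost.

For example: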

@SpringBootApplication
public class So48518319Application {

    public static void main(String[] args) {
        SpringApplication.run(So48518319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.setConfirmCallback((correlation, ack, cause) -> {
                System.out.println(correlation + ":" + ack + " " + (cause == null ? "" : cause));
                ((MyCorrelationData) correlation).getLatch().countDown();
            });
            MyCorrelationData foo = new MyCorrelationData("foo");
            MyCorrelationData bar = new MyCorrelationData("bar");
            MyCorrelationData baz = new MyCorrelationData("baz");
            template.convertAndSend("output", "output.foo", "foo", foo);
            template.convertAndSend("output", "output.foo", "foo", bar);
            template.convertAndSend("output", "output.foo", "foo", baz);
            if (!foo.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Foo failed");
            }
            if (!bar.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Bar failed");
            }
            if (!baz.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Baz failed");
            }
            System.out.println("All good");
        };
    }

    public static class MyCorrelationData extends CorrelationData {

        private CountDownLatch latch = new CountDownLatch(1);

        public MyCorrelationData(String id) {
            super(id);
        }

        protected CountDownLatch getLatch() {
            return this.latch;
        }

        protected void setLatch(CountDownLatch latch) {
            this.latch = latch;
        }

    }

}

works fine:

CorrelationData [id=foo]:true 
CorrelationData [id=bar]:true 
CorrelationData [id=baz]:true 

All is well, but if I change it to

template.convertAndSend("output", "output.foo", "foo", foo);
template.convertAndSend("noutput", "output.foo", "foo", bar);
template.convertAndSend("noutput", "output.foo", "foo", baz);

we get

CorrelationData [id=foo]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
CorrelationData [id=bar]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
CorrelationData [id=baz]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
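
The output above can be reasoned about with a small toy model. This is only a sketch under stated assumptions: `SimulatedChannel`, `roundTrip` and `simulate` are hypothetical names, not Spring AMQP or RabbitMQ client APIs, and the model assumes that all three publishes go out on the same cached channel before any confirm comes back. Under that assumption, one publish to a missing exchange closes the channel and every confirm still pending on it is reported as a nack with the same cause.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these types belong to Spring AMQP or the RabbitMQ client.
class ConfirmLossDemo {

    /** Hypothetical stand-in for one cached channel with publisher confirms enabled. */
    static class SimulatedChannel {
        private final Set<String> exchanges;
        // correlation id -> target exchange, for publishes still awaiting a confirm
        private final Map<String, String> pending = new LinkedHashMap<>();
        private final List<String> confirms = new ArrayList<>();

        SimulatedChannel(Set<String> exchanges) {
            this.exchanges = exchanges;
        }

        /** Publishing is asynchronous: the message is only recorded as awaiting a confirm. */
        void publish(String exchange, String correlationId) {
            pending.put(correlationId, exchange);
        }

        /** Assumed behavior: confirms for the burst arrive together, unless the channel dies first. */
        void roundTrip() {
            for (String exchange : pending.values()) {
                if (!exchanges.contains(exchange)) {
                    // A missing exchange is a channel-level error: everything pending is nacked.
                    String cause = "NOT_FOUND - no exchange '" + exchange + "'";
                    pending.keySet().forEach(id -> confirms.add(id + ":false " + cause));
                    pending.clear();
                    return;
                }
            }
            pending.keySet().forEach(id -> confirms.add(id + ":true"));
            pending.clear();
        }

        List<String> confirms() {
            return confirms;
        }
    }

    /** Sends foo, bar and baz back to back on one channel and returns the confirm log. */
    static List<String> simulate(String fooExchange, String barExchange, String bazExchange) {
        SimulatedChannel channel = new SimulatedChannel(Set.of("output"));
        channel.publish(fooExchange, "foo");
        channel.publish(barExchange, "bar");
        channel.publish(bazExchange, "baz");
        channel.roundTrip();
        return channel.confirms();
    }

    public static void main(String[] args) {
        simulate("output", "output", "output").forEach(System.out::println);
        simulate("output", "noutput", "noutput").forEach(System.out::println);
    }
}
```

In this model the good message `foo` is nacked only because it shares a channel with the bad sends; the message itself may still have been routed, which matches the observation in the question that the correct message did arrive in the queue.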

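To avoid reusing the channel until the ack is received, you can use the template's invoke method; this prevents the channel from being reused for the bad send: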

@SpringBootApplication
public class So48518319Application {

    public static void main(String[] args) {
        SpringApplication.run(So48518319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.setConfirmCallback((correlation, ack, cause) -> {
                System.out.println(correlation + ":" + ack + " " + (cause == null ? "" : cause));
                MyCorrelationData myCorrelation = (MyCorrelationData) correlation;
                // record the result before releasing the latch so the waiting thread never reads a stale value
                myCorrelation.setAck(ack);
                myCorrelation.getLatch().countDown();
            });
            MyCorrelationData foo = new MyCorrelationData("foo");
            MyCorrelationData bar = new MyCorrelationData("bar");
            MyCorrelationData baz = new MyCorrelationData("baz");
            boolean result1 = template.invoke(t -> {
                t.convertAndSend("output", "output.foo", "foo", foo);
                try {
                    if (!foo.getLatch().await(10, TimeUnit.SECONDS)) {
                        throw new RuntimeException("Foo failed");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return foo.isAck();
            });
            boolean result2 = template.invoke(t -> {
                t.convertAndSend("noutput", "output.foo", "bar", bar);
                try {
                    if (!bar.getLatch().await(10, TimeUnit.SECONDS)) {
                        throw new RuntimeException("Bar failed");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return bar.isAck();
            });
            boolean result3 = template.invoke(t -> {
                t.convertAndSend("noutput", "output.foo", "baz", baz);
                try {
                    if (!baz.getLatch().await(10, TimeUnit.SECONDS)) {
                        throw new RuntimeException("Baz failed");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return baz.isAck();
            });
            System.out.println("All done: " + result1 + "," + result2 + "," + result3);
        };
    }

    public static class MyCorrelationData extends CorrelationData {

        private final CountDownLatch latch = new CountDownLatch(1);

        private volatile boolean ack;

        public MyCorrelationData(String id) {
            super(id);
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public boolean isAck() {
            return this.ack;
        }

        public void setAck(boolean ack) {
            this.ack = ack;
        }

    }

}

CorrelationData [id=foo]:true 
CorrelationData [id=bar]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
CorrelationData [id=baz]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)

All done: true,false,false

But that defeats the benefit of using publisher confirms, unless you do the sends on separate threads.
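
A minimal sketch of the "separate threads" idea, using only the JDK. The `sendAll` helper and the `sendAndAwaitConfirm` parameter are hypothetical names; the predicate stands in for a blocking call such as one of the `template.invoke(...)` blocks above, and in `main` it is replaced by a fake sender, so no broker is involved here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

class ParallelConfirmedSends {

    /**
     * Runs one blocking "send and wait for the confirm" call per message id on its own
     * pool thread, then collects the ack/nack result for each id in submission order.
     * sendAndAwaitConfirm is a placeholder (assumption) for the real blocking send.
     */
    static Map<String, Boolean> sendAll(List<String> ids, Predicate<String> sendAndAwaitConfirm) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, ids.size()));
        try {
            Map<String, CompletableFuture<Boolean>> futures = new LinkedHashMap<>();
            for (String id : ids) {
                futures.put(id, CompletableFuture.supplyAsync(() -> sendAndAwaitConfirm.test(id), pool));
            }
            Map<String, Boolean> results = new LinkedHashMap<>();
            // join() blocks until that send has been confirmed or rejected
            futures.forEach((id, future) -> results.put(id, future.join()));
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Fake sender: pretend only "foo" targets an existing exchange.
        Predicate<String> fakeSender = id -> {
            try {
                Thread.sleep(200); // simulated wait for the confirm
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return id.equals("foo");
        };
        System.out.println(sendAll(List.of("foo", "bar", "baz"), fakeSender));
    }
}
```

With this shape the waits overlap instead of adding up, so a slow or failed confirm for one message does not hold up the others. Whether one thread per message is reasonable depends on the send rate; a bounded pool is probably the safer default.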

Bottom line is "don't send messages to non-existent exchanges if you are using confirms".