waitForConfirmsOrDie 对比 PublisherCallbackChannel.Listener

waitForConfirmsOrDie vs PublisherCallbackChannel.Listener

我需要在 spring 的核心 java 实现中实现 waitForConfirmsOrDie 的影响。在核心 java 中,它是可实现的请求明智的( channel.confirmSelect ,设置 Mandatory ,发布和 Channel.waitForConfirmsOrDie(10000) 将等待 10 秒)

我实现了 template.setConfirmCallback(希望它与 PublisherCallbackChannel.Listener 相同)并且效果很好,但是 ack/nack 在一个公共位置(确认回电),对于个人发件人不知道 waitForConfirmsOrDie ,他确定在这段时间内 ack 还没有到来并且可以采取行动

发送方法在内部等待指定的时间,如 spring 中的 waitForConfirmsOrDie,如果 ack 尚未到来且 publisherConfirms 已启用。

目前 Spring API 中没有对应的 waitForConfirmsOrDie

与发布者一起使用连接工厂确认其渠道上启用的调用confirmSelect();与模板确认回调一起,您可以通过自己记录发送次数并向回调中添加等待等待的方法来实现相同的功能 - 类似于...

@Autowired
private RabbitTemplate template;

private void runDemo() throws Exception {
    MyCallback confirmCallback = new MyCallback();
    this.template.setConfirmCallback(confirmCallback);
    this.template.setMandatory(true);
    for (int i = 0; i < 10; i++) {
        template.convertAndSend(queue().getName(), "foo");
    }
    confirmCallback.waitForConfirmsOrDie(10, 10_000);
    System.out.println("All ack'd");
}

private static class MyCallback implements ConfirmCallback {

    private final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        queue.add(ack);
    }

    public void waitForConfirmsOrDie(int count, long timeout) throws Exception {
        int remaining = count;
        while (remaining-- > 0) {
            Boolean ack = queue.poll(timeout, TimeUnit.MILLISECONDS);
            if (ack == null) {
                throw new TimeoutException("timed out waiting for acks");
            }
            else if (!ack) {
                System.err.println("Received a nack");
            }
        }
    }

}

一个区别是频道不会被强制关闭。

此外,在多线程环境中,您需要每个线程专用的 template/callback,或者使用 CorrelationData 将确认与发送相关联(例如,将线程 ID 放入相关数据并在回调中使用它)。

我已经打开 AMQP-717 让我们考虑提供这样的开箱即用的东西。