在 Spring AMQP-RabbitMQ 中处理发布者确认超时

Handling Publisher Confirm timeout in Spring AMQP-RabbitMQ

我正在测试用于 AMQP 的 Spring RabbitMQ 实现,我想使用发布者确认。我在文档和代码中都缺少的是我应该如何处理特定年龄的未确认消息。

裸露的 RabbitMQ 客户端 java 库提供了一个 Channel.waitForConfirmsOrDie(timeout) 方法,它工作得很好,但这会迫使我在 Spring 抽象下更深入,为什么不呢我想继续发布并重试未确认的消息? (顺便说一句,如果 spring-retry 可以用于此,那就太棒了,目前我必须实现它)。

我确实找到了 RabbitTemplate.getUnconfirmed(long) 但我遇到的问题是它似乎不是线程安全的,因为当我的发布者不断发送消息并且我尝试重新发送超过 5 秒的未确认消息时,它抛出错误:

Exception in thread "publisher-A-500000to999999" java.util.ConcurrentModificationException
at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1207)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1243)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1238)
at org.springframework.amqp.rabbit.core.RabbitTemplate.getUnconfirmed(RabbitTemplate.java:503)
at com.mycompany.rabbitmq.tools.failover.Publisher.resendUnconfirmed(Publisher.java:65)
at com.mycompany.rabbitmq.tools.failover.Publisher.run(Publisher.java:52)
at java.lang.Thread.run(Thread.java:745)

可能是我做错了什么,因为我使用 CorrelationData 作为消息的持有者,所以重新发送更容易。

我创建了一个 MessageConfirmData class:

private static class MessageCorrelationData extends  CorrelationData {

    private final Message message;
    private final long messageIndex;
    private final int retryCount;

    public MessageCorrelationData(Message message, long messageIndex, int retryCount) {
        super(UUID.randomUUID().toString());
        this.message = message;
        this.messageIndex = messageIndex;
        this.retryCount = retryCount;
    }
}

这是我在每发送 100 条消息后调用的重发逻辑:

private int resendUnconfirmed() {
    Collection<CorrelationData> unconfirmed = rabbitTemplate.getUnconfirmed(5000);
    int numUnconfirmed = 0;
    if (unconfirmed != null ) {
        numUnconfirmed = unconfirmed.size();

        for (CorrelationData correlationData : unconfirmed) {
            MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
            trySend(exchange, messageCorrelationData.message, messageCorrelationData.retryCount + 1, messageCorrelationData.messageIndex);
        }
    }
    return numUnconfirmed;
}

我的确认回调:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        MessageCorrelationData mcd = (MessageCorrelationData) correlationData;
        if (!ack) {
            LOG.error("NACK, cause: " + cause + " resending, retry: " + mcd.retryCount);
            trySend(exchange,mcd.message, mcd.retryCount + 1, mcd.messageIndex);
        }
    });

最后发送:

    rabbitTemplate.convertAndSend(exchange, "", amqpMessage, new MessageCorrelationData(amqpMessage, messageIndex, retryCount));

您发现了一个错误;想要关注的我已经提出了JIRA Issue

我不确定 Spring 重试对这里有何帮助;如果您有一些想法,请随时打开新功能或 'Improvement' JIRA 问题。

编辑

Pull request issued.

我们应该很快就会发布它。