如何配置 "end to end" 发布者使用 Spring AMQP 确认

How to configure "end to end" publisher confirms with Spring AMQP

我想通过 RabbitMQ 和 Spring AMQP 使用发布者确认,这样如果侦听器在处理消息期间抛出异常,消息确认回调将获得 NACK。

下面this blog post,我说的是红色标记的用例:

主要问题是:

  1. 我必须如何配置 ConnectionFactory、RabbitTemplate 和 ListenerContainer 才能启用手动 NACK?

  2. 我必须在我的侦听器中做什么才能 NACK 消息并在出现异常时使用 success = false 调用确认回调?

这是我的豆子:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setPublisherConfirms(true);
    return connectionFactory;
}

@Bean
public ConfirmCallback confirmCallback() {
    return new ConfirmCallbackTestImplementation();
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ConfirmCallback confirmCallback) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setExchange(DIRECT_EXCHANGE);
    return rabbitTemplate;
}

@Bean
public FaultyMessageListener faultyListener(RabbitAdmin rabbitAdmin, DirectExchange exchange, ConnectionFactory connectionFactory) {
    Queue queue = queue(rabbitAdmin, exchange, "faultyListener");
    FaultyMessageListener listener = new FaultyMessageListener();
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setMessageListener(listener);
    container.setQueues(queue);
    container.setDefaultRequeueRejected(false);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.start();
    return listener;
}

private Queue queue(RabbitAdmin rabbitAdmin, DirectExchange exchange, String routingKey) {
    Queue queue = new Queue(routingKey, true, false, true);
    rabbitAdmin.declareQueue(queue);
    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
    return queue;
}

这是我的监听器实现:

public class FaultyMessageListener implements ChannelAwareMessageListener {

    private final List<Message> receivedMessages = new ArrayList<>();

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        receivedMessages.add(message);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        latch.countDown();
        throw new AmqpException("Message could not be processed");
    }

}

这是我的确认回调:

public static class ConfirmCallbackTestImplementation implements ConfirmCallback {

    private volatile Map<String, Boolean> confirmations = new HashMap<>();
    private volatile HashMap<String, CountDownLatch> expectationLatches = new HashMap<>();

    @Override
    public void confirm(CorrelationData correlationData, boolean success, String s) {
        confirmations.put(correlationData.getId(), success);
        expectationLatches.get(correlationData.getId()).countDown();
    }

    public CountDownLatch expect(String correlationId) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectationLatches.put(correlationId, latch);
        return latch;
    }

}

然后我使用以下测试来验证所需的行为:

@Autowired
private RabbitTemplate template;

@Autowired
private FaultyMessageListener faultyListener;

@Autowired
private ConfirmCallbackTestImplementation testConfirmCallback;

@Test
public void sendMessageToFaultyMessageListenerResultsInNack() throws InterruptedException {
    String correlationId = "corr-data-test-2";
    CountDownLatch confirmationLatch = testConfirmCallback.expect(correlationId);

    template.convertAndSend("ConnectionsTests.PublisherConfirm", "faultyListener", "faulty message", new CorrelationData(correlationId));

    assertTrue(faultyListener.latch.await(1, TimeUnit.SECONDS));
    confirmationLatch.await(1, TimeUnit.SECONDS);

    assertThat(faultyListener.receivedMessages.size(), is(1));
    assertThat(testConfirmCallback.confirmations.get(correlationId), is(false));
}

测试结果为:

java.lang.AssertionError: 
    Expected: is <false>
         but: was <true>

最后一个断言。对我来说,这听起来像是总是用 success = true 调用 confirm 回调,而不是 success = false 我期望从我的监听器中的 channel.basicNack(...) 得到的。

这样不行;发布者端的 ack/nack 纯粹是代理是否已接受消息。事实上,很少返回 nack,因为这意味着代理本身存在问题 - 请参阅 the rabbit documentation.

basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.

同样,消费者端的 ack/nack 纯粹是关于消费者是否已接受对消息的责任,并且 nack 允许消息重新排队、丢弃或路由到死信队列。

消息发布后,消费者不会再与发布者进行任何通信。如果你需要这样的通信,你需要设置回复队列。

如果您希望发布者和消费者之间紧密耦合,则可以改用 Spring Remoting (RPC) Over RabbitMQ。如果消费者抛出异常,它将传播回发布者 - 但是,该机制仅支持 Java Serializable 个对象。

尽管文档引用了 XML,但您可以将代理和服务调用程序连接为 @Beans