如何配置 "end to end" 发布者使用 Spring AMQP 确认
How to configure "end to end" publisher confirms with Spring AMQP
我想通过 RabbitMQ 和 Spring AMQP 使用发布者确认,这样如果侦听器在处理消息期间抛出异常,消息确认回调将获得 NACK。
下面this blog post,我说的是红色标记的用例:
主要问题是:
我必须如何配置 ConnectionFactory、RabbitTemplate 和 ListenerContainer 才能启用手动 NACK?
我必须在我的侦听器中做什么才能 NACK 消息并在出现异常时使用 success = false
调用确认回调?
这是我的豆子:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public ConfirmCallback confirmCallback() {
return new ConfirmCallbackTestImplementation();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ConfirmCallback confirmCallback) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setExchange(DIRECT_EXCHANGE);
return rabbitTemplate;
}
@Bean
public FaultyMessageListener faultyListener(RabbitAdmin rabbitAdmin, DirectExchange exchange, ConnectionFactory connectionFactory) {
Queue queue = queue(rabbitAdmin, exchange, "faultyListener");
FaultyMessageListener listener = new FaultyMessageListener();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(listener);
container.setQueues(queue);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
return listener;
}
private Queue queue(RabbitAdmin rabbitAdmin, DirectExchange exchange, String routingKey) {
Queue queue = new Queue(routingKey, true, false, true);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
return queue;
}
这是我的监听器实现:
public class FaultyMessageListener implements ChannelAwareMessageListener {
private final List<Message> receivedMessages = new ArrayList<>();
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
receivedMessages.add(message);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
latch.countDown();
throw new AmqpException("Message could not be processed");
}
}
这是我的确认回调:
public static class ConfirmCallbackTestImplementation implements ConfirmCallback {
private volatile Map<String, Boolean> confirmations = new HashMap<>();
private volatile HashMap<String, CountDownLatch> expectationLatches = new HashMap<>();
@Override
public void confirm(CorrelationData correlationData, boolean success, String s) {
confirmations.put(correlationData.getId(), success);
expectationLatches.get(correlationData.getId()).countDown();
}
public CountDownLatch expect(String correlationId) {
CountDownLatch latch = new CountDownLatch(1);
this.expectationLatches.put(correlationId, latch);
return latch;
}
}
然后我使用以下测试来验证所需的行为:
@Autowired
private RabbitTemplate template;
@Autowired
private FaultyMessageListener faultyListener;
@Autowired
private ConfirmCallbackTestImplementation testConfirmCallback;
@Test
public void sendMessageToFaultyMessageListenerResultsInNack() throws InterruptedException {
String correlationId = "corr-data-test-2";
CountDownLatch confirmationLatch = testConfirmCallback.expect(correlationId);
template.convertAndSend("ConnectionsTests.PublisherConfirm", "faultyListener", "faulty message", new CorrelationData(correlationId));
assertTrue(faultyListener.latch.await(1, TimeUnit.SECONDS));
confirmationLatch.await(1, TimeUnit.SECONDS);
assertThat(faultyListener.receivedMessages.size(), is(1));
assertThat(testConfirmCallback.confirmations.get(correlationId), is(false));
}
测试结果为:
java.lang.AssertionError:
Expected: is <false>
but: was <true>
最后一个断言。对我来说,这听起来像是总是用 success = true
调用 confirm 回调,而不是 success = false
我期望从我的监听器中的 channel.basicNack(...)
得到的。
这样不行;发布者端的 ack/nack 纯粹是代理是否已接受消息。事实上,很少返回 nack,因为这意味着代理本身存在问题 - 请参阅 the rabbit documentation.
basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.
同样,消费者端的 ack/nack 纯粹是关于消费者是否已接受对消息的责任,并且 nack 允许消息重新排队、丢弃或路由到死信队列。
消息发布后,消费者不会再与发布者进行任何通信。如果你需要这样的通信,你需要设置回复队列。
如果您希望发布者和消费者之间紧密耦合,则可以改用 Spring Remoting (RPC) Over RabbitMQ。如果消费者抛出异常,它将传播回发布者 - 但是,该机制仅支持 Java Serializable
个对象。
尽管文档引用了 XML,但您可以将代理和服务调用程序连接为 @Bean
s
我想通过 RabbitMQ 和 Spring AMQP 使用发布者确认,这样如果侦听器在处理消息期间抛出异常,消息确认回调将获得 NACK。
下面this blog post,我说的是红色标记的用例:
主要问题是:
我必须如何配置 ConnectionFactory、RabbitTemplate 和 ListenerContainer 才能启用手动 NACK?
我必须在我的侦听器中做什么才能 NACK 消息并在出现异常时使用
success = false
调用确认回调?
这是我的豆子:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public ConfirmCallback confirmCallback() {
return new ConfirmCallbackTestImplementation();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ConfirmCallback confirmCallback) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setExchange(DIRECT_EXCHANGE);
return rabbitTemplate;
}
@Bean
public FaultyMessageListener faultyListener(RabbitAdmin rabbitAdmin, DirectExchange exchange, ConnectionFactory connectionFactory) {
Queue queue = queue(rabbitAdmin, exchange, "faultyListener");
FaultyMessageListener listener = new FaultyMessageListener();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(listener);
container.setQueues(queue);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
return listener;
}
private Queue queue(RabbitAdmin rabbitAdmin, DirectExchange exchange, String routingKey) {
Queue queue = new Queue(routingKey, true, false, true);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
return queue;
}
这是我的监听器实现:
public class FaultyMessageListener implements ChannelAwareMessageListener {
private final List<Message> receivedMessages = new ArrayList<>();
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
receivedMessages.add(message);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
latch.countDown();
throw new AmqpException("Message could not be processed");
}
}
这是我的确认回调:
public static class ConfirmCallbackTestImplementation implements ConfirmCallback {
private volatile Map<String, Boolean> confirmations = new HashMap<>();
private volatile HashMap<String, CountDownLatch> expectationLatches = new HashMap<>();
@Override
public void confirm(CorrelationData correlationData, boolean success, String s) {
confirmations.put(correlationData.getId(), success);
expectationLatches.get(correlationData.getId()).countDown();
}
public CountDownLatch expect(String correlationId) {
CountDownLatch latch = new CountDownLatch(1);
this.expectationLatches.put(correlationId, latch);
return latch;
}
}
然后我使用以下测试来验证所需的行为:
@Autowired
private RabbitTemplate template;
@Autowired
private FaultyMessageListener faultyListener;
@Autowired
private ConfirmCallbackTestImplementation testConfirmCallback;
@Test
public void sendMessageToFaultyMessageListenerResultsInNack() throws InterruptedException {
String correlationId = "corr-data-test-2";
CountDownLatch confirmationLatch = testConfirmCallback.expect(correlationId);
template.convertAndSend("ConnectionsTests.PublisherConfirm", "faultyListener", "faulty message", new CorrelationData(correlationId));
assertTrue(faultyListener.latch.await(1, TimeUnit.SECONDS));
confirmationLatch.await(1, TimeUnit.SECONDS);
assertThat(faultyListener.receivedMessages.size(), is(1));
assertThat(testConfirmCallback.confirmations.get(correlationId), is(false));
}
测试结果为:
java.lang.AssertionError:
Expected: is <false>
but: was <true>
最后一个断言。对我来说,这听起来像是总是用 success = true
调用 confirm 回调,而不是 success = false
我期望从我的监听器中的 channel.basicNack(...)
得到的。
这样不行;发布者端的 ack/nack 纯粹是代理是否已接受消息。事实上,很少返回 nack,因为这意味着代理本身存在问题 - 请参阅 the rabbit documentation.
basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.
同样,消费者端的 ack/nack 纯粹是关于消费者是否已接受对消息的责任,并且 nack 允许消息重新排队、丢弃或路由到死信队列。
消息发布后,消费者不会再与发布者进行任何通信。如果你需要这样的通信,你需要设置回复队列。
如果您希望发布者和消费者之间紧密耦合,则可以改用 Spring Remoting (RPC) Over RabbitMQ。如果消费者抛出异常,它将传播回发布者 - 但是,该机制仅支持 Java Serializable
个对象。
尽管文档引用了 XML,但您可以将代理和服务调用程序连接为 @Bean
s