为什么在我调用 Channel.ack 之前调用确认回调
Why is the confirm callback called before I invoked Channel.ack
我目前正在尝试了解为什么在我调用 ChannelAwareMessageListener 上的 Channel.basicAck / Channel.basicNack 之前调用了我的 ConfirmCallback。
请在下面找到我当前的设置
@Component
public class MyMessageListener implements ChannelAwareMessageListener {
private Logger LOGGER = LoggerFactory.getLogger(MyMessageListener.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000L);
String arg = String.valueOf(message.getBody());
LOGGER.info("Received message {}", arg);
Thread.sleep(1000L);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@Component
public class LoggingConfirmCallback implements RabbitTemplate.ConfirmCallback{
private Logger LOGGER = LoggerFactory.getLogger(LoggingConfirmCallback.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
LOGGER.info("Received confirm with result {}", ack);
}
}
@SpringBootApplication
@Configuration
public class Application {
@Autowired
RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
context.getBean(Application.class).doIt();
}
public void doIt() {
rabbitTemplate.convertAndSend(null, null, "hello", new CorrelationData(UUID.randomUUID().toString()));
}
@Bean
@Qualifier("confirmConnectionFactory")
ConnectionFactory confirmConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);
factory.setHost("192.168.59.103");
factory.setChannelCacheSize(5);
return factory;
}
@Bean
@Primary
RabbitTemplate firstExchange(@Qualifier("confirmConnectionFactory") ConnectionFactory connectionFactory, LoggingConfirmCallback loggingConfirmCallback) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(loggingConfirmCallback);
rabbitTemplate.setExchange("first");
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
MessageListenerAdapter myMessageListenerAdapter(MyMessageListener receiver) {
return new MessageListenerAdapter(receiver);
}
@Bean
SimpleMessageListenerContainer myQueueListener(@Qualifier("confirmConnectionFactory")ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames("first.queue");
container.setMessageListener(listenerAdapter);
return container;
}
我在日志中看到的是这样的
2015-07-17 08:56:32.352 INFO 5892 --- [ main] com.coderskitchen.rmqdct.Application : Started Application in 1.393 seconds (JVM running for 1.704)
2015-07-17 08:56:32.372 INFO 5892 --- [168.59.103:5672] c.c.rmqdct.LoggingConfirmCallback : Received confirm with result true
2015-07-17 08:56:33.373 INFO 5892 --- [cTaskExecutor-1] c.c.rmqdct.MyMessageListener : Received message [B@67962299
但我希望收到消息
Received confirm with result true
之后
Received message [B@67962299
提前致谢
彼得
Publisher confirms与消息接收无关
代理通过成功将消息传递到配置的队列来确认它已对消息负责。
它完全独立于消费者确认接收。如果需要,您必须将应用程序级消息发送回生产者。
我目前正在尝试了解为什么在我调用 ChannelAwareMessageListener 上的 Channel.basicAck / Channel.basicNack 之前调用了我的 ConfirmCallback。
请在下面找到我当前的设置
@Component
public class MyMessageListener implements ChannelAwareMessageListener {
private Logger LOGGER = LoggerFactory.getLogger(MyMessageListener.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000L);
String arg = String.valueOf(message.getBody());
LOGGER.info("Received message {}", arg);
Thread.sleep(1000L);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@Component
public class LoggingConfirmCallback implements RabbitTemplate.ConfirmCallback{
private Logger LOGGER = LoggerFactory.getLogger(LoggingConfirmCallback.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
LOGGER.info("Received confirm with result {}", ack);
}
}
@SpringBootApplication
@Configuration
public class Application {
@Autowired
RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
context.getBean(Application.class).doIt();
}
public void doIt() {
rabbitTemplate.convertAndSend(null, null, "hello", new CorrelationData(UUID.randomUUID().toString()));
}
@Bean
@Qualifier("confirmConnectionFactory")
ConnectionFactory confirmConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);
factory.setHost("192.168.59.103");
factory.setChannelCacheSize(5);
return factory;
}
@Bean
@Primary
RabbitTemplate firstExchange(@Qualifier("confirmConnectionFactory") ConnectionFactory connectionFactory, LoggingConfirmCallback loggingConfirmCallback) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(loggingConfirmCallback);
rabbitTemplate.setExchange("first");
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
MessageListenerAdapter myMessageListenerAdapter(MyMessageListener receiver) {
return new MessageListenerAdapter(receiver);
}
@Bean
SimpleMessageListenerContainer myQueueListener(@Qualifier("confirmConnectionFactory")ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames("first.queue");
container.setMessageListener(listenerAdapter);
return container;
}
我在日志中看到的是这样的
2015-07-17 08:56:32.352 INFO 5892 --- [ main] com.coderskitchen.rmqdct.Application : Started Application in 1.393 seconds (JVM running for 1.704)
2015-07-17 08:56:32.372 INFO 5892 --- [168.59.103:5672] c.c.rmqdct.LoggingConfirmCallback : Received confirm with result true
2015-07-17 08:56:33.373 INFO 5892 --- [cTaskExecutor-1] c.c.rmqdct.MyMessageListener : Received message [B@67962299
但我希望收到消息
Received confirm with result true
之后
Received message [B@67962299
提前致谢
彼得
Publisher confirms与消息接收无关
代理通过成功将消息传递到配置的队列来确认它已对消息负责。
它完全独立于消费者确认接收。如果需要,您必须将应用程序级消息发送回生产者。