RabbitMQ 确保消息到达队列
RabbitMQ be sure message reaches a queue
我想确定邮件正在到达队列。
否则,我要例外。
我试过发布者 returns,但这不是我需要的,因为它在不同的线程上,我认为在发送消息的线程上以某种方式等待它会很棘手。
没有交易通道,convertAndSend方法在没有交换的情况下成功返回,现在交易通道抛出异常。
在没有基于路由键的路由时,我需要的也是一样
@SpringBootApplication
public class DemoApplication {
private static final Logger log = Logger.getGlobal();
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(replyCode + "," + replyText));
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
CommandLineRunner commandLineRunner(RabbitTemplate rabbitTemplate) {
return args -> {
rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message");
log.info("Send is done.");
};
}
}
仅 属性: spring.rabbitmq.publisher-returns=true
Spring开机版本:2.1.7.RELEASE
实际:
no exchange -> convertAndSend throws exception
no route at exchange -> method returns
预期:
no exchange -> convertAndSend throws exception
no route at exchange -> convertAndSend throws exception
您需要使用发布者确认和关联数据:
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
@SpringBootApplication
public class So57464212Application {
public static void main(String[] args) {
SpringApplication.run(So57464212Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.err.println("Returned: " + replyText);
});
template.setConfirmCallback((correlationData, ack, cause) -> {
System.err.println("ack:" + ack);
});
return args -> {
CorrelationData correlationData = new CorrelationData("foo");
template.convertAndSend("", "NOQUEUE", "bar", correlationData);
correlationData.getFuture().get(10, TimeUnit.SECONDS);
if (correlationData.getReturnedMessage() != null) {
throw new RuntimeException("Message was returned");
}
};
}
}
我想确定邮件正在到达队列。 否则,我要例外。
我试过发布者 returns,但这不是我需要的,因为它在不同的线程上,我认为在发送消息的线程上以某种方式等待它会很棘手。
没有交易通道,convertAndSend方法在没有交换的情况下成功返回,现在交易通道抛出异常。
在没有基于路由键的路由时,我需要的也是一样
@SpringBootApplication
public class DemoApplication {
private static final Logger log = Logger.getGlobal();
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(replyCode + "," + replyText));
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
CommandLineRunner commandLineRunner(RabbitTemplate rabbitTemplate) {
return args -> {
rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message");
log.info("Send is done.");
};
}
}
仅 属性: spring.rabbitmq.publisher-returns=true
Spring开机版本:2.1.7.RELEASE
实际:
no exchange -> convertAndSend throws exception
no route at exchange -> method returns
预期:
no exchange -> convertAndSend throws exception
no route at exchange -> convertAndSend throws exception
您需要使用发布者确认和关联数据:
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
@SpringBootApplication
public class So57464212Application {
public static void main(String[] args) {
SpringApplication.run(So57464212Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.err.println("Returned: " + replyText);
});
template.setConfirmCallback((correlationData, ack, cause) -> {
System.err.println("ack:" + ack);
});
return args -> {
CorrelationData correlationData = new CorrelationData("foo");
template.convertAndSend("", "NOQUEUE", "bar", correlationData);
correlationData.getFuture().get(10, TimeUnit.SECONDS);
if (correlationData.getReturnedMessage() != null) {
throw new RuntimeException("Message was returned");
}
};
}
}