spring-amqp RabbitMQ 动态更改发布者确认
spring-amqp RabbitMQ dynamically change publisher-confirms
有没有办法更改每条消息的发布者确认?我们有一个接收消息并发布到 RabbitMQ 的休息层。我们会根据某些消息属性来决定是否需要发布者确认。
有没有办法在发送消息时覆盖发布者确认?
否;我们必须添加一堆脚手架来支持 returns。此外,通道会被缓存,一旦设置,就无法关闭对通道的确认。我们必须保留 2 个不同的缓存。
如果您希望使用条件确认,您可以定义两个连接工厂(和模板),一个启用确认,一个不启用,select在运行时使用哪个模板。
编辑
@SpringBootApplication
public class So41131612Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So41131612Application.class, args);
context.getBean("normalTemplate", RabbitTemplate.class).convertAndSend("foo", "foo");
context.getBean("confirmingTemplate", RabbitTemplate.class).convertAndSend("", "foo", "foo",
new CorrelationData("foo"));
Thread.sleep(2000);
context.getBean(RabbitAdmin.class).deleteQueue("foo");
context.close();
}
@Bean
public Queue foo() {
return new Queue("foo");
}
@Bean
@Primary
public CachingConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public CachingConnectionFactory confirmingCf() {
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
cf.setPublisherConfirms(true);
return cf;
}
@Bean
public AmqpTemplate normalTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory normalCf) {
return new RabbitTemplate(normalCf);
}
@Bean
public AmqpTemplate confirmingTemplate(@Qualifier("confirmingCf") CachingConnectionFactory confirmingCf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(confirmingCf);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((cd, ack, cause) -> {
System.out.println("Correlation:" + cd + " ack: " + ack);
});
return rabbitTemplate;
}
}
有没有办法更改每条消息的发布者确认?我们有一个接收消息并发布到 RabbitMQ 的休息层。我们会根据某些消息属性来决定是否需要发布者确认。
有没有办法在发送消息时覆盖发布者确认?
否;我们必须添加一堆脚手架来支持 returns。此外,通道会被缓存,一旦设置,就无法关闭对通道的确认。我们必须保留 2 个不同的缓存。
如果您希望使用条件确认,您可以定义两个连接工厂(和模板),一个启用确认,一个不启用,select在运行时使用哪个模板。
编辑
@SpringBootApplication
public class So41131612Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So41131612Application.class, args);
context.getBean("normalTemplate", RabbitTemplate.class).convertAndSend("foo", "foo");
context.getBean("confirmingTemplate", RabbitTemplate.class).convertAndSend("", "foo", "foo",
new CorrelationData("foo"));
Thread.sleep(2000);
context.getBean(RabbitAdmin.class).deleteQueue("foo");
context.close();
}
@Bean
public Queue foo() {
return new Queue("foo");
}
@Bean
@Primary
public CachingConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public CachingConnectionFactory confirmingCf() {
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
cf.setPublisherConfirms(true);
return cf;
}
@Bean
public AmqpTemplate normalTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory normalCf) {
return new RabbitTemplate(normalCf);
}
@Bean
public AmqpTemplate confirmingTemplate(@Qualifier("confirmingCf") CachingConnectionFactory confirmingCf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(confirmingCf);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((cd, ack, cause) -> {
System.out.println("Correlation:" + cd + " ack: " + ack);
});
return rabbitTemplate;
}
}