使用 spring-amqp,从 PublisherReturn 回调内部向 rabbitmq 发送消息的最佳方式是什么?
With spring-amqp, what's the best way to send a message to rabbitmq from inside a PublisherReturn callback?
我正在使用 spring-amqp:2.1.6.RELEASE
我有一个带有 PublisherReturn 回调的 RabbitTemplate。
- 如果我将消息发送到没有队列绑定的 routingKey
它,然后正确调用 return 回调。发生这种情况时我
想要将消息发送到另一个 routingKey。然而,如果
我在 ReturnCallback 中使用 RabbitTemplate 它只是挂断了。我
没有看到任何消息 can/can 未发送,
RabbitTemplate 没有 return 控制我的 ReturnCallback 和我
也没有看到任何 PublisherConfirm。
- 如果我创建一个新的 RabbitTemplate(具有相同的 CachingConnectionFactory)
然后它仍然以相同的方式运行。我的电话挂断了。
- 如果我将消息发送到一个绑定了队列的 routingKey,
然后消息正确到达队列。 ReturnCallback 不是
在这种情况下调用。
经过一些调查,我得出的结论是 rabbitTemplate and/or 连接被阻塞,直到原始消息被完全处理。
如果我创建第二个 CachingConnectionFactory 和 RabbitTemplate,并在 PublisherReturn 回调中使用它们,那么它似乎工作正常。
那么,问题来了:使用 spring-amqp 在 PublisherReturn 回调中发送消息的最佳方式是什么?
我进行了搜索,但找不到任何内容来说明您应该如何执行此操作。
以下是我所拥有的简化细节:
@Configuration
public class MyConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherReturns(true);
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
// ... other settings left out for brevity
return rabbitTemplate;
}
@Bean
@Qualifier("connectionFactoryForUndeliverable")
public ConnectionFactory connectionFactoryForUndeliverable() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplateForUndeliverable")
public RabbitTemplate rabbitTemplateForUndeliverable() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
// ... other settings left out for brevity
return rabbitTemplate;
}
}
然后发送我正在使用的消息
@Autowired
@Qualifier("rabbitTemplate")
private RabbitTemplate rabbitTemplate;
public void send(Message message) {
rabbitTemplate.convertAndSend(
"exchange-name",
"primary-key",
message);
}
而ReturnCallback中的代码是
@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {
@Autowired
@Qualifier("rabbitTemplateForUndeliverable")
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
rabbitTemplate.convertAndSend(
"exchange-name",
"alternative-key",
message);
}
}
编辑
重现问题的简化示例。
给运行吧:
- 有RabbitMq 运行ning
- 将名为 foo 的交换绑定到名为 foo 的队列
- 运行 作为 spring 启动应用程序
您将看到以下输出:
in returnCallback before message send
但你不会看到:
in returnCallback after message send
如果您注释掉 connectionFactory.setPublisherConfirms(true);
它 运行 没问题。
@SpringBootApplication
public class HangingApplication {
public static void main(String[] args) {
SpringApplication.run(HangingApplication.class, args);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("foo");
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("Confirm callback for main template. Ack=" + ack);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("in returnCallback before message send");
rabbitTemplate.send("foo", message);
System.out.println("in returnCallback after message send");
});
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {
return args -> {
template.convertAndSend("BADKEY", "foo payload");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println("Message received on undeliverable queue : " + in);
}
}
这是我使用的build.gradle:
plugins {
id 'org.springframework.boot' version '2.1.5.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'
group 'pcoates'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.11
repositories {
mavenCentral()
}
dependencies {
compile 'org.springframework.boot:spring-boot-starter-amqp'
}
它会导致 amqp-client 代码出现某种死锁。最简单的解决方案是在单独的线程上发送 - 在回调中使用 TaskExecutor
...
exec.execute(() -> template.send(...));
您可以使用相同的 template/connection 工厂,但发送必须 运行 在不同的线程上。
我以为我们最近更改了框架以始终在不同的线程上调用 return 回调(在最后一个人报告此之后),但看起来它被遗漏了。
我打开了an issue this time.
编辑
您确定使用的是 2.1.6 吗?
我们在 2.1.0 中通过阻止发送尝试使用与 return 到达的相同通道来解决此问题。这对我来说很好...
@SpringBootApplication
public class So57234770Application {
public static void main(String[] args) {
SpringApplication.run(So57234770Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
template.send("foo", message);
});
return args -> {
template.convertAndSend("BADKEY", "foo");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println(in);
}
}
如果您可以提供展示此行为的示例应用程序,我会看一看到底发生了什么。
我正在使用 spring-amqp:2.1.6.RELEASE
我有一个带有 PublisherReturn 回调的 RabbitTemplate。
- 如果我将消息发送到没有队列绑定的 routingKey 它,然后正确调用 return 回调。发生这种情况时我 想要将消息发送到另一个 routingKey。然而,如果 我在 ReturnCallback 中使用 RabbitTemplate 它只是挂断了。我 没有看到任何消息 can/can 未发送, RabbitTemplate 没有 return 控制我的 ReturnCallback 和我 也没有看到任何 PublisherConfirm。
- 如果我创建一个新的 RabbitTemplate(具有相同的 CachingConnectionFactory) 然后它仍然以相同的方式运行。我的电话挂断了。
- 如果我将消息发送到一个绑定了队列的 routingKey, 然后消息正确到达队列。 ReturnCallback 不是 在这种情况下调用。
经过一些调查,我得出的结论是 rabbitTemplate and/or 连接被阻塞,直到原始消息被完全处理。
如果我创建第二个 CachingConnectionFactory 和 RabbitTemplate,并在 PublisherReturn 回调中使用它们,那么它似乎工作正常。
那么,问题来了:使用 spring-amqp 在 PublisherReturn 回调中发送消息的最佳方式是什么?
我进行了搜索,但找不到任何内容来说明您应该如何执行此操作。
以下是我所拥有的简化细节:
@Configuration
public class MyConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherReturns(true);
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
// ... other settings left out for brevity
return rabbitTemplate;
}
@Bean
@Qualifier("connectionFactoryForUndeliverable")
public ConnectionFactory connectionFactoryForUndeliverable() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplateForUndeliverable")
public RabbitTemplate rabbitTemplateForUndeliverable() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
// ... other settings left out for brevity
return rabbitTemplate;
}
}
然后发送我正在使用的消息
@Autowired
@Qualifier("rabbitTemplate")
private RabbitTemplate rabbitTemplate;
public void send(Message message) {
rabbitTemplate.convertAndSend(
"exchange-name",
"primary-key",
message);
}
而ReturnCallback中的代码是
@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {
@Autowired
@Qualifier("rabbitTemplateForUndeliverable")
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
rabbitTemplate.convertAndSend(
"exchange-name",
"alternative-key",
message);
}
}
编辑
重现问题的简化示例。 给运行吧:
- 有RabbitMq 运行ning
- 将名为 foo 的交换绑定到名为 foo 的队列
- 运行 作为 spring 启动应用程序
您将看到以下输出:
in returnCallback before message send
但你不会看到:
in returnCallback after message send
如果您注释掉 connectionFactory.setPublisherConfirms(true);
它 运行 没问题。
@SpringBootApplication
public class HangingApplication {
public static void main(String[] args) {
SpringApplication.run(HangingApplication.class, args);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("foo");
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("Confirm callback for main template. Ack=" + ack);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("in returnCallback before message send");
rabbitTemplate.send("foo", message);
System.out.println("in returnCallback after message send");
});
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {
return args -> {
template.convertAndSend("BADKEY", "foo payload");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println("Message received on undeliverable queue : " + in);
}
}
这是我使用的build.gradle:
plugins {
id 'org.springframework.boot' version '2.1.5.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'
group 'pcoates'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.11
repositories {
mavenCentral()
}
dependencies {
compile 'org.springframework.boot:spring-boot-starter-amqp'
}
它会导致 amqp-client 代码出现某种死锁。最简单的解决方案是在单独的线程上发送 - 在回调中使用 TaskExecutor
...
exec.execute(() -> template.send(...));
您可以使用相同的 template/connection 工厂,但发送必须 运行 在不同的线程上。
我以为我们最近更改了框架以始终在不同的线程上调用 return 回调(在最后一个人报告此之后),但看起来它被遗漏了。
我打开了an issue this time.
编辑
您确定使用的是 2.1.6 吗?
我们在 2.1.0 中通过阻止发送尝试使用与 return 到达的相同通道来解决此问题。这对我来说很好...
@SpringBootApplication
public class So57234770Application {
public static void main(String[] args) {
SpringApplication.run(So57234770Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
template.send("foo", message);
});
return args -> {
template.convertAndSend("BADKEY", "foo");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println(in);
}
}
如果您可以提供展示此行为的示例应用程序,我会看一看到底发生了什么。