RabbitMQ request/response "RabbitTemplate is not configured as listener"
RabbitMQ request/response "RabbitTemplate is not configured as listener"
我正在使用 Spring-AMQP rabbitmq 实现测试 request/response 模式,但我无法让它工作...
我配置了以下工件:
test_exchange 有问候队列。路由键 = greeting
reply_exchange 有回复队列。路由键 = 回复
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("....IP of broker...");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public Queue greeting() {
return new Queue("greeting");
}
@Bean
public Queue replies() {
return new Queue("replies");
}
MessageListener receiver() {
return new MessageListenerAdapter(new RabbitMqReceiver(), "onMessage");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Queue replies) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange("test_exchange");
template.setRoutingKey("greeting");
template.setReplyAddress("reply_exchange"+"/"+replies.getName());
template.setReplyTimeout(60000);
return template;
}
@Bean
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory,
RabbitTemplate rabbitTemplate, Queue replies) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListener(rabbitTemplate);
container.setQueues(replies);
return container;
}
@Bean
public SimpleMessageListenerContainer serviceListenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(greeting());
container.setMessageListener(receiver());
return container;
}
我正在按照 github 上的示例进行操作,但它崩溃了:
Caused by: java.lang.IllegalStateException: RabbitTemplate is not configured as
MessageListener - cannot use a 'replyAddress': reply_exchange/replies
文档说:
Starting with version 1.5, the RabbitTemplate will detect if it has been configured as a MessageListener to receive replies. If not, attempts to send and receive messages with a reply address will fail with an IllegalStateException (because the replies will never be received).
这很好,但是 RabbitTemplate 是怎么做到的呢?它如何检测是否配置为 MessageListener?
提前致谢
PS: 发送码:
public void send() {
Message message = MessageBuilder.withBody("Payload".getBytes())
.setContentType("text/plain")
.build();
Message reply = this.template.sendAndReceive(message);
System.out.println("Reply from server is: "+new String(reply.getBody()));
}
当回复容器启动时,它检测到模板是 ListenerContainerAware
并调用 expectedQueueNames()
来检索回复队列(如果 replyAddress 的格式为 exch/rk,则为 null);如果返回非空结果,则容器检查队列是否正确;如果 exch/rk 是回复地址,你会得到这个
logger.debug("Cannot verify reply queue because it has the form 'exchange/routingKey'");
此方法无条件设置 isListener
布尔值,从而避免了该异常。所以看起来容器在您发送消息之前还没有启动 - 您是在上下文完全初始化之前发送吗?
请注意,由于 RabbitMQ 实现了直接回复,通常不再需要使用回复容器(除非您想要 HA 回复队列或出于其他原因需要显式回复队列)。直接回复消除了驱使我们实施回复容器机制的性能问题。
加里,你的直觉一如既往的完美。
我更改了我的发送代码:
@SpringBootApplication
public class App
{
@Bean(initMethod="send")
public RabbitMqSender sender() {
final RabbitMqSender sender = new RabbitMqSender();
return sender;
}
public static void main(String[] args) throws Exception {
SpringApplication.run(App.class, args);
}
}
至:
@SpringBootApplication
public class App
{
public static void main(String[] args) throws Exception {
final ConfigurableApplicationContext configAppContext = SpringApplication.run(App.class, args);
final RabbitMqSender sender = configAppContext.getBean(RabbitMqSender.class);
sender.send();
}
}
确保在所有 bean 都准备就绪时发送,request/response 效果非常好!
是的,我一定会尝试 Direct-TO 模式。
谢谢加里的帮助。
问候
托马斯
我正在使用 Spring-AMQP rabbitmq 实现测试 request/response 模式,但我无法让它工作...
我配置了以下工件:
test_exchange 有问候队列。路由键 = greeting
reply_exchange 有回复队列。路由键 = 回复
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("....IP of broker...");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public Queue greeting() {
return new Queue("greeting");
}
@Bean
public Queue replies() {
return new Queue("replies");
}
MessageListener receiver() {
return new MessageListenerAdapter(new RabbitMqReceiver(), "onMessage");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Queue replies) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange("test_exchange");
template.setRoutingKey("greeting");
template.setReplyAddress("reply_exchange"+"/"+replies.getName());
template.setReplyTimeout(60000);
return template;
}
@Bean
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory,
RabbitTemplate rabbitTemplate, Queue replies) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListener(rabbitTemplate);
container.setQueues(replies);
return container;
}
@Bean
public SimpleMessageListenerContainer serviceListenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(greeting());
container.setMessageListener(receiver());
return container;
}
我正在按照 github 上的示例进行操作,但它崩溃了:
Caused by: java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': reply_exchange/replies
文档说:
Starting with version 1.5, the RabbitTemplate will detect if it has been configured as a MessageListener to receive replies. If not, attempts to send and receive messages with a reply address will fail with an IllegalStateException (because the replies will never be received).
这很好,但是 RabbitTemplate 是怎么做到的呢?它如何检测是否配置为 MessageListener?
提前致谢
PS: 发送码:
public void send() {
Message message = MessageBuilder.withBody("Payload".getBytes())
.setContentType("text/plain")
.build();
Message reply = this.template.sendAndReceive(message);
System.out.println("Reply from server is: "+new String(reply.getBody()));
}
当回复容器启动时,它检测到模板是 ListenerContainerAware
并调用 expectedQueueNames()
来检索回复队列(如果 replyAddress 的格式为 exch/rk,则为 null);如果返回非空结果,则容器检查队列是否正确;如果 exch/rk 是回复地址,你会得到这个
logger.debug("Cannot verify reply queue because it has the form 'exchange/routingKey'");
此方法无条件设置 isListener
布尔值,从而避免了该异常。所以看起来容器在您发送消息之前还没有启动 - 您是在上下文完全初始化之前发送吗?
请注意,由于 RabbitMQ 实现了直接回复,通常不再需要使用回复容器(除非您想要 HA 回复队列或出于其他原因需要显式回复队列)。直接回复消除了驱使我们实施回复容器机制的性能问题。
加里,你的直觉一如既往的完美。
我更改了我的发送代码:
@SpringBootApplication
public class App
{
@Bean(initMethod="send")
public RabbitMqSender sender() {
final RabbitMqSender sender = new RabbitMqSender();
return sender;
}
public static void main(String[] args) throws Exception {
SpringApplication.run(App.class, args);
}
}
至:
@SpringBootApplication
public class App
{
public static void main(String[] args) throws Exception {
final ConfigurableApplicationContext configAppContext = SpringApplication.run(App.class, args);
final RabbitMqSender sender = configAppContext.getBean(RabbitMqSender.class);
sender.send();
}
}
确保在所有 bean 都准备就绪时发送,request/response 效果非常好!
是的,我一定会尝试 Direct-TO 模式。
谢谢加里的帮助。
问候
托马斯