RabbitMQ Spring Java - RPC 模式
RabbitMQ Spring Java - RPC Pattern
我有一些 Spring 应用程序使用 RabbitMQ 作为代理在它们之间进行通信。我可以在它们之间异步发送和接收消息。但是现在,我需要一个应用程序向另一个应用程序发送消息并等待响应。因此,为此我正在尝试实现 RPC 模式。它正在工作,但问题是我只能使用 Spring.
生成的临时队列来做到这一点
https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html
这是发送消息并等待响应的代码。
public void send() {
....
Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", "message");
...
}
当我发送消息时,执行被阻塞,直到收到响应,并且 Spring 为响应创建了一个临时队列,正如预期的那样。
但我需要的是使用我定义的特定且固定的队列来接收响应。我需要使用指向固定响应队列的路由键将响应发送到交换(这样做我也可以将响应发送到另一个队列,这将记录所有响应)。
我尝试为消息设置“setReplyTo”属性,但没有用。
您使用的是什么版本?对于现代版本,默认使用直接 reply_to,但您可以通过在模板上设置 属性 来恢复使用临时队列。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to
要使用命名回复队列,请参阅有关如何设置回复容器并将模板作为消息侦听器的文档:
和
https://docs.spring.io/spring-amqp/docs/current/reference/html/#reply-listener
编辑
模板将阻塞,直到回复容器将相应的回复传递给它(或超时)。
@SpringBootApplication
public class So68986604Application {
public static void main(String[] args) {
SpringApplication.run(So68986604Application.class, args);
}
@RabbitListener(queues = "foo")
public String listen(String in) {
System.out.println(in);
return in.toUpperCase();
}
@Bean
Queue foo() {
return new Queue("foo");
}
@Bean
Queue replies() {
return new Queue("foo.replies");
}
@Bean
SimpleMessageListenerContainer replyContainer(ConnectionFactory cf, RabbitTemplate template) {
SimpleMessageListenerContainer replyer = new SimpleMessageListenerContainer(cf);
replyer.setQueueNames("foo.replies");
replyer.setMessageListener(template);
template.setReplyAddress("foo.replies");
return replyer;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
System.out.println(template.convertSendAndReceive("", "foo", "test"));
};
}
}
test
TEST
我有一些 Spring 应用程序使用 RabbitMQ 作为代理在它们之间进行通信。我可以在它们之间异步发送和接收消息。但是现在,我需要一个应用程序向另一个应用程序发送消息并等待响应。因此,为此我正在尝试实现 RPC 模式。它正在工作,但问题是我只能使用 Spring.
生成的临时队列来做到这一点https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html
这是发送消息并等待响应的代码。
public void send() {
....
Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", "message");
...
}
当我发送消息时,执行被阻塞,直到收到响应,并且 Spring 为响应创建了一个临时队列,正如预期的那样。
但我需要的是使用我定义的特定且固定的队列来接收响应。我需要使用指向固定响应队列的路由键将响应发送到交换(这样做我也可以将响应发送到另一个队列,这将记录所有响应)。
我尝试为消息设置“setReplyTo”属性,但没有用。
您使用的是什么版本?对于现代版本,默认使用直接 reply_to,但您可以通过在模板上设置 属性 来恢复使用临时队列。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to
要使用命名回复队列,请参阅有关如何设置回复容器并将模板作为消息侦听器的文档:
和
https://docs.spring.io/spring-amqp/docs/current/reference/html/#reply-listener
编辑
模板将阻塞,直到回复容器将相应的回复传递给它(或超时)。
@SpringBootApplication
public class So68986604Application {
public static void main(String[] args) {
SpringApplication.run(So68986604Application.class, args);
}
@RabbitListener(queues = "foo")
public String listen(String in) {
System.out.println(in);
return in.toUpperCase();
}
@Bean
Queue foo() {
return new Queue("foo");
}
@Bean
Queue replies() {
return new Queue("foo.replies");
}
@Bean
SimpleMessageListenerContainer replyContainer(ConnectionFactory cf, RabbitTemplate template) {
SimpleMessageListenerContainer replyer = new SimpleMessageListenerContainer(cf);
replyer.setQueueNames("foo.replies");
replyer.setMessageListener(template);
template.setReplyAddress("foo.replies");
return replyer;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
System.out.println(template.convertSendAndReceive("", "foo", "test"));
};
}
}
test
TEST