RabbitMQ Spring Java - RPC 模式

RabbitMQ Spring Java - RPC Pattern

我有一些 Spring 应用程序使用 RabbitMQ 作为代理在它们之间进行通信。我可以在它们之间异步发送和接收消息。但是现在,我需要一个应用程序向另一个应用程序发送消息并等待响应。因此,为此我正在尝试实现 RPC 模式。它正在工作,但问题是我只能使用 Spring.

生成的临时队列来做到这一点

https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html

这是发送消息并等待响应的代码。

public void send() {
    ....
    Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", "message");
    ...
}

当我发送消息时,执行被阻塞,直到收到响应,并且 Spring 为响应创建了一个临时队列,正如预期的那样。

但我需要的是使用我定义的特定且固定的队列来接收响应。我需要使用指向固定响应队列的路由键将响应发送到交换(这样做我也可以将响应发送到另一个队列,这将记录所有响应)。

我尝试为消息设置“setReplyTo”属性,但没有用。

您使用的是什么版本?对于现代版本,默认使用直接 reply_to,但您可以通过在模板上设置 属性 来恢复使用临时队列。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to

要使用命名回复队列,请参阅有关如何设置回复容器并将模板作为消息侦听器的文档:

https://docs.spring.io/spring-amqp/docs/current/reference/html/#message-correlation-with-a-reply-queue

https://docs.spring.io/spring-amqp/docs/current/reference/html/#reply-listener

编辑

模板将阻塞,直到回复容器将相应的回复传递给它(或超时)。

@SpringBootApplication
public class So68986604Application {

    public static void main(String[] args) {
        SpringApplication.run(So68986604Application.class, args);
    }

    @RabbitListener(queues = "foo")
    public String listen(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }

    @Bean
    Queue foo() {
        return new Queue("foo");
    }

    @Bean
    Queue replies() {
        return new Queue("foo.replies");
    }

    @Bean
    SimpleMessageListenerContainer replyContainer(ConnectionFactory cf, RabbitTemplate template) {
        SimpleMessageListenerContainer replyer = new SimpleMessageListenerContainer(cf);
        replyer.setQueueNames("foo.replies");
        replyer.setMessageListener(template);
        template.setReplyAddress("foo.replies");
        return replyer;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            System.out.println(template.convertSendAndReceive("", "foo", "test"));
        };
    }
}
test
TEST