使用 Function 回复 RPC 请求

Use Function to replyTo RPC request

我想使用 java.util.Function 方法来回复通过 RabbitTemplate.convertSendAndReceive 发送的请求。它在 RabbitListener 上运行良好,但我无法在函数式方法上运行它。

客户(工作)

class Client(private val template RabbitTemplate) {

    fun send() = template.convertSendAndReceive(
        "rpc-exchange",
        "rpc-routing-key",
        "payload message"
    )

}

服务器(方法 1,有效)

class Server {

    @RabbitListener(queues = ["rpc-queue"])
    fun receiveRequest(message: String) = "Response Message"

    @Bean
    fun queue(): Queue {
        return Queue("rpc-queue")
    }

    @Bean
    fun exchange(): DirectExchange {
        return DirectExchange("rpc-exchange")
    }

    @Bean
    fun binding(exchange: DirectExchange, queue: Queue): Binding {
        return BindingBuilder.bind(queue).to(exchange).with("rpc-routing-key")
    }
    
}

服务器(方法 2,无效)--> 目标

class Server {

    @Bean
    fun receiveRequest(): Function<String, String> {
        return Function { value: String ->
            "Response Message"
        }
    }

}

使用配置(方法 2)

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.binding.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.binding.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

使用方法 2 服务器接收。不幸的是,响应丢失了。有谁知道如何通过功能方法使用 RPC 模式?我不想使用 RabbitListener.

documentation/tutorial

Spring Cloud Stream 并不是真正为服务器端的 RPC 设计的,因此它不会像 @RabbitListener 那样自动处理。

但是,您可以通过添加输出绑定来实现它,以将回复路由到默认交换和 replyTo header:

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

spring.cloud.stream.bindings.receiveRequest-out-0.destination=
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression=headers['amqp_replyTo']

#logging.level.org.springframework.amqp=debug
@SpringBootApplication
public class So66586230Application {

    public static void main(String[] args) {
        SpringApplication.run(So66586230Application.class, args);
    }

    @Bean
    Function<String, String> receiveRequest() {
        return str -> {
            return str.toUpperCase();
        };
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            System.out.println(new String((byte[]) template.convertSendAndReceive(
                    "rpc-exchange",
                    "rpc-routing-key",
                    "payload message")));
        };
    }

}
PAYLOAD MESSAGE

请注意,回复将以 byte[] 形式出现;您可以在模板上使用自定义消息转换器来转换为字符串。

编辑

回复下方第三条评论。

RabbitTemplate默认使用direct reply-to,所以回复地址不是真实的queue,是binder创建的伪queue关联模板中的消费者。

您还可以将模板配置为使用临时回复 queue,但它们也会被默认交换路由到“”。

但是,您可以配置外部 reply container,并将模板作为侦听器。

然后您可以使用您想要的任何交换和路由密钥路由回。

综合起来:

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

spring.cloud.stream.bindings.receiveRequest-out-0.destination=reply-exchange
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression='reply-routing-key'
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.declare-exchange=false

spring.rabbitmq.template.reply-timeout=10000

#logging.level.org.springframework.amqp=debug

public class So66586230Application {

    public static void main(String[] args) {
        SpringApplication.run(So66586230Application.class, args);
    }

    @Bean
    Function<String, String> receiveRequest() {
        return str -> {
            return str.toUpperCase();
        };
    }

    @Bean
    SimpleMessageListenerContainer replyContainer(SimpleRabbitListenerContainerFactory factory,
            RabbitTemplate template) {

        template.setReplyAddress("reply-queue");
        SimpleMessageListenerContainer container = factory.createListenerContainer();
        container.setQueueNames("reply-queue");
        container.setMessageListener(template);
        return container;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, SimpleMessageListenerContainer replyContainer) {
        return args -> {
            System.out.println(new String((byte[]) template.convertSendAndReceive(
                    "rpc-exchange",
                    "rpc-routing-key",
                    "payload message")));
        };
    }

}

重要提示:如果您有多个客户端实例,每个实例都需要自己的回复 queue。

在这种情况下,路由键必须是 queue 名称,您应该返回到前面的示例来设置路由键表达式(从 [=47] 中获取 queue 名称=]).