使用 Function 回复 RPC 请求
Use Function to replyTo RPC request
我想使用 java.util.Function
方法来回复通过 RabbitTemplate.convertSendAndReceive
发送的请求。它在 RabbitListener
上运行良好,但我无法在函数式方法上运行它。
客户(工作)
class Client(private val template RabbitTemplate) {
fun send() = template.convertSendAndReceive(
"rpc-exchange",
"rpc-routing-key",
"payload message"
)
}
服务器(方法 1,有效)
class Server {
@RabbitListener(queues = ["rpc-queue"])
fun receiveRequest(message: String) = "Response Message"
@Bean
fun queue(): Queue {
return Queue("rpc-queue")
}
@Bean
fun exchange(): DirectExchange {
return DirectExchange("rpc-exchange")
}
@Bean
fun binding(exchange: DirectExchange, queue: Queue): Binding {
return BindingBuilder.bind(queue).to(exchange).with("rpc-routing-key")
}
}
服务器(方法 2,无效)--> 目标
class Server {
@Bean
fun receiveRequest(): Function<String, String> {
return Function { value: String ->
"Response Message"
}
}
}
使用配置(方法 2)
spring.cloud.function.definition: receiveRequest
spring.cloud.stream.binding.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.binding.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
使用方法 2 服务器接收。不幸的是,响应丢失了。有谁知道如何通过功能方法使用 RPC 模式?我不想使用 RabbitListener
.
Spring Cloud Stream 并不是真正为服务器端的 RPC 设计的,因此它不会像 @RabbitListener
那样自动处理。
但是,您可以通过添加输出绑定来实现它,以将回复路由到默认交换和 replyTo
header:
spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
spring.cloud.stream.bindings.receiveRequest-out-0.destination=
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression=headers['amqp_replyTo']
#logging.level.org.springframework.amqp=debug
@SpringBootApplication
public class So66586230Application {
public static void main(String[] args) {
SpringApplication.run(So66586230Application.class, args);
}
@Bean
Function<String, String> receiveRequest() {
return str -> {
return str.toUpperCase();
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
System.out.println(new String((byte[]) template.convertSendAndReceive(
"rpc-exchange",
"rpc-routing-key",
"payload message")));
};
}
}
PAYLOAD MESSAGE
请注意,回复将以 byte[]
形式出现;您可以在模板上使用自定义消息转换器来转换为字符串。
编辑
回复下方第三条评论。
RabbitTemplate
默认使用direct reply-to,所以回复地址不是真实的queue,是binder创建的伪queue关联模板中的消费者。
您还可以将模板配置为使用临时回复 queue,但它们也会被默认交换路由到“”。
但是,您可以配置外部 reply container,并将模板作为侦听器。
然后您可以使用您想要的任何交换和路由密钥路由回。
综合起来:
spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
spring.cloud.stream.bindings.receiveRequest-out-0.destination=reply-exchange
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression='reply-routing-key'
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.declare-exchange=false
spring.rabbitmq.template.reply-timeout=10000
#logging.level.org.springframework.amqp=debug
public class So66586230Application {
public static void main(String[] args) {
SpringApplication.run(So66586230Application.class, args);
}
@Bean
Function<String, String> receiveRequest() {
return str -> {
return str.toUpperCase();
};
}
@Bean
SimpleMessageListenerContainer replyContainer(SimpleRabbitListenerContainerFactory factory,
RabbitTemplate template) {
template.setReplyAddress("reply-queue");
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames("reply-queue");
container.setMessageListener(template);
return container;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template, SimpleMessageListenerContainer replyContainer) {
return args -> {
System.out.println(new String((byte[]) template.convertSendAndReceive(
"rpc-exchange",
"rpc-routing-key",
"payload message")));
};
}
}
重要提示:如果您有多个客户端实例,每个实例都需要自己的回复 queue。
在这种情况下,路由键必须是 queue 名称,您应该返回到前面的示例来设置路由键表达式(从 [=47] 中获取 queue 名称=]).
我想使用 java.util.Function
方法来回复通过 RabbitTemplate.convertSendAndReceive
发送的请求。它在 RabbitListener
上运行良好,但我无法在函数式方法上运行它。
客户(工作)
class Client(private val template RabbitTemplate) {
fun send() = template.convertSendAndReceive(
"rpc-exchange",
"rpc-routing-key",
"payload message"
)
}
服务器(方法 1,有效)
class Server {
@RabbitListener(queues = ["rpc-queue"])
fun receiveRequest(message: String) = "Response Message"
@Bean
fun queue(): Queue {
return Queue("rpc-queue")
}
@Bean
fun exchange(): DirectExchange {
return DirectExchange("rpc-exchange")
}
@Bean
fun binding(exchange: DirectExchange, queue: Queue): Binding {
return BindingBuilder.bind(queue).to(exchange).with("rpc-routing-key")
}
}
服务器(方法 2,无效)--> 目标
class Server {
@Bean
fun receiveRequest(): Function<String, String> {
return Function { value: String ->
"Response Message"
}
}
}
使用配置(方法 2)
spring.cloud.function.definition: receiveRequest
spring.cloud.stream.binding.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.binding.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
使用方法 2 服务器接收。不幸的是,响应丢失了。有谁知道如何通过功能方法使用 RPC 模式?我不想使用 RabbitListener
.
Spring Cloud Stream 并不是真正为服务器端的 RPC 设计的,因此它不会像 @RabbitListener
那样自动处理。
但是,您可以通过添加输出绑定来实现它,以将回复路由到默认交换和 replyTo
header:
spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
spring.cloud.stream.bindings.receiveRequest-out-0.destination=
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression=headers['amqp_replyTo']
#logging.level.org.springframework.amqp=debug
@SpringBootApplication
public class So66586230Application {
public static void main(String[] args) {
SpringApplication.run(So66586230Application.class, args);
}
@Bean
Function<String, String> receiveRequest() {
return str -> {
return str.toUpperCase();
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
System.out.println(new String((byte[]) template.convertSendAndReceive(
"rpc-exchange",
"rpc-routing-key",
"payload message")));
};
}
}
PAYLOAD MESSAGE
请注意,回复将以 byte[]
形式出现;您可以在模板上使用自定义消息转换器来转换为字符串。
编辑
回复下方第三条评论。
RabbitTemplate
默认使用direct reply-to,所以回复地址不是真实的queue,是binder创建的伪queue关联模板中的消费者。
您还可以将模板配置为使用临时回复 queue,但它们也会被默认交换路由到“”。
但是,您可以配置外部 reply container,并将模板作为侦听器。
然后您可以使用您想要的任何交换和路由密钥路由回。
综合起来:
spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
spring.cloud.stream.bindings.receiveRequest-out-0.destination=reply-exchange
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression='reply-routing-key'
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.declare-exchange=false
spring.rabbitmq.template.reply-timeout=10000
#logging.level.org.springframework.amqp=debug
public class So66586230Application {
public static void main(String[] args) {
SpringApplication.run(So66586230Application.class, args);
}
@Bean
Function<String, String> receiveRequest() {
return str -> {
return str.toUpperCase();
};
}
@Bean
SimpleMessageListenerContainer replyContainer(SimpleRabbitListenerContainerFactory factory,
RabbitTemplate template) {
template.setReplyAddress("reply-queue");
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames("reply-queue");
container.setMessageListener(template);
return container;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template, SimpleMessageListenerContainer replyContainer) {
return args -> {
System.out.println(new String((byte[]) template.convertSendAndReceive(
"rpc-exchange",
"rpc-routing-key",
"payload message")));
};
}
}
重要提示:如果您有多个客户端实例,每个实例都需要自己的回复 queue。
在这种情况下,路由键必须是 queue 名称,您应该返回到前面的示例来设置路由键表达式(从 [=47] 中获取 queue 名称=]).