Spring amqp-outbound gateway to produce reply from a different thread (like jms-outbound gateway)
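Problem statement:
The Spring amqp-outbound gateway should produce its reply from a different thread (like the jms-outbound gateway, with a separate reply queue, correlating request and response by a correlation key).
I am unable to correlate the messages with this example.
Spring Integration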
<int:gateway id="outboundGateway" service-interface="com.amqp.outbound.gateway.OutboundGateway"
             default-reply-channel="defaultReplyChannel">
    <int:method name="process" request-channel="inboundRequestChannel"/>
</int:gateway>

<int:channel id="defaultReplyChannel"/>
<int:channel id="inboundRequestChannel"/>
<int:channel id="enrichedInboundRequestChannel"/>
<int:channel id="processAuthRequestChannel"/>
<int:channel id="postProcessorChannel"/>

<int:chain input-channel="inboundRequestChannel" output-channel="enrichedInboundRequestChannel">
    <int:service-activator id="serviceActivator"
                           ref="ouboundService" method="createRequest"/>
</int:chain>

<int-amqp:outbound-gateway id="outboundGtwyId" header-mapper="headerMapper"
                           request-channel="enrichedInboundRequestChannel"
                           reply-channel="defaultReplyChannel"
                           amqp-template="template"
                           reply-timeout="30000"
                           exchange-name="request_exchange"
                           routing-key="request_exchange_queue"/>

<int-amqp:inbound-channel-adapter id="amqpMessageDriven" queue-names="request_queue"
                                  connection-factory="rabbitConnectionFactory" channel="processAuthRequestChannel"/>

<int:service-activator id="serviceActivator"
                       ref="ouboundService" input-channel="processAuthRequestChannel" output-channel="postProcessorChannel"
                       method="processRequest"/>

<int-amqp:outbound-channel-adapter amqp-template="template" channel="postProcessorChannel"
                                   header-mapper="headerMapper" exchange-name="reply_exchange" routing-key="reply_exchange_queue"/>

<bean id="headerMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>
Config
@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setQueue("reply_queue");
    return template;
}

@Bean
public Binding binding(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue");
}

@Bean
public DirectExchange exchange(){
    return new DirectExchange("request_exchange");
}

@Bean
public Queue queue(){
    return new Queue("request_queue", true, false, true);
}

@Bean
public Binding bindingReply(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue");
}

@Bean
public DirectExchange exchangeReply(){
    return new DirectExchange("reply_exchange");
}

@Bean
public Queue replyQueue(){
    return new Queue("reply_queue", true, false, true);
}
Service
@Service
public final class OuboundService {

    public Message createRequest(String message){
        System.out.println("Inside createRequest : "+ message);
        final String transactionId = UUID.randomUUID().toString();
        final Message builtMessage = MessageBuilder.withBody(message.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setHeader(AmqpHeaders.CORRELATION_ID, transactionId)
                .build();
        return builtMessage;
    }

    public Message processRequest(Message message){
        System.out.println("Inside process Request : "+ new String(message.getBody()));
        System.out.println("Header values : "+message.getMessageProperties().getHeaders());
        final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties())
                .copyHeaders(message.getMessageProperties().getHeaders()).build();
        return result;
    }
}
Error:
org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'outboundGtwyId', and its 'requiresReply' property is set to true.
GitHub source code (resolved solution)
https://github.com/kingkongprab/spring-amqp-outbound-gateway
Correlation is handled in Spring AMQP as well. See its RabbitTemplate#sendAndReceive() for more information. The Reference Manual also has good documentation on the matter.
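Spring Integration, with its AbstractAmqpOutboundEndpoint and AmqpInboundGateway implementations, provides an out-of-the-box request-reply correlation solution. If you cannot use AmqpInboundGateway on the server side, you must make sure the correlationId is transferred from the received request to the reply that is sent back. Yes, you can use a dedicated exchange for replies, and that is what RabbitTemplate#setQueue() supports for waiting for replies on the client (outbound) side. But that still is not going to work without a proper correlation transfer. See also https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-message-headers for how headers (including correlationId) are mapped in Spring Integration.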
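To make the point about correlation transfer concrete, here is a minimal, framework-free sketch of how a client can match replies that arrive on a different thread. It is only a model of the idea, not the RabbitTemplate implementation: CorrelationSketch, Msg, handle, onReply and the in-memory "server" executor are all hypothetical names introduced for illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of request/reply correlation; not Spring AMQP code.
final class CorrelationSketch {

    // Minimal stand-in for a message: a correlation id header plus a body.
    static final class Msg {
        final String correlationId;
        final String body;

        Msg(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Client side: callers still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<Msg>> pending = new ConcurrentHashMap<>();

    // Stand-in for the server side, which runs on a different (daemon) thread.
    private final ExecutorService server = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "server");
        t.setDaemon(true);
        return t;
    });

    // Server handler: the reply must carry the correlation id of the request.
    static Msg handle(Msg request) {
        return new Msg(request.correlationId, "reply to " + request.body);
    }

    // Reply listener: hands the reply to the caller waiting on the same correlation id.
    void onReply(Msg reply) {
        if (reply.correlationId == null) {
            return; // cannot be matched, so the caller would eventually time out
        }
        CompletableFuture<Msg> waiter = pending.remove(reply.correlationId);
        if (waiter != null) {
            waiter.complete(reply);
        }
    }

    // Client call: registers a waiter, sends the request, blocks until the reply or a timeout.
    String sendAndReceive(String body, long timeoutMillis) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<Msg> waiter = new CompletableFuture<>();
        pending.put(id, waiter);
        Msg request = new Msg(id, body);
        server.execute(() -> onReply(handle(request)));
        try {
            return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS).body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for correlation id " + id, e);
        } finally {
            pending.remove(id);
        }
    }
}
```

Calling new CorrelationSketch().sendAndReceive("ping", 2000) returns "reply to ping". If handle() built its reply without copying the correlation id, onReply() would find no waiter and the caller would time out, which is the same symptom as a gateway that never sees its reply.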
UPDATE
Thank you for sharing your application.
Well, now I see several problems:
You are definitely missing the binding for replyQueue:
@Bean
public Binding bindingReply(){
    return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue");
}
The RabbitTemplate must use setReplyAddress(). You also have to configure a MessageListenerContainer for reply_queue and set the RabbitTemplate as its listener:
@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setReplyAddress(replyQueue().getName());
    return template;
}

@Bean
public MessageListenerContainer replyContainer(RabbitTemplate template) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(template.getConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(template);
    return container;
}
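The way your OuboundService manipulates org.springframework.amqp.core.Message is useless. The channel adapters do not know about that payload type, so your custom Message simply becomes the serialized body of another org.springframework.amqp.core.Message. I changed it to this and everything works well: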
public String createRequest(String message){
    System.out.println("Inside createRequest : "+ message);
    return message;
}

public Message processRequest(Message message){
    System.out.println("Inside process Request : " + message);
    return message;
}
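In any case, I suggest you rethink your design and go back to the AmqpInboundGateway.
By the way, in the final solution you do not need to care about correlation at all. The framework does it for you automatically.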
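As a rough illustration of that suggestion, the server side could be wired with an AmqpInboundGateway along the following lines. This is a sketch under assumptions, not the configuration from the linked repository: the ServerSideConfig class and the requestGateway bean name are hypothetical, while request_queue and processAuthRequestChannel are reused from the question.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;

// Hypothetical server-side configuration; class and bean names are assumptions.
@Configuration
public class ServerSideConfig {

    @Bean
    public AmqpInboundGateway requestGateway(ConnectionFactory rabbitConnectionFactory) {
        // The gateway drives its own listener container on the request queue.
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(rabbitConnectionFactory);
        container.setQueueNames("request_queue");

        AmqpInboundGateway gateway = new AmqpInboundGateway(container);
        // Requests are handed to the existing service activator's input channel.
        gateway.setRequestChannelName("processAuthRequestChannel");
        return gateway;
    }
}
```

With this in place of the inbound-channel-adapter and outbound-channel-adapter pair, the service activator would no longer need an output-channel: its return value goes back to the gateway, which replies to the replyTo address of the request and carries the correlation data along.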