使用 Spring 集成的异步 RabbitMQ 通信
Async RabbitMQ communcation using Spring Integration
我有两个 spring 引导服务使用 RabbitMQ 进行通信。
Service1 向 Service2 发送创建会话的请求。
Service2 处理请求并且应该 return 响应。
Service1 应处理响应。
请求会话的 Service1 方法:
public void startSession()
{
ListenableFuture<SessionCreationResponseDTO> sessionCreationResponse = sessionGateway.requestNewSession();
sessionCreationResponse.addCallback(response -> {
//handle success
}, ex -> {
// handle exception
});
}
我在 Service1 上定义了 AsyncOutboundGateway,例如:
@Bean
public IntegrationFlow requestSessionFlow(MessageChannel requestNewSessionChannel,
AsyncRabbitTemplate amqpTemplate,
SessionProperties sessionProperties)
{
return flow -> flow.channel(requestNewSessionChannel)
.handle(Amqp.asyncOutboundGateway(amqpTemplate)
.exchangeName(sessionProperties.getRequestSession().getExchangeName())
.routingKey(sessionProperties.getRequestSession().getRoutingKey()));
}
在 Service2 上,我有接收这些消息的流程:
@Bean
public IntegrationFlow requestNewSessionFlow(ConnectionFactory connectionFactory,
SessionProperties sessionProperties,
MessageConverter messageConverter,
RequestNewSessionHandler requestNewSessionHandler)
{
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory,
sessionProperties.requestSessionProperties().queueName())
.handle(requestNewSessionHandler)
.get();
Service2 处理那里的请求:
@ServiceActivator(async = "true")
public ListenableFuture<SessionCreationResponseDTO> handleRequestNewSession()
{
SettableListenableFuture<SessionCreationResponseDTO> settableListenableFuture = new SettableListenableFuture<>();
// Goes through asynchronous process of creating session and sets value in listenable future
return settableListenableFuture;
}
问题在于 Service2 立即 return将 ListenableFuture 作为消息负载发送给 Service1,而不是等待未来的结果并发回结果。
如果我通过将 @ServiceActivator
中的异步参数设置为 true 来正确理解文档 Docs,成功的结果应该是 returned,如果出现异常,将使用错误通道。
可能我误解了文档,所以我需要在 return 将其作为响应之前解压缩 Service2 流中的 ListenableFuture,但我不确定如何实现。
我用 publishSubscribeChannel
尝试了一些东西,但运气不佳。
您的问题在这里:
.handle(requestNewSessionHandler)
这样的配置不会看到您的 @ServiceActivator(async = "true")
并将其用作常规的阻塞服务激活器。
让我们看看这是否对您有帮助:
.handle(requestNewSessionHandler, "handleRequestNewSession", e -> e.async(true))
这样想比较好:还是只注解配置。或仅通过 Java DSL 编程。
我有两个 spring 引导服务使用 RabbitMQ 进行通信。 Service1 向 Service2 发送创建会话的请求。 Service2 处理请求并且应该 return 响应。 Service1 应处理响应。
请求会话的 Service1 方法:
public void startSession()
{
ListenableFuture<SessionCreationResponseDTO> sessionCreationResponse = sessionGateway.requestNewSession();
sessionCreationResponse.addCallback(response -> {
//handle success
}, ex -> {
// handle exception
});
}
我在 Service1 上定义了 AsyncOutboundGateway,例如:
@Bean
public IntegrationFlow requestSessionFlow(MessageChannel requestNewSessionChannel,
AsyncRabbitTemplate amqpTemplate,
SessionProperties sessionProperties)
{
return flow -> flow.channel(requestNewSessionChannel)
.handle(Amqp.asyncOutboundGateway(amqpTemplate)
.exchangeName(sessionProperties.getRequestSession().getExchangeName())
.routingKey(sessionProperties.getRequestSession().getRoutingKey()));
}
在 Service2 上,我有接收这些消息的流程:
@Bean
public IntegrationFlow requestNewSessionFlow(ConnectionFactory connectionFactory,
SessionProperties sessionProperties,
MessageConverter messageConverter,
RequestNewSessionHandler requestNewSessionHandler)
{
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory,
sessionProperties.requestSessionProperties().queueName())
.handle(requestNewSessionHandler)
.get();
Service2 处理那里的请求:
@ServiceActivator(async = "true")
public ListenableFuture<SessionCreationResponseDTO> handleRequestNewSession()
{
SettableListenableFuture<SessionCreationResponseDTO> settableListenableFuture = new SettableListenableFuture<>();
// Goes through asynchronous process of creating session and sets value in listenable future
return settableListenableFuture;
}
问题在于 Service2 立即 return将 ListenableFuture 作为消息负载发送给 Service1,而不是等待未来的结果并发回结果。
如果我通过将 @ServiceActivator
中的异步参数设置为 true 来正确理解文档 Docs,成功的结果应该是 returned,如果出现异常,将使用错误通道。
可能我误解了文档,所以我需要在 return 将其作为响应之前解压缩 Service2 流中的 ListenableFuture,但我不确定如何实现。
我用 publishSubscribeChannel
尝试了一些东西,但运气不佳。
您的问题在这里:
.handle(requestNewSessionHandler)
这样的配置不会看到您的 @ServiceActivator(async = "true")
并将其用作常规的阻塞服务激活器。
让我们看看这是否对您有帮助:
.handle(requestNewSessionHandler, "handleRequestNewSession", e -> e.async(true))
这样想比较好:还是只注解配置。或仅通过 Java DSL 编程。