需要验证方法:Spring Integration + AMQP + Async

Need to validate approach: Spring Integration + AMQP + Async

我最近一直在研究 Spring 集成和 AMQP (RabbitMQ),因为我需要使用异步方法来通信两个应用程序(中间件和后端),以便中间件在接收时不会阻塞客户来电。

我首先采用了更简单的同步实现方法,这意味着我在中间件上有一个网关接口和一个出站网关(requiresReply=true),然后是一个入站网关和一个服务激活器后端。这种初始方法效果很好(我使用了 Spring 集成 XML 配置)。

现在我需要说明以异步方式完成这项工作的方法。

通过查看 RabbitMQ 教程 6,最好使用回调队列和 correlationId,根据我的理解,这类似于调用 Spring RabbitTemplate 的 convertAndSend() 然后接收( ),而不是 convertSendAndReceive()(它将阻塞直到收到响应)。

我检查了 Spring 集成文档,我需要在其中将中间件上的网关接口替换为 return Future 或 ListenableFuture。

Async Gateway

完成后,我还查看了 outbound gateway 的文档,其中说它可以与 RabbitTemplate 一起工作来管理 correlationID 和 replyTo 消息属性。

我的问题是:

  1. 为了使用异步方法使其工作,我应该继续使用 outbound/inbound 网关,而不是 outbound/inbound 消息转换器吗?
  2. 如果遵循 outbound/inbound 消息转换器方法(在我看来这类似于 RabbitMQ 教程中显示的内容),我如何将网关接口上的 Future 与从 inbound 返回的结果相关联频道适配器?

老实说,您没有提供原始业务需求。可能确实没有理由处理这个 async 交接,因为你有一个 @Gateway 作为入口点,它是 thread-free,即使它被阻止到等待回复它不会影响可能执行类似 sendAndReceive 操作的其他线程。在大多数情况下,在同一个请求者线程中完成所有事情就足够了,并且不会因转移到共享 ThreadPoolExecutor.

而降低性能。

是的,Future 允许您稍微释放调用者以准备好在同一线程中接受新请求。

因为它是一个 MessagingGateway 并且您无论如何都想得到回复,所以有一个与请求关联的挂钩 - TemporaryReplyChannel header。这就是 <outbound-gateway> 正常工作的原因:它将阻塞回复放置到网关 return(或 FutureTask#set())的通道。

我想说我们可以通过您的异步回复要求实现相同的 TemporaryReplyChannel 收益。

  1. 您应该使用 inbound/outbound 通道适配器对。
  2. 在将消息发送到 <int-amqp:outbound-channel-adapter> 之前,您应该为 <header-enricher> 执行此操作 <header-channels-to-string>
  3. 服务器端可能相同 - <int-amqp:inbound-gateway>
  4. 您应该使用 fixed replyQueue 作为 header 以便这些消息通过 <int-amqp:outbound-channel-adapter>
  5. 发送
  6. 应该为 fixed replyQueue.
  7. 配置 <int-amqp:inbound-channel-adapter>
  8. 客户端的 <int-amqp:outbound-channel-adapter><int-amqp:inbound-gateway> 都必须为 mapped-request-headers="*" 配置,以允许将 reply-channel header 传播到服务器并使用虎钳相反。
  9. 客户端的 <int-amqp:inbound-channel-adapter> 只会将回复发送给 reply-channel,就像 <int-amqp:outbound-gateway>
  10. 一样
  11. 您可能需要手动处理 correlationId,因为 <int-amqp:inbound-gateway> 可能需要它才能正确生成回复。

嗯,类似的东西...

HTH

欢迎提出更多问题。或者如果我误解了你的问题请纠正我。