需要验证方法:Spring Integration + AMQP + Async
Need to validate approach: Spring Integration + AMQP + Async
我最近一直在研究 Spring 集成和 AMQP (RabbitMQ),因为我需要使用异步方法来通信两个应用程序(中间件和后端),以便中间件在接收时不会阻塞客户来电。
我首先采用了更简单的同步实现方法,这意味着我在中间件上有一个网关接口和一个出站网关(requiresReply=true),然后是一个入站网关和一个服务激活器后端。这种初始方法效果很好(我使用了 Spring 集成 XML 配置)。
现在我需要说明以异步方式完成这项工作的方法。
通过查看 RabbitMQ 教程 6,最好使用回调队列和 correlationId,根据我的理解,这类似于调用 Spring RabbitTemplate 的 convertAndSend() 然后接收( ),而不是 convertSendAndReceive()(它将阻塞直到收到响应)。
我检查了 Spring 集成文档,我需要在其中将中间件上的网关接口替换为 return Future 或 ListenableFuture。
完成后,我还查看了 outbound gateway 的文档,其中说它可以与 RabbitTemplate 一起工作来管理 correlationID 和 replyTo 消息属性。
我的问题是:
- 为了使用异步方法使其工作,我应该继续使用 outbound/inbound 网关,而不是 outbound/inbound 消息转换器吗?
- 如果遵循 outbound/inbound 消息转换器方法(在我看来这类似于 RabbitMQ 教程中显示的内容),我如何将网关接口上的 Future 与从 inbound 返回的结果相关联频道适配器?
老实说,您没有提供原始业务需求。可能确实没有理由处理这个 async
交接,因为你有一个 @Gateway
作为入口点,它是 thread-free,即使它被阻止到等待回复它不会影响可能执行类似 sendAndReceive
操作的其他线程。在大多数情况下,在同一个请求者线程中完成所有事情就足够了,并且不会因转移到共享 ThreadPoolExecutor
.
而降低性能。
是的,Future
允许您稍微释放调用者以准备好在同一线程中接受新请求。
因为它是一个 MessagingGateway
并且您无论如何都想得到回复,所以有一个与请求关联的挂钩 - TemporaryReplyChannel
header。这就是 <outbound-gateway>
正常工作的原因:它将阻塞回复放置到网关 return
(或 FutureTask#set()
)的通道。
我想说我们可以通过您的异步回复要求实现相同的 TemporaryReplyChannel
收益。
- 您应该使用
inbound
/outbound
通道适配器对。
- 在将消息发送到
<int-amqp:outbound-channel-adapter>
之前,您应该为 <header-enricher>
执行此操作 <header-channels-to-string>
。
- 服务器端可能相同 -
<int-amqp:inbound-gateway>
- 您应该使用 fixed
replyQueue
作为 header 以便这些消息通过 <int-amqp:outbound-channel-adapter>
发送
- 应该为 fixed
replyQueue
. 配置 <int-amqp:inbound-channel-adapter>
- 客户端的
<int-amqp:outbound-channel-adapter>
和 <int-amqp:inbound-gateway>
都必须为 mapped-request-headers="*"
配置,以允许将 reply-channel
header 传播到服务器并使用虎钳相反。
- 客户端的
<int-amqp:inbound-channel-adapter>
只会将回复发送给 reply-channel
,就像 <int-amqp:outbound-gateway>
一样
- 您可能需要手动处理
correlationId
,因为 <int-amqp:inbound-gateway>
可能需要它才能正确生成回复。
嗯,类似的东西...
HTH
欢迎提出更多问题。或者如果我误解了你的问题请纠正我。
我最近一直在研究 Spring 集成和 AMQP (RabbitMQ),因为我需要使用异步方法来通信两个应用程序(中间件和后端),以便中间件在接收时不会阻塞客户来电。
我首先采用了更简单的同步实现方法,这意味着我在中间件上有一个网关接口和一个出站网关(requiresReply=true),然后是一个入站网关和一个服务激活器后端。这种初始方法效果很好(我使用了 Spring 集成 XML 配置)。
现在我需要说明以异步方式完成这项工作的方法。
通过查看 RabbitMQ 教程 6,最好使用回调队列和 correlationId,根据我的理解,这类似于调用 Spring RabbitTemplate 的 convertAndSend() 然后接收( ),而不是 convertSendAndReceive()(它将阻塞直到收到响应)。
我检查了 Spring 集成文档,我需要在其中将中间件上的网关接口替换为 return Future 或 ListenableFuture。
完成后,我还查看了 outbound gateway 的文档,其中说它可以与 RabbitTemplate 一起工作来管理 correlationID 和 replyTo 消息属性。
我的问题是:
- 为了使用异步方法使其工作,我应该继续使用 outbound/inbound 网关,而不是 outbound/inbound 消息转换器吗?
- 如果遵循 outbound/inbound 消息转换器方法(在我看来这类似于 RabbitMQ 教程中显示的内容),我如何将网关接口上的 Future 与从 inbound 返回的结果相关联频道适配器?
老实说,您没有提供原始业务需求。可能确实没有理由处理这个 async
交接,因为你有一个 @Gateway
作为入口点,它是 thread-free,即使它被阻止到等待回复它不会影响可能执行类似 sendAndReceive
操作的其他线程。在大多数情况下,在同一个请求者线程中完成所有事情就足够了,并且不会因转移到共享 ThreadPoolExecutor
.
是的,Future
允许您稍微释放调用者以准备好在同一线程中接受新请求。
因为它是一个 MessagingGateway
并且您无论如何都想得到回复,所以有一个与请求关联的挂钩 - TemporaryReplyChannel
header。这就是 <outbound-gateway>
正常工作的原因:它将阻塞回复放置到网关 return
(或 FutureTask#set()
)的通道。
我想说我们可以通过您的异步回复要求实现相同的 TemporaryReplyChannel
收益。
- 您应该使用
inbound
/outbound
通道适配器对。 - 在将消息发送到
<int-amqp:outbound-channel-adapter>
之前,您应该为<header-enricher>
执行此操作<header-channels-to-string>
。 - 服务器端可能相同 -
<int-amqp:inbound-gateway>
- 您应该使用 fixed
replyQueue
作为 header 以便这些消息通过<int-amqp:outbound-channel-adapter>
发送
- 应该为 fixed
replyQueue
. 配置 - 客户端的
<int-amqp:outbound-channel-adapter>
和<int-amqp:inbound-gateway>
都必须为mapped-request-headers="*"
配置,以允许将reply-channel
header 传播到服务器并使用虎钳相反。 - 客户端的
<int-amqp:inbound-channel-adapter>
只会将回复发送给reply-channel
,就像<int-amqp:outbound-gateway>
一样
- 您可能需要手动处理
correlationId
,因为<int-amqp:inbound-gateway>
可能需要它才能正确生成回复。
<int-amqp:inbound-channel-adapter>
嗯,类似的东西...
HTH
欢迎提出更多问题。或者如果我误解了你的问题请纠正我。