Spring 集成:从 JMS 读取 -> 处理消息 -> 保存在数据库中

Spring Integration : Read from JMS -> Handle message -> Persist in DB

我使用 Spring 集成从 JMS 读取消息,处理它们,然后使用我自己的 dbPersistor 的持久方法将它们持久保存到数据库,该方法具有 return 类型 void。我编写了一个测试用例来验证发布到 JMS 的消息是否已成功保存在数据库中。本次测试我的 SI 和 JMS 配置如下 -

<int:poller fixed-delay="500" default="true"/>

<int:channel id="inputChannel">
    <int:queue/>
</int:channel>
<int:channel id="errorChannel">
    <int:queue/>
</int:channel>
<jms:message-driven-channel-adapter id="jmsInboudAdapter"         
   connection-factory="connectionFactory" destination-name="MessageQueue" 
   channel="inputChannel" error-channel="errorChannel" transaction-manager="dbTxManager"                                    
   acknowledge="transacted"/>

<int:chain id="handlerChain" input-channel="inputChannel">
    <int:service-activator ref="jmsMessageHandler" method="handleMessage" />
    <int:service-activator ref="dbPersistor" method="persist" />   
</int:chain>

然后在测试中我执行以下操作 -

当我只向数据库发布一条消息时,这非常有用。但是当我通过 jmsTemplate.send() 循环发布多条消息时,主线程完成操作,而 SI 线程仍在执行并尝试验证数据库中的消息但由于某些消息尚未保留而失败.我的问题是 -

  1. 如何让主线程等待SI线程完成,然后调用验证方法?
  2. 如果发生数据库异常和回滚,我如何验证失败的消息是否返回到原始队列中?

谢谢 阿杰

  1. inputChannel 不应是队列通道 - 当消息插入队列时 JMS 事务将提交 - 数据库事务不会在 JMS 范围内执行交易。您必须为此使用直接通道(删除轮询器和 <queue/>)。见 the documentation on transactions in Spring Integration.

  2. 您将必须轮询数据库以获取结果;您可能会添加一个拦截器和一些 CountDownLatch,但在结果出现或某个时间到期之前只轮询数据库会更容易。