Spring 集成多线程

Spring integration multithreading

参考我之前在 URL 的问题 - Spring integration multithreading requirement - 我想我可能已经找出问题的根本原因。
我的简要要求 -
在 1 秒的固定延迟后轮询数据库,然后将非常有限的数据发布到 Tibco EMS 队列。现在,我必须从这个 EMS 队列中以多线程方式执行以下任务:-
i) 使用消息,
ii) 现在从数据库中获取完整数据,然后
iii) 将此数据转换为 json 格式。

我的设计 -

`<int:channel id="dbchannel"/>   
    <int-jdbc:inbound-channel-adapter id="dbchanneladapter"  
        channel="dbchannel"  data-source="datasource"  
        query="${selectquery}"  update="${updatequery}"  
        max-rows-per-poll="1000">  
        <int:poller id="dbchanneladapterpoller"  
            fixed-delay="1000">  
            <int:transactional transaction-manager="transactionmanager" />  
        </int:poller>  
    </int-jdbc:inbound-channel-adapter>  
    <int:service-activator input-channel="dbchannel"
        output-channel="publishchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />  
    <bean id="jdbcmessagehandler" class="com.citigroup.handler.JdbcMessageHandler" />  

    <int:publish-subscribe-channel id="publishchannel"/>  
    <int-jms:outbound-channel-adapter id="publishchanneladapter"
        channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />  

    <int:channel id="subscribechannel"/>  
    <int-jms:message-driven-channel-adapter
        id="subscribechanneladapter" destination="subscriberealtimeinternalqueue" 
        connection-factory="authenticationconnectionfactory" channel="subscribechannel" 
        concurrent-consumers="5" max-concurrent-consumers="5" />  
    <int:service-activator input-channel="subscribechannel"
        ref="subscribemessagehandler" method="logJMSMessage" />  
    <bean id="subscribemessagehandler" class="com.citigroup.handler.SubscribeJMSMessageHandler" />  
</beans>  

<bean id="authenticationconnectionfactory"
        class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" ref="connectionFactory" />
        <property name="username" value="test" />
        <property name="password" value="test123" />
    </bean>  

<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
        <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName" value="app.jndi.testCF" />
</bean>  

<bean id="subscriberealtimeinternalqueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
        <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName"
        value="app.queue.testQueue" />
</bean>   
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory
            </prop>
            <prop key="java.naming.provider.url">tibjmsnaming://test01d.nam.nsroot.net:7222</prop>
        </props>
    </property>
</bean>`


问题 -
使用将并发消费者值设置为 5 的消息驱动通道。但是,看起来只创建了一个消费者线程(container-2)并从 EMS 队列中获取消息。请在 log4j 日志下方找到 -

16 Aug 2018 11:31:12,077 INFO SubscribeJMSMessageHandler [subscribechanneladapter.container-2][]:
从队列中读取的记录总数这一刻是387
记录#1:: [ID=7694066395]
记录#2:: [ID=7694066423]
.. .. ..
记录#387: : [ID=6147457333]

可能的根本原因在这里 -
可能是配置中的第一步,我轮询数据库以在导致此多线程问题的固定延迟后获取数据。参考上面的日志,我这里的假设是因为获取的记录数是 387,并且所有这些都被捆绑到一个列表对象(列表>消息)中,它被认为只是 1 message/payload 而不是 387 条不同的消息这就是为什么只有一个 thread/container/consumer 收到了这条捆绑的消息。这种假设的原因是下面的日志 -

GenericMessage [payload=[{"ID":7694066395},{"ID":7694066423},{"ID":6147457333}] ,
headers={json__ContentTypeId__=class org.springframework.util.LinkedCaseInsensitiveMap, jms_redelivered=false, json__TypeId__=class java.util.ArrayList , jms_destination=队列[app.queue.testQueue], id=e034ba73-7781-b62c-0307-170099263068, 优先级=4, jms_timestamp=1534820792064, contentType=application/json, jms_messageId=ID:test.21415B667C051:40C149C0, 时间戳=1534820792481}]


问题 -
我对根本原因的理解是否正确?如果是,那么如何将这 387 条消息视为单独的消息(而不是消息的一个列表对象)并在不影响事务管理的情况下将它们一一发布?
我在之前的 post 中曾在 Whosebug 上与 https://whosebug.com/users/2756547/artem-bilan 讨论过这个问题,我不得不通过用 ActiveMQ 替换 Tibco EMS 来检查这个设计。但是,我们的架构团队仍在分析 ActiveMQ 基础架构,因此在获得批准之前无法使用。

哦!现在我明白你的问题是什么了。 int-jdbc:inbound-channel-Adapter 实际上是 returns 一个记录列表,它可以 select 来自数据库。整个列表作为一条消息发送到 JMS。这就是您在消费者端只看到一个线程的原因:只有一条消息可以从队列中获取。

如果您希望为每个提取的记录单独发送消息,您需要考虑在 JDBC 轮询操作和发送到 JMS 之间使用 <splitter>