Spring 集成多线程
Spring integration multithreading
参考我之前在 URL 的问题 - Spring integration multithreading requirement - 我想我可能已经找出问题的根本原因。
我的简要要求 -
在 1 秒的固定延迟后轮询数据库,然后将非常有限的数据发布到 Tibco EMS 队列。现在,我必须从这个 EMS 队列中以多线程方式执行以下任务:-
i) 使用消息,
ii) 现在从数据库中获取完整数据,然后
iii) 将此数据转换为 json 格式。
我的设计 -
`<int:channel id="dbchannel"/>
<int-jdbc:inbound-channel-adapter id="dbchanneladapter"
channel="dbchannel" data-source="datasource"
query="${selectquery}" update="${updatequery}"
max-rows-per-poll="1000">
<int:poller id="dbchanneladapterpoller"
fixed-delay="1000">
<int:transactional transaction-manager="transactionmanager" />
</int:poller>
</int-jdbc:inbound-channel-adapter>
<int:service-activator input-channel="dbchannel"
output-channel="publishchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />
<bean id="jdbcmessagehandler" class="com.citigroup.handler.JdbcMessageHandler" />
<int:publish-subscribe-channel id="publishchannel"/>
<int-jms:outbound-channel-adapter id="publishchanneladapter"
channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />
<int:channel id="subscribechannel"/>
<int-jms:message-driven-channel-adapter
id="subscribechanneladapter" destination="subscriberealtimeinternalqueue"
connection-factory="authenticationconnectionfactory" channel="subscribechannel"
concurrent-consumers="5" max-concurrent-consumers="5" />
<int:service-activator input-channel="subscribechannel"
ref="subscribemessagehandler" method="logJMSMessage" />
<bean id="subscribemessagehandler" class="com.citigroup.handler.SubscribeJMSMessageHandler" />
</beans>
<bean id="authenticationconnectionfactory"
class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="connectionFactory" />
<property name="username" value="test" />
<property name="password" value="test123" />
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName" value="app.jndi.testCF" />
</bean>
<bean id="subscriberealtimeinternalqueue" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName"
value="app.queue.testQueue" />
</bean>
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory
</prop>
<prop key="java.naming.provider.url">tibjmsnaming://test01d.nam.nsroot.net:7222</prop>
</props>
</property>
</bean>`
问题 -
使用将并发消费者值设置为 5 的消息驱动通道。但是,看起来只创建了一个消费者线程(container-2)并从 EMS 队列中获取消息。请在 log4j 日志下方找到 -
16 Aug 2018 11:31:12,077 INFO SubscribeJMSMessageHandler [subscribechanneladapter.container-2][]:
从队列中读取的记录总数这一刻是387
记录#1:: [ID=7694066395]
记录#2:: [ID=7694066423]
.. .. ..
记录#387: : [ID=6147457333]
可能的根本原因在这里 -
可能是配置中的第一步,我轮询数据库以在导致此多线程问题的固定延迟后获取数据。参考上面的日志,我这里的假设是因为获取的记录数是 387,并且所有这些都被捆绑到一个列表对象(列表>消息)中,它被认为只是 1 message/payload 而不是 387 条不同的消息这就是为什么只有一个 thread/container/consumer 收到了这条捆绑的消息。这种假设的原因是下面的日志 -
GenericMessage [payload=[{"ID":7694066395},{"ID":7694066423},{"ID":6147457333}] ,
headers={json__ContentTypeId__=class org.springframework.util.LinkedCaseInsensitiveMap, jms_redelivered=false, json__TypeId__=class java.util.ArrayList , jms_destination=队列[app.queue.testQueue], id=e034ba73-7781-b62c-0307-170099263068, 优先级=4, jms_timestamp=1534820792064, contentType=application/json, jms_messageId=ID:test.21415B667C051:40C149C0, 时间戳=1534820792481}]
问题 -
我对根本原因的理解是否正确?如果是,那么如何将这 387 条消息视为单独的消息(而不是消息的一个列表对象)并在不影响事务管理的情况下将它们一一发布?
我在之前的 post 中曾在 Whosebug 上与 https://whosebug.com/users/2756547/artem-bilan 讨论过这个问题,我不得不通过用 ActiveMQ 替换 Tibco EMS 来检查这个设计。但是,我们的架构团队仍在分析 ActiveMQ 基础架构,因此在获得批准之前无法使用。
哦!现在我明白你的问题是什么了。 int-jdbc:inbound-channel-Adapter
实际上是 returns 一个记录列表,它可以 select 来自数据库。整个列表作为一条消息发送到 JMS。这就是您在消费者端只看到一个线程的原因:只有一条消息可以从队列中获取。
如果您希望为每个提取的记录单独发送消息,您需要考虑在 JDBC 轮询操作和发送到 JMS 之间使用 <splitter>
。
参考我之前在 URL 的问题 - Spring integration multithreading requirement - 我想我可能已经找出问题的根本原因。
我的简要要求 -
在 1 秒的固定延迟后轮询数据库,然后将非常有限的数据发布到 Tibco EMS 队列。现在,我必须从这个 EMS 队列中以多线程方式执行以下任务:-
i) 使用消息,
ii) 现在从数据库中获取完整数据,然后
iii) 将此数据转换为 json 格式。
我的设计 -
`<int:channel id="dbchannel"/>
<int-jdbc:inbound-channel-adapter id="dbchanneladapter"
channel="dbchannel" data-source="datasource"
query="${selectquery}" update="${updatequery}"
max-rows-per-poll="1000">
<int:poller id="dbchanneladapterpoller"
fixed-delay="1000">
<int:transactional transaction-manager="transactionmanager" />
</int:poller>
</int-jdbc:inbound-channel-adapter>
<int:service-activator input-channel="dbchannel"
output-channel="publishchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />
<bean id="jdbcmessagehandler" class="com.citigroup.handler.JdbcMessageHandler" />
<int:publish-subscribe-channel id="publishchannel"/>
<int-jms:outbound-channel-adapter id="publishchanneladapter"
channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />
<int:channel id="subscribechannel"/>
<int-jms:message-driven-channel-adapter
id="subscribechanneladapter" destination="subscriberealtimeinternalqueue"
connection-factory="authenticationconnectionfactory" channel="subscribechannel"
concurrent-consumers="5" max-concurrent-consumers="5" />
<int:service-activator input-channel="subscribechannel"
ref="subscribemessagehandler" method="logJMSMessage" />
<bean id="subscribemessagehandler" class="com.citigroup.handler.SubscribeJMSMessageHandler" />
</beans>
<bean id="authenticationconnectionfactory"
class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="connectionFactory" />
<property name="username" value="test" />
<property name="password" value="test123" />
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName" value="app.jndi.testCF" />
</bean>
<bean id="subscriberealtimeinternalqueue" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName"
value="app.queue.testQueue" />
</bean>
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory
</prop>
<prop key="java.naming.provider.url">tibjmsnaming://test01d.nam.nsroot.net:7222</prop>
</props>
</property>
</bean>`
问题 -
使用将并发消费者值设置为 5 的消息驱动通道。但是,看起来只创建了一个消费者线程(container-2)并从 EMS 队列中获取消息。请在 log4j 日志下方找到 -
16 Aug 2018 11:31:12,077 INFO SubscribeJMSMessageHandler [subscribechanneladapter.container-2][]:
从队列中读取的记录总数这一刻是387
记录#1:: [ID=7694066395]
记录#2:: [ID=7694066423]
.. .. ..
记录#387: : [ID=6147457333]
可能的根本原因在这里 -
可能是配置中的第一步,我轮询数据库以在导致此多线程问题的固定延迟后获取数据。参考上面的日志,我这里的假设是因为获取的记录数是 387,并且所有这些都被捆绑到一个列表对象(列表>消息)中,它被认为只是 1 message/payload 而不是 387 条不同的消息这就是为什么只有一个 thread/container/consumer 收到了这条捆绑的消息。这种假设的原因是下面的日志 -
GenericMessage [payload=[{"ID":7694066395},{"ID":7694066423},{"ID":6147457333}] ,
headers={json__ContentTypeId__=class org.springframework.util.LinkedCaseInsensitiveMap, jms_redelivered=false, json__TypeId__=class java.util.ArrayList , jms_destination=队列[app.queue.testQueue], id=e034ba73-7781-b62c-0307-170099263068, 优先级=4, jms_timestamp=1534820792064, contentType=application/json, jms_messageId=ID:test.21415B667C051:40C149C0, 时间戳=1534820792481}]
问题 -
我对根本原因的理解是否正确?如果是,那么如何将这 387 条消息视为单独的消息(而不是消息的一个列表对象)并在不影响事务管理的情况下将它们一一发布?
我在之前的 post 中曾在 Whosebug 上与 https://whosebug.com/users/2756547/artem-bilan 讨论过这个问题,我不得不通过用 ActiveMQ 替换 Tibco EMS 来检查这个设计。但是,我们的架构团队仍在分析 ActiveMQ 基础架构,因此在获得批准之前无法使用。
哦!现在我明白你的问题是什么了。 int-jdbc:inbound-channel-Adapter
实际上是 returns 一个记录列表,它可以 select 来自数据库。整个列表作为一条消息发送到 JMS。这就是您在消费者端只看到一个线程的原因:只有一条消息可以从队列中获取。
如果您希望为每个提取的记录单独发送消息,您需要考虑在 JDBC 轮询操作和发送到 JMS 之间使用 <splitter>
。