Spring 批处理 & Spring 集成 (JMS) & 负载平衡从站
Spring Batch & Spring Integration (JMS) & Load Balance Slaves
我正在使用
Spring批量
- 第 1 步
- 第 2 步大师(分区程序)
- 步骤 3
Spring 主从集成(JMS)
我们看到的问题是,第一个从站处理所有 JMS 消息,而不是在从站之间平均分配。
配置见下
硕士
<bean id="PreProcess" class="com.job.tasklet.PreProcessTasklet" scope="step">
<constructor-arg index="0" value="${run.slave}"/>
<property name="maxNumberOfSlaves" value="#{jobParameters['max-slave-count']}"/>
</bean>
<bean id="PostProcess" class="com.job.tasklet.PostProcessTasklet" scope="prototype">
<constructor-arg index="0" ref="chpsJobDataSource"/>
</bean>
<bean id="partitioner" class="com.job.partition.DatabasePartitioner" scope="step">
<constructor-arg index="3" value="${max.row.count}"/>
</bean>
<bean id="partitionHandler" class="com.job.handler.StepExecutionAggregatorHandler">
<property name="stepName" value="processAutoHoldSlaveStep"/>
<property name="gridSize" value="${grid.size}"/>
<property name="replyChannel" ref="aggregatedGroupRuleReplyChannel"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="groupRuleRequestsChannel"/>
</bean>
</property>
</bean>
<!-- Request Start -->
<int:channel id="groupRuleRequestsChannel" />
<int-jms:outbound-channel-adapter channel="groupRuleRequestsChannel" jms-template="jmsTemplateToSlave"/>
<bean id="jmsTemplateToSlave" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="receiveTimeout" value="5000"/>
<property name="defaultDestinationName" value="defaultRequest"/>
</bean>
<bean id="jmsTemplateFromSlave" class="org.springframework.jms.core.JmsTemplate" parent="jmsTemplateToSlave">
<property name="defaultDestinationName" value="defaultRequest"/>
</bean>
<!-- Response Test Start -->
<int:channel id="groupRuleReplyChannel">
<!-- <int:queue/> -->
</int:channel>
<int-jms:inbound-channel-adapter channel="groupRuleReplyChannel" jms-template="jmsTemplateFromSlave">
<int:poller id="defaultPoller" default="true" max-messages-per-poll="1" fixed-rate="3000" />
</int-jms:inbound-channel-adapter>
<!-- define aggregatedReplyChannel -->
<int:channel id="aggregatedGroupRuleReplyChannel">
<int:queue/>
</int:channel>
<int:aggregator ref="partitionHandler"
input-channel="groupRuleReplyChannel"
output-channel="aggregatedGroupRuleReplyChannel"
send-timeout="3600000"/>
奴隶
<int:channel id="requestsChannel" />
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="${spring.activemq.broker-url}" />
<property name="trustAllPackages" value="true" />
</bean>
<int-jms:message-driven-channel-adapter id="jmsIn" destination-name="#{args[0]}" channel="requestsChannel" connection-factory="connectionFactory" max-messages-per-task="1"/>
<int:service-activator input-channel="requestsChannel" output-channel="replyChannel" ref="stepExecutionRequestHandler" />
<int:channel id="replyChannel" />
<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination-name="#{args[1]}" channel="replyChannel" />
如有遇到问题请指教
如果您需要更多信息,请告诉我。
注意:我已经在这里和 google 搜索了很多,但还没有找到解决方案。
ActiveMQ 默认使用 1000 的预取 see here。
换句话说,前(最多)1000 个分区将分配给第一个消费者等。
你可以减少预取; 1 可能适合此应用程序。
我正在使用
Spring批量
- 第 1 步
- 第 2 步大师(分区程序)
- 步骤 3
Spring 主从集成(JMS)
我们看到的问题是,第一个从站处理所有 JMS 消息,而不是在从站之间平均分配。
配置见下
硕士
<bean id="PreProcess" class="com.job.tasklet.PreProcessTasklet" scope="step"> <constructor-arg index="0" value="${run.slave}"/> <property name="maxNumberOfSlaves" value="#{jobParameters['max-slave-count']}"/> </bean> <bean id="PostProcess" class="com.job.tasklet.PostProcessTasklet" scope="prototype"> <constructor-arg index="0" ref="chpsJobDataSource"/> </bean> <bean id="partitioner" class="com.job.partition.DatabasePartitioner" scope="step"> <constructor-arg index="3" value="${max.row.count}"/> </bean> <bean id="partitionHandler" class="com.job.handler.StepExecutionAggregatorHandler"> <property name="stepName" value="processAutoHoldSlaveStep"/> <property name="gridSize" value="${grid.size}"/> <property name="replyChannel" ref="aggregatedGroupRuleReplyChannel"/> <property name="messagingOperations"> <bean class="org.springframework.integration.core.MessagingTemplate"> <property name="defaultChannel" ref="groupRuleRequestsChannel"/> </bean> </property> </bean>
<!-- Request Start --> <int:channel id="groupRuleRequestsChannel" /> <int-jms:outbound-channel-adapter channel="groupRuleRequestsChannel" jms-template="jmsTemplateToSlave"/> <bean id="jmsTemplateToSlave" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="receiveTimeout" value="5000"/> <property name="defaultDestinationName" value="defaultRequest"/> </bean> <bean id="jmsTemplateFromSlave" class="org.springframework.jms.core.JmsTemplate" parent="jmsTemplateToSlave"> <property name="defaultDestinationName" value="defaultRequest"/> </bean> <!-- Response Test Start --> <int:channel id="groupRuleReplyChannel"> <!-- <int:queue/> --> </int:channel> <int-jms:inbound-channel-adapter channel="groupRuleReplyChannel" jms-template="jmsTemplateFromSlave"> <int:poller id="defaultPoller" default="true" max-messages-per-poll="1" fixed-rate="3000" /> </int-jms:inbound-channel-adapter> <!-- define aggregatedReplyChannel --> <int:channel id="aggregatedGroupRuleReplyChannel"> <int:queue/> </int:channel> <int:aggregator ref="partitionHandler" input-channel="groupRuleReplyChannel" output-channel="aggregatedGroupRuleReplyChannel" send-timeout="3600000"/>
奴隶
<int:channel id="requestsChannel" /> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="${spring.activemq.broker-url}" /> <property name="trustAllPackages" value="true" /> </bean> <int-jms:message-driven-channel-adapter id="jmsIn" destination-name="#{args[0]}" channel="requestsChannel" connection-factory="connectionFactory" max-messages-per-task="1"/> <int:service-activator input-channel="requestsChannel" output-channel="replyChannel" ref="stepExecutionRequestHandler" /> <int:channel id="replyChannel" /> <int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination-name="#{args[1]}" channel="replyChannel" />
如有遇到问题请指教
如果您需要更多信息,请告诉我。
注意:我已经在这里和 google 搜索了很多,但还没有找到解决方案。
ActiveMQ 默认使用 1000 的预取 see here。
换句话说,前(最多)1000 个分区将分配给第一个消费者等。
你可以减少预取; 1 可能适合此应用程序。