Spring 批处理 & Spring 集成 (JMS) & 负载平衡从站

Spring Batch & Spring Integration (JMS) & Load Balance Slaves

我正在使用

  1. Spring批量

    • 第 1 步
    • 第 2 步大师(分区程序)
    • 步骤 3
  2. Spring 主从集成(JMS)

我们看到的问题是,第一个从站处理所有 JMS 消息,而不是在从站之间平均分配。

配置见下

  1. 硕士

    <bean id="PreProcess" class="com.job.tasklet.PreProcessTasklet" scope="step">
        <constructor-arg index="0" value="${run.slave}"/>
        <property name="maxNumberOfSlaves" value="#{jobParameters['max-slave-count']}"/>
    </bean>
    
    <bean id="PostProcess" class="com.job.tasklet.PostProcessTasklet" scope="prototype">
        <constructor-arg index="0" ref="chpsJobDataSource"/>
    </bean>
    
    
    <bean id="partitioner" class="com.job.partition.DatabasePartitioner" scope="step">
        <constructor-arg index="3" value="${max.row.count}"/>
    </bean>
    
    <bean id="partitionHandler" class="com.job.handler.StepExecutionAggregatorHandler">
        <property name="stepName" value="processAutoHoldSlaveStep"/>
        <property name="gridSize" value="${grid.size}"/>
        <property name="replyChannel" ref="aggregatedGroupRuleReplyChannel"/>
        <property name="messagingOperations">
            <bean class="org.springframework.integration.core.MessagingTemplate">
                <property name="defaultChannel" ref="groupRuleRequestsChannel"/>
            </bean>
        </property>
    </bean>
    

    <!-- Request Start -->
    <int:channel id="groupRuleRequestsChannel" />
    <int-jms:outbound-channel-adapter channel="groupRuleRequestsChannel" jms-template="jmsTemplateToSlave"/>
    
    <bean id="jmsTemplateToSlave" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="receiveTimeout" value="5000"/>
        <property name="defaultDestinationName" value="defaultRequest"/>
    </bean>
    
    <bean id="jmsTemplateFromSlave" class="org.springframework.jms.core.JmsTemplate" parent="jmsTemplateToSlave">
        <property name="defaultDestinationName" value="defaultRequest"/>
    </bean>
    
    
    <!-- Response Test Start -->
    <int:channel id="groupRuleReplyChannel">
        <!-- <int:queue/> -->
    </int:channel>
    
    <int-jms:inbound-channel-adapter channel="groupRuleReplyChannel" jms-template="jmsTemplateFromSlave">
        <int:poller id="defaultPoller" default="true" max-messages-per-poll="1" fixed-rate="3000"  />
    </int-jms:inbound-channel-adapter>
    
    <!-- define aggregatedReplyChannel -->
    <int:channel id="aggregatedGroupRuleReplyChannel">
        <int:queue/>
    </int:channel>
    
    <int:aggregator ref="partitionHandler"
                    input-channel="groupRuleReplyChannel"
                    output-channel="aggregatedGroupRuleReplyChannel"
                    send-timeout="3600000"/>
    
  2. 奴隶

    <int:channel id="requestsChannel" />
    
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${spring.activemq.broker-url}" />
        <property name="trustAllPackages" value="true" />
    </bean>
    
     <int-jms:message-driven-channel-adapter id="jmsIn" destination-name="#{args[0]}" channel="requestsChannel" connection-factory="connectionFactory" max-messages-per-task="1"/>
    
    <int:service-activator input-channel="requestsChannel" output-channel="replyChannel" ref="stepExecutionRequestHandler" />
    
    <int:channel id="replyChannel" />
    
    <int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination-name="#{args[1]}" channel="replyChannel" />
    

如有遇到问题请指教

如果您需要更多信息,请告诉我。

注意:我已经在这里和 google 搜索了很多,但还没有找到解决方案。

ActiveMQ 默认使用 1000 的预取 see here

换句话说,前(最多)1000 个分区将分配给第一个消费者等。

你可以减少预取; 1 可能适合此应用程序。