Spring 批处理 JDBCPagingItemReader 没有为每个线程平均分区

Spring Batch JDBCPagingItemReader not partitioning equally for each thread

这是我的第一个问题。我正在处理 spring 批处理,我正在使用步进分区来处理 70K 条记录。为了进行测试,我使用了 1021 条记录,发现每个线程的分区情况并不相同。我正在使用具有 5 个线程的 JDBCPagingItemReader。分布应该是

线程 1 - 205

线程 2 - 205

线程 3 - 205

线程 4 - 205

线程 5 - 201

但不幸的是,这并没有发生,我得到了线程之间的以下记录分布

线程 1 - 100

线程 2 - 111

线程 3 - 100

线程 4 - 205

线程 5 - 200

分区时总共跳过了716条记录和305条记录。我真的不知道发生了什么。你能看看下面的配置,让我知道我遗漏了什么吗?预先感谢您的帮助。

<import resource="../config/batch-context.xml" />
<import resource="../config/database.xml" />

<job id="partitionJob"  xmlns="http://www.springframework.org/schema/batch">

    <step id="masterStep" parent="abstractPartitionerStagedStep">

        <partition step="slave" partitioner="rangePartitioner">
            <handler grid-size="5" task-executor="taskExecutor"/>
        </partition>

    </step>

</job>
<bean id="abstractPartitionerStagedStep" abstract="true">
    <property name="listeners">
        <list>
            <ref bean="updatelistener" />
        </list>
    </property>

</bean>
<bean id="updatelistener" 
      class="com.test.springbatch.model.UpdateFileCopyStatus" >
</bean>
<!-- Jobs to run -->
<step id="slave" xmlns="http://www.springframework.org/schema/batch">
    <tasklet>
        <chunk reader="pagingItemReader" writer="flatFileItemWriter"
            processor="itemProcessor" commit-interval="1" retry-limit="0" skip-limit="100">
        <skippable-exception-classes>
            <include class="java.lang.Exception"/>
        </skippable-exception-classes>
        </chunk>    
    </tasklet>
</step>

<bean id="rangePartitioner" class="com.test.springbatch.partition.RangePartitioner"> 
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
<property name="corePoolSize" value="5"/>
<property name="maxPoolSize" value="5"/>
<property name="queueCapacity" value="100" />
<property name="allowCoreThreadTimeOut" value="true"/>
<property name="keepAliveSeconds" value="60" /> 
</bean>

<bean id="itemProcessor" class="com.test.springbatch.processor.CaseProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>

<bean id="pagingItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="SELECT *" />
            <property name="fromClause" value="FROM ( SELECT CASE_NUM ,CASE_STTS_CD, UPDT_TS,SBMT_OFC_CD,
                        SBMT_OFC_NUM,DSTR_CHNL_CD,APRV_OFC_CD,APRV_OFC_NUM,SBMT_TYP_CD, ROW_NUMBER() 
                        OVER(ORDER BY CASE_NUM) AS rownumber FROM TSMCASE WHERE PROC_IND ='N' ) AS data" />
            <property name="whereClause" value="WHERE rownumber BETWEEN :fromRow AND :toRow " />
            <property name="sortKey" value="CASE_NUM" />
        </bean>
    </property>
    <!--  Inject via the ExecutionContext in rangePartitioner  -->
    <property name="parameterValues">
        <map>
            <entry key="fromRow" value="#{stepExecutionContext[fromRow]}" />
            <entry key="toRow" value="#{stepExecutionContext[toRow]}" />
        </map>
    </property>
    <property name="pageSize" value="100" /> 
    <property name="rowMapper">
        <bean class="com.test.springbatch.model.CaseRowMapper" />
    </property>
</bean>

<bean id="flatFileItemWriter" class="com.test.springbatch.writer.FNWriter" scope="step" >
</bean>

这里是分区程序代码

public class OffRangePartitioner implements Partitioner {

private String officeLst;
private double splitvalue;
private DataSource dataSource;
private static Logger LOGGER = Log4JFactory.getLogger(OffRangePartitioner.class);
private static final int INDENT_LEVEL = 6;

public String getOfficeLst() {
    return officeLst;
}

public void setOfficeLst(final String officeLst) {
    this.officeLst = officeLst;
}

public void setDataSource(DataSource dataSource) {
    this.dataSource = dataSource;
}

public OfficeRangePartitioner() {
    super();
    final GlobalProperties globalProperties = GlobalProperties.getInstance();
    splitvalue = Double.parseDouble(globalProperties.getProperty("springbatch.part.splitvalue"));
}

@Override
public Map<String, ExecutionContext> partition(int threadSize) {
    FormattedTraceHelper.formattedTrace(LOGGER,"Partition method in OffRangePartitioner class Start",INDENT_LEVEL, Level.INFO_INT);
    final Session currentSession = HibernateUtil.getSessionFactory(HibernateConstants.DB2_DATABASE_NAME).getCurrentSession();

    Query queryObj;
    double count = 0.0;

    final Transaction transaction = currentSession.beginTransaction();
    queryObj = currentSession.createQuery(BatchConstants.PARTITION_CNT_QRY);

    if (queryObj.iterate().hasNext()) {
        count = Double.parseDouble(queryObj.iterate().next().toString());
    }

    int fromRow = 0;
    int toRow = 0;
    ExecutionContext context;

    FormattedTraceHelper.formattedTrace(LOGGER,"Count of total records submitted for processing >> " + count, INDENT_LEVEL, Level.DEBUG_INT);
    int gridSize = (int) Math.ceil(count / splitvalue);
    FormattedTraceHelper.formattedTrace(LOGGER,"Total Grid size based on the count >> " + gridSize, INDENT_LEVEL, Level.DEBUG_INT);
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

    for (int threadCount = 1; threadCount <= gridSize; threadCount++) {
        fromRow = toRow + 1;
        if (threadCount == gridSize || gridSize == 1) {
            toRow = (int) count;
        } else {
            toRow += splitvalue;
        }
        context = new ExecutionContext();
        context.putInt("fromRow", fromRow);
        context.putInt("toRow", toRow);
        context.putString("name", "Processing Thread" + threadCount);
        result.put("partition" + threadCount, context);
        FormattedTraceHelper.formattedTrace(LOGGER, "Partition number >> "
                + threadCount + " from Row#: " + fromRow + " to Row#: "
                + toRow, INDENT_LEVEL, Level.DEBUG_INT);

    }
    if (transaction != null) {
        transaction.commit();
    }
    FormattedTraceHelper.formattedTrace(LOGGER,
            "Partition method in OffRangePartitioner class End",
            INDENT_LEVEL, Level.INFO_INT);
    return result;
}

}

今天,我在 Spring 框架日志调试打开的情况下测试了具有 1056 条记录的同一批次。

页面大小 100

SELECT * FROM (
        SELECT CASE_NUM, CASE_STTS_CD, UPDT_TS,SBMT_OFC_CD, SBMT_OFC_NUM, DSTR_CHNL_CD, 
            APRV_OFC_CD, APRV_OFC_NUM,SBMT_TYP_CD, ROW_NUMBER() OVER(ORDER BY CASE_NUM) AS rownumber 
        FROM TCASE 
        WHERE **SECARCH_PROC_IND ='P'**
    ) AS data 
WHERE 
    rownumber BETWEEN :fromRow AND :toRow 
ORDER BY 
    rownumber ASC 
FETCH FIRST 100 ROWS ONLY

我们正在将 SECARCH_PROC_IND ='P' 标志更新为 'C' 每条记录处理完毕。我们在主查询中使用 ROWNUM 根据 SECARCH_PROC_IND ='P' 对记录进行分区,一旦 SECARCH_PROC_IND ='P' 标志更新为 [=53=,ROWNUM 就会发生变化]任何线程。

看来这就是问题所在。

Spring 批量触发以下查询以从数据库中获取数据

SELECT * FROM ( SELECT CASE_NUM, CASE_STTS_CD, UPDT_TS,SBMT_OFC_CD, SBMT_OFC_NUM, DSTR_CHNL_CD, APRV_OFC_CD, APRV_OFC_NUM,SBMT_TYP_CD, **ROW_NUMBER()** OVER(ORDER BY CASE_NUM) AS rownumber FROM TCASE WHERE **SECARCH_PROC_IND ='P'** ) AS data WHERE rownumber BETWEEN :fromRow AND :toRow ORDER BY rownumber ASC FETCH FIRST 100 ROWS ONLY

处理每一行后,标志 SECARCH_PROC_IND ='P' 更新为 SECARCH_PROC_IND ='C'。由于 SECARCH_PROC_IND 在 WHERE 子句中使用,这实际上减少了 ROW_NUMBER 在下一个由 spring批次。这是问题的根本原因。

我们在 table 中引入了另一列 SECARCH_PROC_TMP_IND,我们在批处理之前使用 beforeJob() 中的标志 'P' 对其进行了更新方法,我们在查询的 WHERE 子句中使用该列,而不是使用 SECARCH_PROC_IND 列。

批处理后,在 afterJob() 中,我们将 SECARCH_PROC_TMP_IND 重新设置为 NULL。

这解决了分区问题。