Spring Batch JdbcPagingItemReader not partitioning equally for each thread
This is my first question. I am working with Spring Batch and using step partitioning to process 70K records. For testing I used 1,021 records and found that the partitioning is not equal across threads. I am using a JdbcPagingItemReader with 5 threads, so the expected distribution is ceil(1021 / 5) = 205 records for each of the first four threads and the remaining 201 for the fifth:
Thread 1 - 205
Thread 2 - 205
Thread 3 - 205
Thread 4 - 205
Thread 5 - 201
Unfortunately that is not what happens, and I get the following distribution of records across the threads:
Thread 1 - 100
Thread 2 - 111
Thread 3 - 100
Thread 4 - 205
Thread 5 - 200
In total 716 records were processed and 305 records were skipped during partitioning. I really don't know what is happening. Could you look at the configuration below and let me know what I am missing? Thanks in advance for your help.
<import resource="../config/batch-context.xml" />
<import resource="../config/database.xml" />

<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="masterStep" parent="abstractPartitionerStagedStep">
        <partition step="slave" partitioner="rangePartitioner">
            <handler grid-size="5" task-executor="taskExecutor"/>
        </partition>
    </step>
</job>

<bean id="abstractPartitionerStagedStep" abstract="true">
    <property name="listeners">
        <list>
            <ref bean="updatelistener" />
        </list>
    </property>
</bean>

<bean id="updatelistener" class="com.test.springbatch.model.UpdateFileCopyStatus" />

<!-- Jobs to run -->
<step id="slave" xmlns="http://www.springframework.org/schema/batch">
    <tasklet>
        <chunk reader="pagingItemReader" writer="flatFileItemWriter"
               processor="itemProcessor" commit-interval="1" retry-limit="0" skip-limit="100">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

<bean id="rangePartitioner" class="com.test.springbatch.partition.RangePartitioner">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5"/>
    <property name="maxPoolSize" value="5"/>
    <property name="queueCapacity" value="100" />
    <property name="allowCoreThreadTimeOut" value="true"/>
    <property name="keepAliveSeconds" value="60" />
</bean>

<bean id="itemProcessor" class="com.test.springbatch.processor.CaseProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>

<bean id="pagingItemReader"
      class="org.springframework.batch.item.database.JdbcPagingItemReader"
      scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="SELECT *" />
            <property name="fromClause" value="FROM ( SELECT CASE_NUM, CASE_STTS_CD, UPDT_TS, SBMT_OFC_CD,
                SBMT_OFC_NUM, DSTR_CHNL_CD, APRV_OFC_CD, APRV_OFC_NUM, SBMT_TYP_CD, ROW_NUMBER()
                OVER(ORDER BY CASE_NUM) AS rownumber FROM TSMCASE WHERE PROC_IND ='N' ) AS data" />
            <property name="whereClause" value="WHERE rownumber BETWEEN :fromRow AND :toRow " />
            <property name="sortKey" value="CASE_NUM" />
        </bean>
    </property>
    <!-- Inject via the ExecutionContext in rangePartitioner -->
    <property name="parameterValues">
        <map>
            <entry key="fromRow" value="#{stepExecutionContext[fromRow]}" />
            <entry key="toRow" value="#{stepExecutionContext[toRow]}" />
        </map>
    </property>
    <property name="pageSize" value="100" />
    <property name="rowMapper">
        <bean class="com.test.springbatch.model.CaseRowMapper" />
    </property>
</bean>

<bean id="flatFileItemWriter" class="com.test.springbatch.writer.FNWriter" scope="step" />
Here is the partitioner code:
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.sql.DataSource;

import org.hibernate.Query;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class OffRangePartitioner implements Partitioner {

    private String officeLst;
    private double splitvalue;
    private DataSource dataSource;

    private static final Logger LOGGER = Log4JFactory.getLogger(OffRangePartitioner.class);
    private static final int INDENT_LEVEL = 6;

    public OffRangePartitioner() { // was "OfficeRangePartitioner()", which does not compile
        super();
        final GlobalProperties globalProperties = GlobalProperties.getInstance();
        splitvalue = Double.parseDouble(globalProperties.getProperty("springbatch.part.splitvalue"));
    }

    public String getOfficeLst() {
        return officeLst;
    }

    public void setOfficeLst(final String officeLst) {
        this.officeLst = officeLst;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Map<String, ExecutionContext> partition(int threadSize) {
        FormattedTraceHelper.formattedTrace(LOGGER, "Partition method in OffRangePartitioner class Start", INDENT_LEVEL, Level.INFO_INT);

        final Session currentSession = HibernateUtil.getSessionFactory(HibernateConstants.DB2_DATABASE_NAME).getCurrentSession();
        final Transaction transaction = currentSession.beginTransaction();
        final Query queryObj = currentSession.createQuery(BatchConstants.PARTITION_CNT_QRY);

        // Run the count query once; calling iterate() twice would execute it twice.
        double count = 0.0;
        final Iterator<?> countIterator = queryObj.iterate();
        if (countIterator.hasNext()) {
            count = Double.parseDouble(countIterator.next().toString());
        }
        FormattedTraceHelper.formattedTrace(LOGGER, "Count of total records submitted for processing >> " + count, INDENT_LEVEL, Level.DEBUG_INT);

        // The injected grid-size (threadSize) is ignored; the grid is derived from the row count.
        int gridSize = (int) Math.ceil(count / splitvalue);
        FormattedTraceHelper.formattedTrace(LOGGER, "Total Grid size based on the count >> " + gridSize, INDENT_LEVEL, Level.DEBUG_INT);

        int fromRow = 0;
        int toRow = 0;
        ExecutionContext context;
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        for (int threadCount = 1; threadCount <= gridSize; threadCount++) {
            fromRow = toRow + 1;
            if (threadCount == gridSize || gridSize == 1) {
                toRow = (int) count;       // last partition takes whatever is left
            } else {
                toRow += (int) splitvalue; // was "toRow += splitvalue", which does not compile (double to int)
            }
            context = new ExecutionContext();
            context.putInt("fromRow", fromRow);
            context.putInt("toRow", toRow);
            context.putString("name", "Processing Thread" + threadCount);
            result.put("partition" + threadCount, context);
            FormattedTraceHelper.formattedTrace(LOGGER, "Partition number >> " + threadCount
                    + " from Row#: " + fromRow + " to Row#: " + toRow, INDENT_LEVEL, Level.DEBUG_INT);
        }

        if (transaction != null) {
            transaction.commit();
        }
        FormattedTraceHelper.formattedTrace(LOGGER, "Partition method in OffRangePartitioner class End", INDENT_LEVEL, Level.INFO_INT);
        return result;
    }
}
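For what it's worth, plugging the numbers from the question into this loop does produce the expected split. A minimal sketch of the range math above, assuming splitvalue is configured as 205 (the actual springbatch.part.splitvalue is not shown in the post):

public class RangeMathDemo {
    public static void main(String[] args) {
        double count = 1021;      // rows counted by PARTITION_CNT_QRY in the test
        double splitvalue = 205;  // assumed value of springbatch.part.splitvalue
        int gridSize = (int) Math.ceil(count / splitvalue); // ceil(1021 / 205) = 5
        int fromRow;
        int toRow = 0;
        for (int threadCount = 1; threadCount <= gridSize; threadCount++) {
            fromRow = toRow + 1;
            toRow = (threadCount == gridSize) ? (int) count : toRow + (int) splitvalue;
            // Prints: 1-205, 206-410, 411-615, 616-820, 821-1021
            System.out.println("partition" + threadCount + ": rows " + fromRow + "-" + toRow);
        }
    }
}

So the ranges handed to the five partitions are contiguous and non-overlapping; the skew must come from what happens to the row numbers after the ranges are computed.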
Today I tested the same batch with 1,056 records, with Spring framework debug logging turned on.

Page size: 100
SELECT * FROM (
    SELECT CASE_NUM, CASE_STTS_CD, UPDT_TS, SBMT_OFC_CD, SBMT_OFC_NUM, DSTR_CHNL_CD,
           APRV_OFC_CD, APRV_OFC_NUM, SBMT_TYP_CD, ROW_NUMBER() OVER(ORDER BY CASE_NUM) AS rownumber
    FROM TCASE
    WHERE SECARCH_PROC_IND = 'P'
) AS data
WHERE rownumber BETWEEN :fromRow AND :toRow
ORDER BY rownumber ASC
FETCH FIRST 100 ROWS ONLY
We update the SECARCH_PROC_IND flag from 'P' to 'C' as each record finishes processing. The main query partitions the records on SECARCH_PROC_IND = 'P' using ROW_NUMBER, so the row numbers change as soon as any thread updates the flag from 'P' to 'C'.
It looks like this is the problem.
Spring Batch fires the following query to fetch the data from the database:
SELECT * FROM (
    SELECT CASE_NUM, CASE_STTS_CD, UPDT_TS, SBMT_OFC_CD, SBMT_OFC_NUM, DSTR_CHNL_CD,
           APRV_OFC_CD, APRV_OFC_NUM, SBMT_TYP_CD, ROW_NUMBER() OVER(ORDER BY CASE_NUM) AS rownumber
    FROM TCASE
    WHERE SECARCH_PROC_IND = 'P'
) AS data
WHERE rownumber BETWEEN :fromRow AND :toRow
ORDER BY rownumber ASC
FETCH FIRST 100 ROWS ONLY
After each row is processed, its flag is updated from SECARCH_PROC_IND = 'P' to 'C'. Because SECARCH_PROC_IND is used in the WHERE clause, this effectively shrinks the ROW_NUMBER window in the next page query fired by Spring Batch. This is the root cause of the problem.
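A toy simulation of that drift (plain Java, no database, hypothetical numbers): say thread 4 owns rownumber 616-820. Once the other threads have flipped 300 earlier rows from 'P' to 'C', the next page query renumbers the surviving rows from 1, so "rownumber 616" lands 300 rows further down the table:

import java.util.ArrayList;
import java.util.List;

public class RowNumberDriftDemo {
    public static void main(String[] args) {
        // CASE_NUMs of the rows still flagged SECARCH_PROC_IND = 'P'.
        List<Integer> pending = new ArrayList<>();
        for (int caseNum = 1; caseNum <= 1021; caseNum++) {
            pending.add(caseNum);
        }

        // Thread 4 was assigned rownumber 616..820 at partitioning time.
        System.out.println("row 616 before updates: CASE_NUM " + pending.get(615)); // 616

        // Meanwhile the other threads process 300 rows and flip them to 'C',
        // removing them from the WHERE SECARCH_PROC_IND = 'P' result set.
        pending.subList(0, 300).clear();

        // ROW_NUMBER() is recomputed over the survivors on the next page query.
        System.out.println("row 616 after updates:  CASE_NUM " + pending.get(615)); // 916
    }
}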
We introduced another column in the table, SECARCH_PROC_TMP_IND, which we set to 'P' in the beforeJob() method before the batch runs, and we use that column in the WHERE clause of the query instead of SECARCH_PROC_IND. After the batch run, in afterJob(), we reset SECARCH_PROC_TMP_IND back to NULL.
This fixed the partitioning issue.
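A minimal sketch of that fix as a JobExecutionListener, assuming a JdbcTemplate and the table/column names above (the class name and SQL are illustrative; the actual listener is not shown in the post):

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.JdbcTemplate;

public class SnapshotFlagListener implements JobExecutionListener {

    private final JdbcTemplate jdbcTemplate;

    public SnapshotFlagListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // Freeze the work set: mark every currently eligible row with the temp flag.
        // The paging query then filters on SECARCH_PROC_TMP_IND, which no slave thread touches.
        jdbcTemplate.update(
                "UPDATE TCASE SET SECARCH_PROC_TMP_IND = 'P' WHERE SECARCH_PROC_IND = 'P'");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // Reset the temp flag so the next run takes a fresh snapshot.
        jdbcTemplate.update("UPDATE TCASE SET SECARCH_PROC_TMP_IND = NULL");
    }
}

Because the temp column is only written before the job starts and after it ends, the ROW_NUMBER() window stays stable for the whole run and each partition sees exactly the range it was assigned.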