Partitioner for JdbcCursorItemReader - Reader must be open before it can be read
I have a working Spring Batch job. When I tried to make it multi-threaded using a partitioner, I started getting "Reader must be open before it can be read".
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:443) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at com.sun.proxy.$Proxy58.read(Unknown Source) ~[na:na]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:139) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:136) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
2016-08-01 14:26:14.700 ERROR 12716 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step partitionStep in job exportMasterListCsv
org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.test.JobLauncherTestUtils.launchJob(JobLauncherTestUtils.java:152) [spring-batch-test-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at com.myer.pricing.onlinestore.export.job.ExportMasterListCsvTest.testLaunchJob(ExportMasterListCsvTest.java:60) [test-classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47) [junit-4.11.jar:na]
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.11.jar:na]
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) [junit-4.11.jar:na]
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.11.jar:na]
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.11.jar:na]
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) [junit-4.11.jar:na]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.junit.runners.ParentRunner.run(ParentRunner.java:238) [junit-4.11.jar:na]
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63) [junit-4.11.jar:na]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) [junit-4.11.jar:na]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) [junit-4.11.jar:na]
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229) [junit-4.11.jar:na]
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.junit.runners.ParentRunner.run(ParentRunner.java:309) [junit-4.11.jar:na]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) [.cp/:na]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) [.cp/:na]
I was just wondering whether JdbcCursorItemReader is not thread-safe, and whether that might be why I am getting this error?
Below is roughly my working setup...
@Configuration
public class ExportMasterListCsvJobConfig {

    public static final String JOB_NAME = "exportMasterListCsv";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    /**
     * Create job to export the master list from the online pricing staging db.
     *
     * @param readStgDbAndExportMasterListStep read online staging db step and export to csv
     * @return job to export the master list from the online pricing staging db.
     */
    @Bean
    public Job exportMasterListCsvJob(@Qualifier("partitionStep") Step partitionStep) {
        return jobBuilderFactory.get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(partitionStep)
                .build();
    }

    @Bean
    public Step partitionStep(@Qualifier("readStgDbAndExportMasterListStep") Step readStgDbAndExportMasterListStep) {
        return stepBuilderFactory.get("partitionStep")
                .partitioner(readStgDbAndExportMasterListStep)
                .partitioner("readStgDbAndExportMasterListStep", partitioner())
                .taskExecutor(taskExecutor())
                .build();
    }

    /**
     * Read from online pricing staging db.
     *
     * @param chunkSize
     * @param queryOnlineStagingDbReaderForMasterList
     * @param masterListOutputProcessor
     * @param masterListFileWriter
     * @return step to read from online pricing staging db.
     */
    @Bean
    public Step readStgDbAndExportMasterListStep(
            @Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") int chunkSize,
            @Qualifier("queryOnlineStagingDbReaderForMasterList") ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList,
            @Qualifier("masterListOutputProcessor") ItemProcessor<MasterList,MasterList> masterListOutputProcessor,
            @Qualifier("masterListFileWriter") ItemWriter<MasterList> masterListFileWriter) {
        return stepBuilderFactory.get("readStgDbAndExportMasterListStep")
                .<MasterList,MasterList>chunk(chunkSize)
                .reader(queryOnlineStagingDbReaderForMasterList)
                .processor(masterListOutputProcessor)
                .writer(masterListFileWriter)
                .build();
    }

    /**
     * Query and map rows from online pricing staging db into a master list object.
     *
     * @param onlineStagingDb
     * @param masterListSql
     * @return
     */
    @Bean
    @StepScope
    public ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList(
            DataSource onlineStagingDb,
            @Value("#{stepExecutionContext[divisionId]}") Integer divisionId,
            @Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") String masterListSql) {
        JdbcCursorItemReader<MasterList> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(onlineStagingDb);
        reader.setSql(masterListSql);
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setInt(1, divisionId);
            }
        });
        reader.setRowMapper(new RowMapper<MasterList>() {
            @Override
            public MasterList mapRow(ResultSet resultSet, int i) throws SQLException {
                MasterList masterList = new MasterList();
                masterList.setL1(resultSet.getString(DaoConstants.COLUMN_HEADER_LEVEL_ONE));
                return masterList;
            }
        });
        return reader;
    }

    @Bean
    public Partitioner partitioner() {
        return new DivisionPartitioner();
    }
}
I have been reading around online. Could this be related to https://jira.spring.io/browse/BATCH-1301?
Thanks
I was just wondering if JdbcCursorItemReader is not threadsafe hence
maybe why I am getting this error?
JdbcCursorItemReader is an AbstractCursorItemReader, which in turn is an AbstractItemCountingItemStreamItemReader.
According to the documentation of AbstractItemCountingItemStreamItemReader, its implementations are not thread-safe:
Abstract superclass for ItemReaders that supports restart by storing
item count in the ExecutionContext (therefore requires item ordering
to be preserved between runs). Subclasses are inherently not
thread-safe.
This part of the log hints at a threading issue:
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Try running it sequentially by configuring the task executor to use only one thread.
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(1);
    return taskExecutor;
}
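On the first read, the reader takes a long time to become ready. Sometimes the rowMapper may get ahead of it and conclude that the reader is not open when it is merely still in the process of opening. To work around this, make the rowMapper sleep the first time it is used. Add the following block to your setRowMapper and see whether the error goes away.
try {
    if (firstTime) {
        Thread.sleep(20000);
        firstTime = false;
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}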
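This is caused by the return type of the reader bean method:
public ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList
It should be changed to JdbcCursorItemReader<MasterList>, which is the type the reader is actually built as inside that method:
JdbcCursorItemReader<MasterList> reader = new JdbcCursorItemReader<>();
Otherwise the bean is only visible through the higher-level ItemReader interface, and the reader cannot be opened properly. The likely mechanism: with @StepScope the bean is wrapped in a proxy derived from the declared return type (the stack trace shows a JdkDynamicAopProxy). ItemReader only declares read(), so the step does not recognise the proxy as an ItemStream and never calls open() on it.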
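The effect of the declared return type can be reproduced without Spring at all. The sketch below is only an illustration under stated assumptions: Reader, Stream, CursorReader, proxyFor and runStep are hypothetical stand-ins for ItemReader, ItemStream, JdbcCursorItemReader, the scoped proxy and the step, not Spring Batch APIs. It shows that a JDK dynamic proxy built from a narrow interface hides the "stream" side of the target, so a caller that opens only what it can see as a stream never opens it.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class StepScopeProxyDemo {

    // Hypothetical stand-ins for Spring Batch's ItemReader and ItemStream.
    public interface Reader { String read(); }
    public interface Stream { void open(); }

    // Hypothetical stand-in for JdbcCursorItemReader: implements both contracts.
    public static class CursorReader implements Reader, Stream {
        private boolean opened;

        @Override
        public void open() { opened = true; }

        @Override
        public String read() {
            if (!opened) {
                throw new IllegalStateException("Reader must be open before it can be read.");
            }
            return "row";
        }
    }

    // Builds a JDK proxy that exposes only the listed interfaces of the target.
    public static Object proxyFor(Object target, Class<?>... exposed) {
        return Proxy.newProxyInstance(
                target.getClass().getClassLoader(),
                exposed,
                (proxy, method, args) -> {
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // rethrow the target's own exception
                    }
                });
    }

    // Mimics a step: opens the bean only if it is visible as a Stream, then reads.
    public static String runStep(Object readerBean) {
        if (readerBean instanceof Stream) {
            ((Stream) readerBean).open();
        }
        return ((Reader) readerBean).read();
    }

    public static void main(String[] args) {
        Object narrow = proxyFor(new CursorReader(), Reader.class);
        Object full = proxyFor(new CursorReader(), Reader.class, Stream.class);

        System.out.println(narrow instanceof Stream); // prints "false"
        System.out.println(runStep(full));            // prints "row"
        try {
            runStep(narrow);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());       // prints the "must be open" message
        }
    }
}
```

In the real job, declaring the bean method as returning JdbcCursorItemReader<MasterList> plays the role of the "full" proxy here: the step can then see the stream side of the reader and open it before reading.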