JpaPagingItemReader failing to read data and move to next step
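I have a Spring Batch program that reads data from SQL Server and writes it to a file. The query I am using returns 350,000 rows and takes about 2 minutes to run in SQL Server Management Studio, so the query has already been tuned for performance.
Below is the JobConfiguration class: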
@Configuration
@Slf4j
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final FileProperties fileProperties;
    private JobProperties jobProperties;
    private FileUtility fileUtility;
    private CompensationDetailStatusRepository compensationDetailStatusRepository;
    private CompensationDetailsRepository compensationDetailsRepository;

    @Autowired
    public JobConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
                            JobProperties jobProperties, FileProperties fileProperties
    ) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.jobProperties = jobProperties;
        this.fileProperties = fileProperties;
    }

    @Bean
    public Job processExtractJob(ExtractProcessListener listener,
                                 Step processExtract,
                                 Step moveFiles
    ) {
        return jobBuilderFactory.get("processExtractJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(processExtract)
                .on("COMPLETED")
                .to(moveFiles)
                .end()
                .build();
    }

    @Bean
    public Step processExtract(ItemReader<Person> PersonReader,
                               ExtractProcessor extractProcessor,
                               ExtractWriter extractWriter
    ) {
        return stepBuilderFactory.get("processExtract")
                .<Person, ExtractVO>chunk(50)
                .faultTolerant()
                .retryLimit(1)
                .retry(IOException.class)
                .retry(UncheckedIOException.class)
                .reader(PersonReader)
                .processor(extractProcessor)
                .writer(extractWriter)
                .build();
    }

    @Bean
    public ExtractWriter extractWriter() {
        return new ExtractWriter();
    }

    @Bean
    public ExtractProcessor extractProcessor() {
        return new ExtractProcessor();
    }

    @Bean
    public Step moveFiles() {
        return stepBuilderFactory.get("moveFiles")
                .tasklet((contribution, chunkContext) -> {
                    fileUtility.moveFiles(String.format("%s*", fileProperties.getFileNamePrefix()),
                            fileProperties.getFileStagingPath(), fileProperties.getFileTargetPath());
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    @StepScope
    public ItemReader<Person> PersonReader(EntityManagerFactory entityManagerFactory) {
        String query = "Select new com.example.Person (DTL.personId,DTL.description) FROM DETAIL DTL;";
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setQueryString(query);
        reader.setTransacted(false);
        reader.setPageSize(jobProperties.getReaderPageSize());
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setPageSize(jobProperties.getReaderPageSize());
        return reader;
    }

    @Bean
    public ExtractProcessListener jobExecutionListener() {
        return new ExtractProcessListener();
    }
}
Below is the Java object for the database result set (Person.java):
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Data
public class PersonCompensation {
    private String personId;
    private String description;
}
When I run the batch, I get the following exception:
2020-04-15 21:34:59.172 ERROR 11748 --- [rTaskExecutor-1] o.s.batch.core.step.AbstractStep : Encountered an error executing step processExtract in job processExtractJob
org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:105) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch...
Caused by: java.lang.NullPointerException: null
at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:196) ~[spring-batch-infrastructure-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108) ~[spring-batch-infrastructure-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.item.support...
... 27 common frames omitted
2020-04-15 21:34:59.946 WARN 11748 --- [rTaskExecutor-1] o.s.b.f.support.DisposableBeanAdapter : Invocation of destroy method 'close' failed on bean with name 'scopedTarget.personCompensationReader': org.springframework.batch.item.ItemStreamException: Error while closing item reader
2020-04-15 21:35:00.277 ERROR 11748 --- [rTaskExecutor-1] o.s.batch.core.job.AbstractJob : Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:140) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core...
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Next state not found in flow=processExtractJob for state=processExtractJob.processExtract with exit status=FAILED
at org.springframework.batch.core.job.flow.support.SimpleFlow.nextState(SimpleFlow.java:230) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core...
Exit Code: FAILED
Exit Desc: org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:105)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doInIteration(SimpleChunkProvider.java:116)
at org.springframework.batch...
Caused by: java.lang.NullPointerException
at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:196)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
at org.springframework.batch.item...
... 27 more
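I have three questions:
- Does Spring Batch run the query and fetch all 350,000 rows at once, then hand the records to the processor in chunks of 50 (the chunk size)?
- I have this line in my code: JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>(); Should the Person class be an Entity object for this to work, or will a POJO work fine?
- What could be causing the above exception?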
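- Does Spring Batch run the query and fetch all 350,000 rows at once, then give the records to the processor in chunks of 50 (the chunk size)?
No, it reads only pageSize items at a time, not all of them (that is the point of a paging reader). The chunk size and the page size are different parameters. For example, you can read pages of 100 items and get 10 chunks of 10 items each per page. Matching pageSize and chunkSize usually gives better performance. Please refer to the Javadoc.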
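To make the page/chunk relationship concrete, here is a small self-contained sketch in plain Java (no Spring Batch involved) that does the arithmetic for the numbers in the question. The class and method names are hypothetical, and the page size of 100 is an assumed value, since the result of jobProperties.getReaderPageSize() is not shown. It only counts pages that contain data and chunks handed downstream; it does not model what the reader does internally.

```java
public class PagingMath {

    // Number of data-bearing pages when reading totalRows at pageSize rows per page.
    public static long pagesNeeded(long totalRows, int pageSize) {
        return (totalRows + pageSize - 1) / pageSize; // ceiling division
    }

    // Number of chunks passed to the processor and writer at chunkSize items per chunk.
    public static long chunksNeeded(long totalRows, int chunkSize) {
        return (totalRows + chunkSize - 1) / chunkSize; // ceiling division
    }

    public static void main(String[] args) {
        long totalRows = 350_000; // from the question
        int chunkSize = 50;       // from the question: chunk(50)
        int pageSize = 100;       // assumed value, not given in the question

        System.out.println("pages with data: " + pagesNeeded(totalRows, pageSize));
        System.out.println("chunks: " + chunksNeeded(totalRows, chunkSize));
        System.out.println("chunks per page: " + (pageSize / chunkSize));
    }
}
```

With these assumed values each page feeds two chunks; setting the page size to 50 would give exactly one page per chunk.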
- I have this line in the code: JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>(); Should the Person class be an Entity object for this to work, or will a POJO work fine?
Yes, it should be an entity; otherwise I don't see the purpose of using a JPA reader (a JDBC one would be sufficient).
- What could be causing the above exception?
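You have a NullPointerException in JpaPagingItemReader.doReadPage (JpaPagingItemReader.java:196 in the stack trace), which means the entity manager is null. You need to call afterPropertiesSet on the reader to make sure it is correctly configured.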
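As a rough sketch of what that could look like, the reader bean below calls afterPropertiesSet before returning. Several things here are assumptions rather than part of the answer: the class name ReaderConfiguration, the entity name Detail and its alias, and the page size of 100 are placeholders. The bean also declares JpaPagingItemReader<Person> as its return type instead of ItemReader<Person>. That change goes beyond what the answer says; the Spring Batch reference suggests returning the most specific type from step-scoped bean methods so that the step can still treat the proxied reader as an ItemStream and open it, which may matter here because the reader creates its entity manager when it is opened. The trailing semicolon from the original query is dropped, since JPQL strings do not normally take one. This is a configuration fragment that needs Spring Batch and JPA on the classpath.

```java
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.example.Person;

@Configuration
public class ReaderConfiguration { // hypothetical class name

    @Bean
    @StepScope
    public JpaPagingItemReader<Person> personReader(EntityManagerFactory entityManagerFactory)
            throws Exception {
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        // "Detail" is an assumed entity name; use the mapped entity behind the DETAIL table.
        reader.setQueryString(
                "select new com.example.Person(d.personId, d.description) from Detail d");
        reader.setPageSize(100); // assumed value; the question reads it from JobProperties
        reader.setTransacted(false);
        // Verifies that the entity manager factory and the query have been set.
        reader.afterPropertiesSet();
        return reader;
    }
}
```

The step definition can keep accepting the bean as ItemReader<Person>; only the declared return type of the bean method changes.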