JpaPagingItemReader failing to read data and move to next step

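I have a Spring Batch program that reads data from SQL Server and writes it to a file. I am using a query that returns 350,000 rows and takes about 2 minutes to run in SQL Server Management Studio, so the query is already tuned for optimal performance.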

Below is the JobConfiguration class:

@Configuration
@Slf4j
public class JobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final FileProperties fileProperties;
    private JobProperties jobProperties;
    private FileUtility fileUtility;
    private CompensationDetailStatusRepository compensationDetailStatusRepository;
    private CompensationDetailsRepository compensationDetailsRepository;

    @Autowired
    public JobConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
                            JobProperties jobProperties, FileProperties fileProperties
    ) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.jobProperties = jobProperties;
        this.fileProperties = fileProperties;
    }

    @Bean
    public Job processExtractJob(ExtractProcessListener listener,
                                        Step processExtract,
                                        Step moveFiles
                                        ) {
        return jobBuilderFactory.get("processExtractJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(processExtract)
                .on("COMPLETED")
                .to(moveFiles)
                .end()
                .build();
    }

    @Bean
    public Step processExtract(ItemReader<Person> PersonReader,
                                      ExtractProcessor extractProcessor,
                                      ExtractWriter extractWriter
    ) {
        return stepBuilderFactory.get("processExtract")
                .<Person, ExtractVO>chunk(50)
                .faultTolerant()
                .retryLimit(1)
                .retry(IOException.class)
                .retry(UncheckedIOException.class)
                .reader(PersonReader)
                .processor(extractProcessor)
                .writer(extractWriter)
                .build();
    }

    @Bean
    public ExtractWriter extractWriter() {
        return new ExtractWriter();
    }

    @Bean
    public ExtractProcessor extractProcessor() {
        return new ExtractProcessor();
    }

    @Bean
    public Step moveFiles() {
        return stepBuilderFactory.get("moveFiles")
                .tasklet((contribution, chunkContext) -> {
                    fileUtility.moveFiles(String.format("%s*", fileProperties.getFileNamePrefix()),
                            fileProperties.getFileStagingPath(), fileProperties.getFileTargetPath());
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    @StepScope
    public ItemReader<Person> PersonReader(EntityManagerFactory entityManagerFactory) {
        // JPQL does not allow a trailing semicolon
        String query = "Select new com.example.Person (DTL.personId,DTL.description) FROM DETAIL DTL";
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setQueryString(query);
        reader.setTransacted(false);
        reader.setPageSize(jobProperties.getReaderPageSize());
        reader.setEntityManagerFactory(entityManagerFactory);
        return reader;
    }

    @Bean
    public ExtractProcessListener jobExecutionListener() {
        return new ExtractProcessListener();
    }
}

Below is the Java object for the database result set (Person.java):

@ToString
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
    private String personId;
    private String description;
}

When I run the batch job, I get the following exception:

2020-04-15 21:34:59.172 ERROR 11748 --- [rTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Encountered an error executing step processExtract in job processExtractJob
org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
    at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:105) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.batch...
Caused by: java.lang.NullPointerException: null
    at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:196) ~[spring-batch-infrastructure-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108) ~[spring-batch-infrastructure-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.batch.item.support...
    ... 27 common frames omitted
2020-04-15 21:34:59.946  WARN 11748 --- [rTaskExecutor-1] o.s.b.f.support.DisposableBeanAdapter    : Invocation of destroy method 'close' failed on bean with name 'scopedTarget.personCompensationReader': org.springframework.batch.item.ItemStreamException: Error while closing item reader
2020-04-15 21:35:00.277 ERROR 11748 --- [rTaskExecutor-1] o.s.batch.core.job.AbstractJob           : Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:140) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.batch.core...
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Next state not found in flow=processExtractJob for state=processExtractJob.processExtract with exit status=FAILED
    at org.springframework.batch.core.job.flow.support.SimpleFlow.nextState(SimpleFlow.java:230) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.batch.core...
Exit Code: FAILED
Exit Desc: org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
    at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:105)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doInIteration(SimpleChunkProvider.java:116)
    at org.springframework.batch...
Caused by: java.lang.NullPointerException
    at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:196)
    at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
    at org.springframework.batch.item...
    ... 27 more

I have three questions:

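  1. Does Spring Batch run the query, fetch all 350,000 rows at once, and then hand the records to the processor in chunks of 50 (the chunk size)?
  2. I have this line in the code: JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();. Does the Person class need to be an Entity for this to work, or will a POJO work fine?
  3. What could be causing the above exception?

  1. Does the Spring Batch process query and fetch all the 350000 rows at once and give the records to the processor in chunks of 50 (chunk size)?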

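No, it reads only pageSize items at a time, not all of them (that is the whole point of a paging reader). chunkSize and pageSize are different parameters: for example, you can read pages of 100 items and process each page as 10 chunks of 10 items. Matching pageSize and chunkSize usually gives better performance. Please refer to the Javadoc.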
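To make the pageSize/chunkSize distinction concrete, here is a framework-free Java simulation of a paging reader feeding a chunk-oriented loop. It is not Spring Batch code: the class, method and field names are hypothetical, and the "table" is just a range of row numbers. It uses the answer's example of pages of 100 items consumed in chunks of 10, over the question's 350,000 rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of paging reads vs. chunk commits (not Spring Batch).
public class PagingChunkDemo {

    // Simulates a paging reader over a table of totalRows rows.
    // It only ever holds one page (at most pageSize rows) in memory.
    static final class PagingReader {
        private final int totalRows;
        private final int pageSize;
        private final List<Integer> currentPage = new ArrayList<>();
        private int nextPage = 0;        // index of the next page to query
        private int positionInPage = 0;  // next item to hand out from currentPage
        private boolean exhausted = false;
        int queriesIssued = 0;           // number of simulated page queries
        int maxBuffered = 0;             // largest page held in memory at once

        PagingReader(int totalRows, int pageSize) {
            this.totalRows = totalRows;
            this.pageSize = pageSize;
        }

        // Returns the next row number, or null once the table is exhausted.
        Integer read() {
            if (exhausted) {
                return null;
            }
            if (positionInPage >= currentPage.size()) {
                fetchPage();
                if (currentPage.isEmpty()) {
                    exhausted = true; // an empty page means there is no more data
                    return null;
                }
            }
            return currentPage.get(positionInPage++);
        }

        // Loads one page, like a query with an offset and a maximum row count.
        private void fetchPage() {
            currentPage.clear();
            positionInPage = 0;
            int first = nextPage * pageSize;
            int last = Math.min(first + pageSize, totalRows);
            for (int row = first; row < last; row++) {
                currentPage.add(row);
            }
            nextPage++;
            queriesIssued++;
            maxBuffered = Math.max(maxBuffered, currentPage.size());
        }
    }

    // Chunk-oriented loop: read up to chunkSize items, then "write" them.
    // Returns the number of chunks that would have been committed.
    static int runStep(PagingReader reader, int chunkSize) {
        int chunks = 0;
        while (true) {
            List<Integer> chunk = new ArrayList<>(chunkSize);
            Integer item;
            while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                return chunks;
            }
            chunks++; // a real step would call the writer and commit here
        }
    }

    public static void main(String[] args) {
        PagingReader reader = new PagingReader(350_000, 100);
        int chunks = runStep(reader, 10);
        System.out.println("queries=" + reader.queriesIssued
                + " chunks=" + chunks + " maxBuffered=" + reader.maxBuffered);
    }
}
```

Running main prints queries=3501 chunks=35000 maxBuffered=100: the simulated reader never holds more than one page in memory, each page feeds ten chunks, and the final query comes back empty, which is how this sketch detects the end of the data.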

  2. I have this line in the code: JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();. Should the Person class be an Entity object for this to work, or will any POJO work fine?

Yes, otherwise I don't see the point of using a JPA reader (a JDBC reader would be enough).
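If Person is to be an entity, a minimal mapping might look like the sketch below. The table name DETAIL is taken from the question's query; the column names and the choice of personId as the primary key are assumptions. Plain getters are used instead of the question's Lombok annotations to keep the sketch self-contained, and the javax.persistence API is assumed to be on the classpath.

```java
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

// Hypothetical mapping: table name from the question, column names assumed.
@Entity
@Table(name = "DETAIL")
public class Person {

    // Assumes personId uniquely identifies a row in DETAIL.
    @Id
    @Column(name = "person_id")
    private String personId;

    @Column(name = "description")
    private String description;

    // JPA requires a no-argument constructor (public or protected).
    protected Person() {
    }

    public Person(String personId, String description) {
        this.personId = personId;
        this.description = description;
    }

    public String getPersonId() {
        return personId;
    }

    public String getDescription() {
        return description;
    }
}
```

With a mapping like this, the reader's query can select the entity directly (for example SELECT p FROM Person p) instead of using a constructor expression.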

  3. What could be causing the above exception?

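You have a NullPointerException at JpaPagingItemReader.doReadPage (line 196), which means the entity manager is null. You need to call afterPropertiesSet on the reader to make sure it is configured correctly.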
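A minimal sketch of a reader bean that calls afterPropertiesSet() before returning it. This sketch rests on several assumptions. Person is taken to be a mapped JPA entity, so a plain SELECT p FROM Person p query works. PAGE_SIZE is a hypothetical constant standing in for jobProperties.getReaderPageSize(). The Spring Batch and javax.persistence dependencies from the question are assumed to be on the classpath. The sketch also declares the concrete JpaPagingItemReader return type instead of ItemReader. For a step-scoped @Bean this is commonly recommended, so that the proxy also exposes ItemStream and the step calls open(), which is where JpaPagingItemReader creates its EntityManager.

```java
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PersonReaderConfiguration {

    // Hypothetical value; the question reads it from jobProperties.getReaderPageSize().
    private static final int PAGE_SIZE = 50;

    // Concrete return type so the step-scoped proxy is also an ItemStream
    // and the step opens the reader (creating its EntityManager) before reading.
    @Bean
    @StepScope
    public JpaPagingItemReader<Person> personReader(EntityManagerFactory entityManagerFactory)
            throws Exception {
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        // Assumes Person is a mapped entity; note there is no trailing semicolon.
        reader.setQueryString("SELECT p FROM Person p");
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setPageSize(PAGE_SIZE);
        reader.setTransacted(false);
        // Validates the configuration (page size, EntityManagerFactory, query)
        // so a misconfiguration fails at startup rather than during the step.
        reader.afterPropertiesSet();
        return reader;
    }
}
```

Keeping the page size equal to the step's chunk size (50 in the question) follows the earlier advice that matching the two usually performs better.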