Custom ItemReader with spring-data-jpa

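I am building a Spring Batch project on top of existing entities and repositories. The job needs a custom ItemReader that reads its data through the existing JPA repository.

Custom reader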

public class InMemoryReader implements ItemReader<Product> {

    @Autowired
    private ProductService productService;


    private int nextStudentIndex;
    private List<Product> studentData;

    public InMemoryReader() {
        initialize();
    }

    private void initialize() {
        studentData = new ArrayList<Product>();
        studentData.add(new Product("hi"));

        for (Product p : productService.get())
            studentData.add(p);
        nextStudentIndex = 0;
    }

    @Override
    public Product read() throws Exception {
        Product nextStudent = null;

        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }

        return nextStudent;
    }
}

However, I cannot autowire ProductService into the ItemReader. It throws the following error:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'reader' defined in class path resource [wariyum/sb/emailNotifier/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.item.ItemReader]: Factory method 'reader' threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:599)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1123)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1018)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:510)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getObject(AbstractBeanFactory.java:305)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:301)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:772)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:834)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:689)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:321)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:969)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:958)
    at wariyum.sb.emailNotifier.Application.main(Application.java:15)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.item.ItemReader]: Factory method 'reader' threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:189)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:588)
    ... 16 common frames omitted
Caused by: java.lang.NullPointerException: null
    at wariyum.sb.emailNotifier.service.InMemoryReader.initialize(InMemoryReader.java:31)
    at wariyum.sb.emailNotifier.service.InMemoryReader.<init>(InMemoryReader.java:24)
    at wariyum.sb.emailNotifier.BatchConfiguration.reader(BatchConfiguration.java:41)
    at wariyum.sb.emailNotifier.BatchConfiguration$$EnhancerBySpringCGLIB$$df186b4e.CGLIB$reader$0(<generated>)
    at wariyum.sb.emailNotifier.BatchConfiguration$$EnhancerBySpringCGLIB$$df186b4e$$FastClassBySpringCGLIB$4d6a16.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228)
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:318)
    at wariyum.sb.emailNotifier.BatchConfiguration$$EnhancerBySpringCGLIB$$df186b4e.reader(<generated>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:162)
    ... 17 common frames omitted

Batch configuration

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {


    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public ItemReader<Product> reader() throws Exception {
        return new InMemoryReader();
    }


    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public ItemWriter<Product> writer() {
        return new ItemWriter<Product>() {
            @Override
            public void write(List<? extends Product> items) throws Exception {

            }
        };
    }
    // end::readerwriterprocessor[]

    // tag::listener[]

    @Bean
    public JobExecutionListener listener() {
        return null;
    }

    // end::listener[]

    // tag::jobstep[]

    @Bean
    public Job importPerson(JobBuilderFactory jobs, Step s1) {

        return jobs.get("import")
                .incrementer(new RunIdIncrementer()) // because of a Spring config bug, this incrementer is not really useful
                .flow(s1)
                .end()
                .build();
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Product, Product>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}

ProductService implementation

@Service("productService")
public class ProductServiceImpl implements ProductService {

    private ProductRepository repository;

    @Autowired
    public ProductServiceImpl(ProductRepository repository) {
        this.repository = repository;
    }

    @Override
    public List<Product> get() {
        List<Product> list = new ArrayList<Product>();
        for (Product p : repository.findAll())
            list.add(p);
        return list;
    }
}

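You are calling initialize() from the constructor, which runs before the ProductService dependency has been wired in. The productService field is therefore still null at that point, which is the NullPointerException reported at InMemoryReader.initialize in the stack trace.

Remove the call to initialize() from the constructor and let Spring invoke it after the dependencies have been wired, either by annotating it with @PostConstruct or by using one of the other approaches outlined here:

How to call a method after bean initialization is complete?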
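To make the ordering concrete, here is a minimal plain-Java sketch that imitates the lifecycle without Spring on the classpath. It is not Spring's implementation: `TinyContainer`, `@Inject`, `@AfterInjection`, and `NameService` are hypothetical stand-ins for the container, `@Autowired`, `@PostConstruct`, and `ProductService`. It assumes only the order described above: constructor first, then field injection, then the init callback.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LifecycleDemo {
    // Hypothetical service instance, standing in for the real ProductService bean.
    static final NameService SERVICE = () -> Arrays.asList("first", "second");

    // True if creating BrokenReader fails because its constructor hit a null dependency.
    static boolean brokenReaderFailsWithNpe() {
        try {
            TinyContainer.create(BrokenReader.class, SERVICE);
            return false;
        } catch (IllegalStateException e) {
            Throwable wrapped = e.getCause(); // the reflective wrapper around the NPE
            return wrapped instanceof InvocationTargetException
                    && wrapped.getCause() instanceof NullPointerException;
        }
    }

    static FixedReader fixedReader() {
        return TinyContainer.create(FixedReader.class, SERVICE);
    }

    public static void main(String[] args) {
        System.out.println("broken reader fails with NPE: " + brokenReaderFailsWithNpe());
        FixedReader reader = fixedReader();
        for (String item = reader.read(); item != null; item = reader.read()) {
            System.out.println(item);
        }
    }
}

// Hypothetical stand-in for @Autowired on a field.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Inject {}

// Hypothetical stand-in for @PostConstruct.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface AfterInjection {}

// Hypothetical stand-in for ProductService.
interface NameService {
    List<String> get();
}

// Same shape as the reader in the question: the constructor uses the injected field.
class BrokenReader {
    @Inject NameService service;
    List<String> data;

    BrokenReader() {
        data = new ArrayList<>(service.get()); // service is still null here
    }
}

// Fixed shape: no work in the constructor, loading happens in the callback.
class FixedReader {
    @Inject NameService service;
    private List<String> data;
    private int nextIndex;

    @AfterInjection
    void initialize() {
        data = new ArrayList<>(service.get()); // service has been injected by now
        nextIndex = 0;
    }

    // Returns the next item, or null once the data is exhausted.
    String read() {
        return nextIndex < data.size() ? data.get(nextIndex++) : null;
    }
}

class TinyContainer {
    static <T> T create(Class<T> type, Object dependency) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            T bean = ctor.newInstance();                  // 1. run the constructor
            for (Field f : type.getDeclaredFields()) {    // 2. inject annotated fields
                if (f.isAnnotationPresent(Inject.class)) {
                    f.setAccessible(true);
                    f.set(bean, dependency);
                }
            }
            for (Method m : type.getDeclaredMethods()) {  // 3. call init callbacks
                if (m.isAnnotationPresent(AfterInjection.class)) {
                    m.setAccessible(true);
                    m.invoke(bean);
                }
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("bean creation failed", e);
        }
    }
}
```

In the question's InMemoryReader, the corresponding change would be to leave the constructor empty (or drop it) and put @PostConstruct on initialize(), keeping the @Autowired field as it is.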