spring 引导和 spring 批处理在 ItemProcessor 批处理的 DAO 中抛出 NullPointerException

spring boot and spring batch throw NullPointerException in DAO of ItemProcessor batch

我在 spring 引导和 spring 批处理应用程序中遇到问题,我分享了我的情况,我的 Dao 层是一个接口,正在抛出 NPE (NullPointerException) 并且它实现了,它有注释 @Autowired 我用 Junit 进行了测试以检查它和工作,但我不知道我的配置工作或 CustomItemProccesor 是错误的,这是我的代码,我希望你能帮助我,拜托,谢谢。

配置作业:

@ComponentScan({"com.company.batch.config","com.company.batch.dao","com.company.batch.mapper","com.company.batch.model","com.company.batch.particion","com.company.batch.procesos","com.company.batch.reader","com.company.batch.writers"})
public class ConfigJob 
{

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("sqlserverDataSource")
    private DataSource dataSource;


    @Bean(name = "demoPartitionStep")
    public Step step1Manager(Step slaveStep) {
        return stepBuilderFactory.get("step1.manager")
            .<String, String>partitioner("step1", demoPartitioner())
            .step(slaveStep)
            .gridSize(numerohilos())
            .taskExecutor(taskExecutor())
            .build();
    }

    @Bean(name = "demoPartitioner", destroyMethod = "")
    public Partitioner demoPartitioner() {
        RangePartitioner partitioner = new RangePartitioner();
        //partitioner.setDataSource(dataSource);
        // partitioner.partition(20);
        return partitioner;
    }

    // slave step
    @Bean
    public Step slaveStep(ItemReader<beangenerico> demoReader)
    {
        return stepBuilderFactory.get("slaveStep")
                .chunk(1)
                //.reader(pagingItemReader(null, null))
                .reader(demoReader)
                .processor(compositeProcessor())
                .writer(new ListDelegateWriter())
                .build();
    }


    @Bean
    public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(3);
        delegates.add(new CustomerItemProcessor());
        delegates.add(new AccountsItemProcessor());
        delegates.add(new beanDataItemProccesor());

        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);

        return processor;
    }


    @Bean(name = "demoWriter")
    @StepScope
    public ItemWriter< beangenerico> CustomItemWriter() {
        // TODO Auto-generated method stub
        CustomItemWriter wri = new CustomItemWriter();
        return wri;
    }


    @Bean(name = "demoReader")
    @StepScope
    public ItemReader<beangenerico> formiikreader(@Value("#{stepExecutionContext['fromId']}") int minValue,@Value("#{stepExecutionContext['toId']}") int maxValue){
        myReader fr = new myReader(minValue,maxValue);
        return fr;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    @Bean
    public Job job(@Qualifier("demoPartitionStep") Step demoPartitionStep) {
        return this.jobBuilderFactory.get("job")
                .start(demoPartitionStep)
                .build();
    }

}

这是 spring 批次的 CustomerItemProcessor

@Component
public class CustomerItemProcessor implements ItemProcessor<beangenerico,ThreadLocal<CopyOnWriteArrayList<beanCustomer>>> {
 {

    @Autowired
    private CustomerDAO customerDAO;

    private ThreadLocal<CopyOnWriteArrayList<beanCustomer>> listbean = new ThreadLocal<CopyOnWriteArrayList<beanCustomer>>();

    public ThreadLocal<CopyOnWriteArrayList<beanCustomer>> process(beangenerico rangos) throws Exception {
        System.out.println("entro a customitemprocessor");
            // TODO Auto-generated method stub

            listbean.set(new CopyOnWriteArrayList<beanCustomer>());

            System.out.println("rangos:"+rangos.getIni()+"-"+rangos.getFin()); //si trae datos
            listbean = customerDAO.getAccAgentes(rangos);

            if(listbean != null) {
                return listbean;
            } else {
                return null;
            }

    }

    @Autowired
    public void setCustomerDAO(CustomerDAO customerDAO) {
        this.customerDAO = customerDAO;
    }

}

这是我的界面 Dao

public interface CustomerDAO {
    ThreadLocal<CopyOnWriteArrayList<beanCustomer>> getAccAgentes(beangenerico bean);
}

这是 DAO 实现:

@Repository("customerDAO")
public class CustomerDAOImpl  implements CustomerDAO{ 

    private String SP_SQL = "{call mysp(?, ?)}";

    @Autowired
    @Qualifier("sqlserverDataSource")
    DataSource dataSource;

    @Autowired
    JdbcTemplate jdbcTemplate;

    private ThreadLocal<CopyOnWriteArrayList<beanCustomer>> customerList2=new ThreadLocal<CopyOnWriteArrayList<beanCustomer>>();
    private beanCustomer b = null; 


    public  ThreadLocal<CopyOnWriteArrayList<beanCustomer>> getAccAgentes(beangenerico bean) {  
            // TODO Auto-generated method stub
            try {

                customerList2.set(new CopyOnWriteArrayList<beanCustomer>());
                System.out.println("entro a metodo");
                if(getJdbcTemplate().getDataSource()!=null) {
                    System.out.println("success con"); //with junit test is ok
                }else {
                    System.out.println("null conn");
                }
                return getJdbcTemplate().query(
                        SP_SQL,
                        new Object [] {bean.getIni(),bean.getFin()}, new ResultSetExtractor<ThreadLocal<CopyOnWriteArrayList<beanCustomer>>>() {    

                            @Override
                            public ThreadLocal<CopyOnWriteArrayList<beanCustomer>> extractData(ResultSet rs)
                                    throws SQLException, DataAccessException {
                                // TODO Auto-generated method stub
                                while(rs.next()){
                                b = new beanCustomer();
                                b.setIduser(rs.getString("iduser_co"));
                                b.setAccount(rs.getString("account_co"));
                                b.setTypeUser(rs.getString("type_idag"));
                                customerList2.get().add(b);
                                }
                                return customerList2;
                            }
                        });
            }catch(Exception e) {}

            System.out.println("size lista: "+customerList2.get().size());
            return customerList2;

    }

    @Autowired
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public JdbcTemplate getJdbcTemplate() {
        if (null == jdbcTemplate) {
            jdbcTemplate = new JdbcTemplate(dataSource);
        }
        return jdbcTemplate;
    }   

}

堆栈跟踪:

2020-04-28 11:15:03 - Repeat is complete according to policy and result value.
entro a customitemprocessor
rangos:71-80
2020-04-28 11:15:03 - Applying contribution: [StepContribution: read=1, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
2020-04-28 11:15:03 - Rollback for RuntimeException: java.lang.NullPointerException: null
2020-04-28 11:15:03 - Initiating transaction rollback on application exception
java.lang.NullPointerException: null
    at com.company.batch.procesos.CustomerItemProcessor.process(CustomerItemProcessor.java:69)
    at com.company.batch.procesos.CustomerItemProcessor.process(CustomerItemProcessor.java:1)
    at org.springframework.batch.item.support.CompositeItemProcessor.processItem(CompositeItemProcessor.java:63)
    at org.springframework.batch.item.support.CompositeItemProcessor.process(CompositeItemProcessor.java:52)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:138)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:135)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
2020-04-28 11:15:03 - Initiating transaction rollback
2020-04-28 11:15:03 - Rolling back resourceless transaction on [org.springframework.batch.support.transaction.ResourcelessTransactionManager$ResourcelessTransaction@1337e94]
2020-04-28 11:15:03 - Handling exception: java.lang.NullPointerException, caused by: java.lang.NullPointerException: null
2020-04-28 11:15:03 - Handling fatal exception explicitly (rethrowing first of 1): java.lang.NullPointerException: null
2020-04-28 11:15:03 - Encountered an error executing step slaveStep in job job
java.lang.NullPointerException: null
    at com.company.batch.procesos.CustomerItemProcessor.process(CustomerItemProcessor.java:69)
    at com.company.batch.procesos.CustomerItemProcessor.process(CustomerItemProcessor.java:1)
    at org.springframework.batch.item.support.CompositeItemProcessor.processItem(CompositeItemProcessor.java:63)

这里

listbean = customerDAO.getAccAgentes(rangos);

提前致谢

在定义 CompositeItemProcessor 时,您正在自己创建 CustomerItemProcessor ,这导致无法注入其依赖的 beans 并导致 NPE。您应该从 Spring 上下文中获取 CustomerItemProcessor 而不是自己创建它。类似于:

    @Bean
    public CompositeItemProcessor compositeProcessor(CustomerItemProcessor customerItemProcessor) {
        List<ItemProcessor> delegates = new ArrayList<>(3);
        delegates.add(customerItemProcessor);

        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);

        return processor;
    }

同样适用于CompositeItemProcessor的其他委托bean,如AccountsItemProcessor

好的,我发现问题是没有在 Config 中实例化 bean,这里将我的解决方案放在代码 ConfigJob 中,希望对其他人有所帮助。

</p> <pre><code>@ComponentScan({"com.company.batch.config","com.company.batch.dao","com.company.batch.mapper","com.company.batch.model","com.company.batch.particion","com.company.batch.procesos","com.company.batch.reader","com.company.batch.writers"}) public class ConfigJob { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("sqlserverDataSource") private DataSource dataSource; @Bean(name = "demoPartitionStep") public Step step1Manager(Step slaveStep) { return stepBuilderFactory.get("step1.manager") .<String, String>partitioner("step1", demoPartitioner()) .step(slaveStep) .gridSize(numerohilos()) .taskExecutor(taskExecutor()) .build(); } @Bean(name = "demoPartitioner", destroyMethod = "") public Partitioner demoPartitioner() { RangePartitioner partitioner = new RangePartitioner(); //partitioner.setDataSource(dataSource); // partitioner.partition(20); return partitioner; } // slave step @Bean public Step slaveStep(ItemReader<beangenerico> demoReader) { return stepBuilderFactory.get("slaveStep") .chunk(1) //.reader(pagingItemReader(null, null)) .reader(demoReader) .processor(compositeProcessor()) .writer(new ListDelegateWriter()) .build(); } @Bean public CompositeItemProcessor compositeProcessor() { List<ItemProcessor> delegates = new ArrayList<>(3); delegates.add(new CustomerItemProcessor()); delegates.add(new AccountsItemProcessor()); delegates.add(new beanDataItemProccesor()); CompositeItemProcessor processor = new CompositeItemProcessor(); processor.setDelegates(delegates); return processor; } @Bean public CustomerItemProcessor CustomerProccesor(){ return new CustomerItemProcessor(); } @Bean public AccountsItemProcessor AccountsItemProcessor(){ return new AccountsItemProcessor(); } @Bean public beanDataItemProccesor beanDataItemProccesor(){ return new beanDataItemProccesor(); } @Bean(name = "demoWriter") @StepScope public ItemWriter< beangenerico> CustomItemWriter() { // TODO Auto-generated method stub CustomItemWriter wri = new CustomItemWriter(); return wri; } @Bean(name = "demoReader") @StepScope public ItemReader<beangenerico> formiikreader(@Value("#{stepExecutionContext['fromId']}") int minValue,@Value("#{stepExecutionContext['toId']}") int maxValue){ myReader fr = new myReader(minValue,maxValue); return fr; } @Bean public TaskExecutor taskExecutor() { return new SimpleAsyncTaskExecutor("spring_batch"); } @Bean public Job job(@Qualifier("demoPartitionStep") Step demoPartitionStep) { return this.jobBuilderFactory.get("job") .start(demoPartitionStep) .build(); } }