Executing a query inside a Spring Batch processor

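In my project I need to execute a query inside a Spring Batch processor to validate some fields.

How can I do that?

Edit: adding the sources.

This is the step definition: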

@Bean
public Step step1(JdbcBatchItemWriter<CaricoDTO> step1Writer) {
    return stepBuilderFactory.get("step1").<CaricoDTO, CaricoDTO>chunk(10).reader(multiResourceItemReader())
            .processor(processorStep1()).writer(step1Writer).build();

}

This is the definition of multiResourceItemReader:

@Bean
public MultiResourceItemReader<CaricoDTO> multiResourceItemReader() {
    MultiResourceItemReader<CaricoDTO> resourceItemReader = new MultiResourceItemReader<CaricoDTO>();
    ArrayList<Integer> indexesToRemove = new ArrayList<Integer>();
    Resource[] inputResources = null;
    PathMatchingResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
    try {
        inputResources = patternResolver.getResources(inputPath);
    } catch (IOException e) {
        e.printStackTrace();
    }

    resourceItemReader.setResources(inputResources);
    resourceItemReader.setDelegate(step1Reader());
    resourceItemReader.setComparator(new FileComparator());
    return resourceItemReader;
}

This is step1Reader:

@Bean
public FlatFileItemReader<CaricoDTO> step1Reader() {
    FlatFileItemReader<CaricoDTO> reader = new FlatFileItemReader<CaricoDTO>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper<CaricoDTO>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer("|") {
                {
                    setNames(new String[] { ..... });
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<CaricoDTO>() {
                {
                    setTargetType(CaricoDTO.class);
                }
            });
        }
    });
    return reader;
}

This is my processor bean:

@Bean
public CaricoDTOItemProcessorStep1 processorStep1() {
    CaricoDTOItemProcessorStep1 processorStep1 = new CaricoDTOItemProcessorStep1();
    return processorStep1;
}

This is my processor definition:

public class CaricoDTOItemProcessorStep1 implements ItemProcessor<CaricoDTO, CaricoDTO> {

    private String fileName;
    private static final Logger log = LoggerFactory.getLogger(CaricoDTOItemProcessorStep1.class);
    
    @Override
    public CaricoDTO process(CaricoDTO carico) throws Exception {

        carico.setDataCaricamento(new Date(System.currentTimeMillis()));
        carico.setFileName(carico.getResource().getFilename());
        return carico;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

}

This is my writer:

@Bean
public JdbcBatchItemWriter<CaricoDTO> step1Writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<CaricoDTO>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql(....)
            .dataSource(dataSource).build();
}

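Given how few details the original post provides, I'll assume a generic solution is acceptable.

Given a classic Spring Batch project setup, you should have access to an org.springframework.jdbc.core.JdbcTemplate bean wired to the target javax.sql.DataSource. That bean can simply be injected into any of your batch collaborator components: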

  • org.springframework.batch.item.ItemReader
  • org.springframework.batch.item.ItemWriter
  • org.springframework.batch.item.ItemProcessor

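Assuming you need to perform some validation before continuing to process an item (and optionally discard it), you can inject the JdbcTemplate bean, run a query to check your model invariants, and then proceed accordingly: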

public class CaricoDTOItemProcessorStep1 implements ItemProcessor<CaricoDTO, CaricoDTO> {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private String fileName;

    private static final Logger log = LoggerFactory.getLogger(CaricoDTOItemProcessorStep1.class);

    @Override
    public CaricoDTO process(CaricoDTO carico) throws Exception {
        // Placeholder query: replace it with the check your domain needs.
        // JDBC column indexes are 1-based, so the first column is read with index 1.
        // queryForObject expects exactly one row and throws otherwise.
        Boolean someValue = jdbcTemplate.queryForObject(
                "SELECT some_field FROM some_table WHERE some_other_field = 0",
                (rs, rowNum) -> rs.getBoolean(1));
        if (Boolean.TRUE.equals(someValue)) {
            carico.setDataCaricamento(new Date(System.currentTimeMillis()));
            carico.setFileName(carico.getResource().getFilename());
            return carico;
        } else {
            return null; // causes the item to be discarded from processing
        }
    }
}

Needless to say, the query depends on your domain use case and will need to be adapted.
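To make the "return null to discard" behaviour concrete, here is a minimal, dependency-free sketch. It is not Spring Batch code: the nested ItemProcessor interface is a local stand-in for org.springframework.batch.item.ItemProcessor, the Predicate is a hypothetical placeholder for the JdbcTemplate lookup, and processChunk is only a simplified imitation of how a chunk-oriented step filters out null results before writing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class FilterSketch {

    /** Local stand-in for the Spring Batch ItemProcessor contract (assumption for this sketch). */
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    /** Hypothetical processor: keeps an item only if the lookup accepts it. */
    static class ValidatingProcessor implements ItemProcessor<String, String> {

        // Stand-in for a database check such as a JdbcTemplate query.
        private final Predicate<String> lookup;

        ValidatingProcessor(Predicate<String> lookup) {
            this.lookup = lookup;
        }

        @Override
        public String process(String item) {
            // Returning null signals that the item should be filtered out.
            return lookup.test(item) ? item : null;
        }
    }

    /** Simplified imitation of a chunk: null results never reach the writer. */
    static <I, O> List<O> processChunk(List<I> items, ItemProcessor<I, O> processor) throws Exception {
        List<O> toWrite = new ArrayList<>();
        for (I item : items) {
            O result = processor.process(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) throws Exception {
        Set<String> knownCodes = Set.of("a1", "c3");
        ValidatingProcessor processor = new ValidatingProcessor(knownCodes::contains);
        // "b2" is not in the lookup, so it is dropped.
        System.out.println(processChunk(List.of("a1", "b2", "c3"), processor)); // prints [a1, c3]
    }
}
```

Hiding the lookup behind a small functional interface like this also lets the filtering logic be unit tested without a database, while the real processor delegates to the injected JdbcTemplate.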