Spring 使用 CompositeItemWriter 和 CompositeItemProcessor 进行批处理
Spring Batch Using CompositeItemWriter and CompositeItemProcessor
使用 Spring 批处理,我必须写两个不同的 table,但使用相同的 ItemReader。
我不知道如何使用一个 ItemReader 和一个 CompositeItemWriter。
这是作业配置:
public class JobConfiguration {
@Autowired
@SuppressWarnings("squid:S3305")
private ItemReaderSurveillance itemReaderSurveillance;
@Autowired
@SuppressWarnings("squid:S3305")
private ItemWriterSurveillance itemWriterSurveillance;
@Autowired
@SuppressWarnings("squid:S3305")
private ItemWriterSurveillanceEcheance itemWriterSurveillanceEcheance;
@Autowired
@SuppressWarnings("squid:S3305")
private CompositeItemProcessorSurveillance compositeItemProcessor;
@Bean(name = "importSurveillanceJob")
public Job job(JobBuilderFactory jobs) {
return jobs.get("importSurveillanceStep")
.listener(jobListener)
.start(stepTaskletCreationRepertoireReport())
.next(stepTaskletCreationRepertoireArchive())
.next(stepSurveillanceReadProcessWrite())
.next(stepZipFile())
.build();
}
@Bean
protected Step stepSurveillanceReadProcessWrite() {
return stepBuilderFactory.get("stepSurveillanceReadProcessWrite")
.<SurveillanceLineFile, CompositeResultSurveillance>chunk(Integer.valueOf(commitInterval))
.reader(itemReaderSurveillance)
.processor(compositeItemProcessor)
.writer(compositeItemWriter())
.faultTolerant()
.retryLimit(0)
.build();
}
@Bean
public CompositeItemWriter<CompositeResultSurveillance> compositeItemWriter(){
CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
compositeItemWriter.setDelegates(Arrays.asList(itemWriterSurveillance, itemWriterSurveillanceEcheance));
return compositeItemWriter;
}
}
项目作者:
@Slf4j
@StepScope
@Component
public class ItemWriterSurveillance implements ItemWriter<FoaSurveillance>, StepExecutionListener {
String fileName;
JobExecution mJobExecution;
StepExecution stepExecution;
@Autowired
private FoaSurveillanceDao foaSurveillanceDao;
@Override
public void write(List<? extends FoaSurveillance> foaSurveillances) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
// Process data
}
@Override
public void beforeStep(StepExecution stepExecution) {
mJobExecution = stepExecution.getJobExecution();
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
@Slf4j
@StepScope
@Component
public class ItemWriterSurveillanceEcheance implements ItemWriter<FoaSurveillanceEcheance>, StepExecutionListener {
String fileName;
JobExecution mJobExecution;
StepExecution stepExecution;
@Autowired
private FoaSurveillanceEcheanceDao foaSurveillanceEcheanceDao;
@Override
public void write(List<? extends FoaSurveillanceEcheance> foaSurveillanceEcheances) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
// Process data
}
@Override
public void beforeStep(StepExecution stepExecution) {
mJobExecution = stepExecution.getJobExecution();
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
组合两个 itemProcessor :
@Slf4j
@Component
public class CompositeItemProcessorSurveillance implements ItemProcessor<SurveillanceLineFile, CompositeResultSurveillance>, StepExecutionListener {
private StepExecution stepExecution;
@Autowired
ItemProcessorSurveillance itemProcessorSurveillance;
@Autowired
ItemProcessorSurveillanceEcheance itemProcessorSurveillanceEcheance;
@Override
public CompositeResultSurveillance process(SurveillanceLineFile surveillanceLineFile) throws Exception {
CompositeResultSurveillance compositeResultSurveillance = new CompositeResultSurveillance();
compositeResultSurveillance.setFoaSurveillance(itemProcessorSurveillance.process(surveillanceLineFile));
compositeResultSurveillance.setFoaSurveillanceEcheance(itemProcessorSurveillanceEcheance.process(surveillanceLineFile));
return compositeResultSurveillance;
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
ItemProcessorSurveillance:
@Slf4j
@Component
public class ItemProcessorSurveillance implements ItemProcessor<SurveillanceLineFile, FoaSurveillance>, StepExecutionListener {
String fileName;
private StepExecution stepExecution;
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
@Override
public FoaSurveillance process(SurveillanceLineFile surveillanceLineFile) throws Exception {
ExecutionContext executionContext = stepExecution.getExecutionContext();
// Process Data
}
处理器返回的 CompositeResult :
@Getter
@Setter
public class CompositeResultSurveillance {
private FoaSurveillance foaSurveillance;
private FoaSurveillanceEcheance foaSurveillanceEcheance;
}
现在我在 ItemProcessorSurveillance 上有一个 NPE,因为 stepExecution
在 process 方法上为 null。
我不知道出了什么问题。有帮助吗?
这是因为您的 ItemProcessorSurveillance
实现了两个接口:ItemProcessor
和 StepExecutionListener
但仅在该步骤中注册为 ItemProcessor
。它还应该注册为侦听器,以便在适当的时候调用 beforeStep
来设置 stepExecution
字段。
使用 Spring 批处理,我必须写两个不同的 table,但使用相同的 ItemReader。
我不知道如何使用一个 ItemReader 和一个 CompositeItemWriter。
这是作业配置:
public class JobConfiguration {
@Autowired
@SuppressWarnings("squid:S3305")
private ItemReaderSurveillance itemReaderSurveillance;
@Autowired
@SuppressWarnings("squid:S3305")
private ItemWriterSurveillance itemWriterSurveillance;
@Autowired
@SuppressWarnings("squid:S3305")
private ItemWriterSurveillanceEcheance itemWriterSurveillanceEcheance;
@Autowired
@SuppressWarnings("squid:S3305")
private CompositeItemProcessorSurveillance compositeItemProcessor;
@Bean(name = "importSurveillanceJob")
public Job job(JobBuilderFactory jobs) {
return jobs.get("importSurveillanceStep")
.listener(jobListener)
.start(stepTaskletCreationRepertoireReport())
.next(stepTaskletCreationRepertoireArchive())
.next(stepSurveillanceReadProcessWrite())
.next(stepZipFile())
.build();
}
@Bean
protected Step stepSurveillanceReadProcessWrite() {
return stepBuilderFactory.get("stepSurveillanceReadProcessWrite")
.<SurveillanceLineFile, CompositeResultSurveillance>chunk(Integer.valueOf(commitInterval))
.reader(itemReaderSurveillance)
.processor(compositeItemProcessor)
.writer(compositeItemWriter())
.faultTolerant()
.retryLimit(0)
.build();
}
@Bean
public CompositeItemWriter<CompositeResultSurveillance> compositeItemWriter(){
CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
compositeItemWriter.setDelegates(Arrays.asList(itemWriterSurveillance, itemWriterSurveillanceEcheance));
return compositeItemWriter;
}
}
项目作者:
@Slf4j
@StepScope
@Component
public class ItemWriterSurveillance implements ItemWriter<FoaSurveillance>, StepExecutionListener {
String fileName;
JobExecution mJobExecution;
StepExecution stepExecution;
@Autowired
private FoaSurveillanceDao foaSurveillanceDao;
@Override
public void write(List<? extends FoaSurveillance> foaSurveillances) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
// Process data
}
@Override
public void beforeStep(StepExecution stepExecution) {
mJobExecution = stepExecution.getJobExecution();
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
@Slf4j
@StepScope
@Component
public class ItemWriterSurveillanceEcheance implements ItemWriter<FoaSurveillanceEcheance>, StepExecutionListener {
String fileName;
JobExecution mJobExecution;
StepExecution stepExecution;
@Autowired
private FoaSurveillanceEcheanceDao foaSurveillanceEcheanceDao;
@Override
public void write(List<? extends FoaSurveillanceEcheance> foaSurveillanceEcheances) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
// Process data
}
@Override
public void beforeStep(StepExecution stepExecution) {
mJobExecution = stepExecution.getJobExecution();
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}
组合两个 itemProcessor :
@Slf4j
@Component
public class CompositeItemProcessorSurveillance implements ItemProcessor<SurveillanceLineFile, CompositeResultSurveillance>, StepExecutionListener {
private StepExecution stepExecution;
@Autowired
ItemProcessorSurveillance itemProcessorSurveillance;
@Autowired
ItemProcessorSurveillanceEcheance itemProcessorSurveillanceEcheance;
@Override
public CompositeResultSurveillance process(SurveillanceLineFile surveillanceLineFile) throws Exception {
CompositeResultSurveillance compositeResultSurveillance = new CompositeResultSurveillance();
compositeResultSurveillance.setFoaSurveillance(itemProcessorSurveillance.process(surveillanceLineFile));
compositeResultSurveillance.setFoaSurveillanceEcheance(itemProcessorSurveillanceEcheance.process(surveillanceLineFile));
return compositeResultSurveillance;
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
ItemProcessorSurveillance:
@Slf4j
@Component
public class ItemProcessorSurveillance implements ItemProcessor<SurveillanceLineFile, FoaSurveillance>, StepExecutionListener {
String fileName;
private StepExecution stepExecution;
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
@Override
public FoaSurveillance process(SurveillanceLineFile surveillanceLineFile) throws Exception {
ExecutionContext executionContext = stepExecution.getExecutionContext();
// Process Data
}
处理器返回的 CompositeResult :
@Getter
@Setter
public class CompositeResultSurveillance {
private FoaSurveillance foaSurveillance;
private FoaSurveillanceEcheance foaSurveillanceEcheance;
}
现在我在 ItemProcessorSurveillance 上有一个 NPE,因为 stepExecution
在 process 方法上为 null。
我不知道出了什么问题。有帮助吗?
这是因为您的 ItemProcessorSurveillance
实现了两个接口:ItemProcessor
和 StepExecutionListener
但仅在该步骤中注册为 ItemProcessor
。它还应该注册为侦听器,以便在适当的时候调用 beforeStep
来设置 stepExecution
字段。