从在 Spring 批次中实现 ItemWriter 的 BillerOrderWriter 调用时,记录不会写入文件

Records are not written in files when invoked from BillerOrderWriter which implements ItemWriter in Spring Batch

我正在尝试使用一个写入器写入成功的记录,而在另一个写入器中写入失败的记录。

我已经编写了实现 ItemWriter 的 BillerOrderWriter class。我放了一些日志语句,我可以看到它写成功 billerOrderId 或失败 billerOrderId 。但是,它似乎没有调用 DatabaseToCsvFileJobConfig 或 SuccessfulOrdersToCsvFileJobConfig 。


public class BillerOrderWriter implements ItemWriter<BillerOrder>{
    
    private static Logger log = LoggerFactory.getLogger("BillerOrderWriter.class");
    
    @Autowired
    SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig;
    
    @Autowired
    DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig;
    
    @Override
    public void write(List<? extends BillerOrder> items) throws Exception {
                
        for (BillerOrder item : items) {
            log.info("item = " + item.toString());
            if (item.getResult().equals("SUCCESS")) {
                log.info(" Success billerOrderId = " + item.getBillerOrderId());
                successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();
            } else {
                log.info("Failed billerOrderId = " + item.getBillerOrderId());
                databaseToCsvFileJobConfig.databaseCsvItemWriter();
            }
        }   
    }
}

这里是 BatchConfig class。

 @Bean
  public BillerOrderWriter billerOrderWriter() {
      return new BillerOrderWriter();
  }
  
 
  
  @Bean
  public Job importJobOrder(JobCompletionNotificationListner listener, Step step1) {
      return jobBuilderFactory.get("importJobOrder")
              .incrementer(new RunIdIncrementer())
              .listener(listener)
              .flow(step1)
              .end()
              .build(); 
  }
  
  @Bean(name="step1")
  public Step step1(BillerOrderWriter billerOrderWriter) {
      return stepBuilderFactory.get("step1")             
              .<BillerOrder, BillerOrder> chunk(10)
              .reader((ItemReader<? extends BillerOrder>) reader())
              .processor(processor())
              .writer(billerOrderWriter)
              .build();
  }
 

这是我的成功作家和失败作家 class .


@Configuration
public class SuccessfulOrdersToCsvFileJobConfig {
    private static Logger log = LoggerFactory.getLogger("SuccessfulOrdersToCsvFileJobConfig.class");
    
    @Bean
    public ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter() {
        log.info("Entering SuccessfulOrdersToCsvFileJobConfig...");
        FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();
        
        String exportFileHeader = "BillerOrderId;SuccessMessage";
        OrderWriter headerWriter = new OrderWriter(exportFileHeader);
        csvFileWriter.setHeaderCallback(headerWriter);
        
        String exportFilePath = "/tmp/SuccessBillerOrderIdForRetry.csv";
        csvFileWriter.setResource(new FileSystemResource(exportFilePath));
        
        LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
        csvFileWriter.setLineAggregator(lineAggregator);
        
        return csvFileWriter;
        
    }
    
    private LineAggregator<BillerOrder> createOrderLineAggregator() {
        log.info("Entering createOrderLineAggregator...");
        DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(";");
        
        FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
        lineAggregator.setFieldExtractor(fieldExtractor);
        
        return lineAggregator;
    }
    
    private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
        log.info("Entering createOrderFieldExtractor...");
        BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"billerOrderId","successMessage"});
        return extractor;
    }
}

@Configuration
public class DatabaseToCsvFileJobConfig {
    private static Logger log = LoggerFactory.getLogger("DatabaseToCsvFileJobConfig.class");
    
    @Bean
    public ItemWriter<BillerOrder> databaseCsvItemWriter() {
        log.info("Entering databaseCsvItemWriter...");
        FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();
        
        String exportFileHeader = "BillerOrderId;ErrorMessage";
        OrderWriter headerWriter = new OrderWriter(exportFileHeader);
        csvFileWriter.setHeaderCallback(headerWriter);
        
        String exportFilePath = "/tmp/FailedBillerOrderIdForRetry.csv";
        csvFileWriter.setResource(new FileSystemResource(exportFilePath));
        
        LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
        csvFileWriter.setLineAggregator(lineAggregator);
        
        return csvFileWriter;
        
    }
    
    private LineAggregator<BillerOrder> createOrderLineAggregator() {
        log.info("Entering createOrderLineAggregator...");
        DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(";");
        
        FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
        lineAggregator.setFieldExtractor(fieldExtractor);
        
        return lineAggregator;
    }
    
    private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
        log.info("Entering createOrderFieldExtractor...");
        BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"billerOrderId","errorMessage"});
        return extractor;
    }
    
    
}

这是我的作业完成侦听器 class。

@Component
public class JobCompletionNotificationListner extends JobExecutionListenerSupport {

    private static final org.slf4j.Logger log = LoggerFactory.getLogger(JobCompletionNotificationListner.class);
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("In afterJob ...");
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
                DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig = new DatabaseToCsvFileJobConfig();
                SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig = new SuccessfulOrdersToCsvFileJobConfig();
        } 
    }
}

在您的 BillerOrderWriter#write 方法中,应该编写代码来执行将项目实际写入数据接收器的操作。但在您的情况下,您正在调用 successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();databaseToCsvFileJobConfig.databaseCsvItemWriter(); 来创建项目编写器 bean。您应该注入那些委托编写器并在需要时调用他们的 write 方法,例如:

public class BillerOrderWriter implements ItemWriter<BillerOrder>{

   private ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter;
   private ItemWriter<BillerOrder> databaseCsvItemWriter;
   // constructor with successfulDatabaseCsvItemWriter + databaseCsvItemWriter

   @Override
   public void write(List<? extends BillerOrder> items) throws Exception {
            
      for (BillerOrder item : items) {
        if (item.getResult().equals("SUCCESS")) {
            successfulDatabaseCsvItemWriter.write(Collections.singletonList(item));
        } else {
            databaseCsvItemWriter.write(Collections.singletonList(item));
        }
      }   
   }
}

我写的不是 BillerOrderWriter,而是 BillerOrderClassifier class。

public class BillerOrderClassifier implements Classifier<BillerOrder, ItemWriter<? super BillerOrder>> {
 
    private static final long serialVersionUID = 1L;
     
    private ItemWriter<BillerOrder> successItemWriter;
    private ItemWriter<BillerOrder> failedItemWriter;
 
    public BillerOrderClassifier(ItemWriter<BillerOrder> successItemWriter, ItemWriter<BillerOrder> failedItemWriter) {
        this.successItemWriter = successItemWriter;
        this.failedItemWriter = failedItemWriter;
    }
 
    @Override
    public ItemWriter<? super BillerOrder> classify(BillerOrder billerOrder) {
        return billerOrder.getResult().equals("SUCCESS") ? successItemWriter : failedItemWriter;
    }

}

在 BatchConfiguration 中,我写了 classifierBillerOrderCompositeItemWriter 方法。

@Bean
  public ClassifierCompositeItemWriter<BillerOrder> classifierBillerOrderCompositeItemWriter() throws Exception {
      ClassifierCompositeItemWriter<BillerOrder> compositeItemWriter = new ClassifierCompositeItemWriter<>();
      compositeItemWriter.setClassifier(new BillerOrderClassifier(successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter(), databaseToCsvFileJobConfig.databaseCsvItemWriter()));
      return compositeItemWriter;
  }

@Bean(name="step1")
  public Step step1() throws Exception{
    return stepBuilderFactory.get("step1")           
                  .<BillerOrder, BillerOrder> chunk(10)
                  .reader((ItemReader<? extends BillerOrder>) reader())
                  .processor(processor())
                  .writer(classifierBillerOrderCompositeItemWriter())
                  .stream(successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter())
                  .stream(databaseToCsvFileJobConfig.databaseCsvItemWriter())
                  .build();
  }