Spring Batch - Read a byte stream, process, write to 2 different CSV files, convert them to InputStreams and store them to ECS, and then write to a database

I have a requirement where we receive a CSV file as a byte stream through an ECS S3 pre-signed URL. I have to validate the data, write the records that pass and fail validation to 2 different CSV files, and store those to the ECS S3 bucket by converting them to InputStreams. I also have to write the successful records to a database, along with the pre-signed URLs of the inbound, success and failure files.

I am new to Spring Batch. How should I approach this requirement?

If I choose a FlatFileItemReader for reading and an ItemProcessor for processing the data, how should I write to the different files and to the database?

Should I create the job using a Tasklet? TIA.

Please find a sample code snippet below. Let me know if you face any issues.

 //Your input/output DTO - this is the key object
   class BaseCSVDTO {
    // your CSV mapped fields
    private SuccessCSVObject successObject;
    private FailureCSVObject failureObject;
    // getters and setters
   }
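
   For reference, SuccessCSVObject and FailureCSVObject can be simple holder classes; the fields shown here are only placeholders for whatever you actually need to write out:

   class SuccessCSVObject {
    private String recordId;        // placeholder for the validated, output-ready fields
    // getters and setters
   }

   class FailureCSVObject {
    private String recordId;        // placeholder
    private String errorMessage;    // placeholder: why the record failed validation
    // getters and setters
   }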

   //Read the file in the reader as normal; better to create a custom reader if you want more control
    @Bean
    public ItemReader<BaseCSVDTO> yourFlatFileItemReader() {

         //configure the reader so Spring Batch populates the mapped fields automatically - see the sketch below
    }
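
    One rough sketch of that reader body, using Spring Batch's FlatFileItemReaderBuilder and an InputStreamResource wrapped around the bytes downloaded from the pre-signed URL; csvInputStream and the column names are placeholders:

        // csvInputStream: the stream you obtained from the ECS S3 pre-signed URL (placeholder)
        return new FlatFileItemReaderBuilder<BaseCSVDTO>()
                .name("inboundCsvReader")
                .resource(new InputStreamResource(csvInputStream)) // can only be read once per run
                .linesToSkip(1)                                    // skip the header row
                .delimited()
                .names("field1", "field2")                         // your real CSV column names
                .targetType(BaseCSVDTO.class)                      // maps columns onto the DTO fields
                .build();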

    @Bean
    public CSVProcessor csvValidationProcessor() {
        return new CSVProcessor();
    }
    
    class CSVProcessor implements ItemProcessor<BaseCSVDTO, BaseCSVDTO> {
        @Override
        public BaseCSVDTO process(BaseCSVDTO eachCSVitem) throws Exception {
            //validate each item and put it in the Success or Failure object
            //Example of Success
            SuccessCSVObject successObject = new SuccessCSVObject();
            eachCSVitem.setSuccessObject(successObject);
            //Same way for the Failure object
            return eachCSVitem;
        }
    }
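
    If it helps, the validation branch inside process() could look like this sketch; getAmount() and setErrorMessage() are purely hypothetical stand-ins for your own rules and fields:

            // hypothetical rule: records without an amount fail validation
            if (eachCSVitem.getAmount() == null) {
                FailureCSVObject failure = new FailureCSVObject();
                failure.setErrorMessage("amount is missing");   // hypothetical field
                eachCSVitem.setFailureObject(failure);
            } else {
                eachCSVitem.setSuccessObject(new SuccessCSVObject());
            }
            return eachCSVitem;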

   @Bean
    public CompositeItemWriter<BaseCSVDTO> compositeWriter() throws Exception {
        CompositeItemWriter<BaseCSVDTO> compositeItemWriter = new CompositeItemWriter<>();
        List<ItemWriter<? super BaseCSVDTO>> writers = new ArrayList<>();
        writers.add(successCSVWriter());
        writers.add(failureCSVWriter());
        compositeItemWriter.setDelegates(writers);
        return compositeItemWriter;
    }

    @Bean
    public ItemWriter<BaseCSVDTO> successCSVWriter() {
        return new SuccessWriter();
    }

    @Bean
    public ItemWriter<BaseCSVDTO> failureCSVWriter() {
        return new FailureWriter();
    }
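
    For the database part of your requirement, one option (a sketch, not the only way) is a third delegate that persists only the validated records; SUCCESS_RECORD, RECORD_ID and getRecordId() are placeholders for your own schema, and you would register it with writers.add(...) in compositeWriter():

    public class SuccessDatabaseWriter implements ItemWriter<BaseCSVDTO> {

        private final JdbcTemplate jdbcTemplate;

        public SuccessDatabaseWriter(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

        @Override
        public void write(List<? extends BaseCSVDTO> items) {
            for (BaseCSVDTO item : items) {
                if (item.getSuccessObject() != null) {   // persist only records that passed validation
                    // table and column names are placeholders; adapt to your schema
                    jdbcTemplate.update("INSERT INTO SUCCESS_RECORD (RECORD_ID) VALUES (?)",
                            item.getSuccessObject().getRecordId());
                }
            }
        }
    }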

    
    public class SuccessWriter implements ItemWriter<BaseCSVDTO> {
        @Override
        public void write(List<? extends BaseCSVDTO> items) {
            for (BaseCSVDTO baseCSVDTO : items) {
                SuccessCSVObject success = baseCSVDTO.getSuccessObject();
                if (success != null) {
                    //write the success CSV line from this object
                }
            }
        }
    }

  public class FailureWriter implements ItemWriter<BaseCSVDTO> {
        @Override
        public void write(List<? extends BaseCSVDTO> items) {
            for (BaseCSVDTO baseCSVDTO : items) {
                FailureCSVObject failure = baseCSVDTO.getFailureObject();
                if (failure != null) {
                    //write the failure CSV line from this object
                }
            }
        }
    }
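
    To get the two generated files into ECS and capture their pre-signed URLs, one option is a JobExecutionListener that runs after the step and uploads them as InputStreams through the AWS S3 client (ECS exposes an S3-compatible API). This is only a sketch; the bucket name, object keys, local file names and the amazonS3 client are all placeholders you have to wire up yourself:

    public class EcsUploadListener implements JobExecutionListener {

        private final AmazonS3 amazonS3;   // AWS SDK client pointed at your ECS S3 endpoint

        public EcsUploadListener(AmazonS3 amazonS3) {
            this.amazonS3 = amazonS3;
        }

        @Override
        public void beforeJob(JobExecution jobExecution) { }

        @Override
        public void afterJob(JobExecution jobExecution) {
            // placeholder bucket, keys and local file names
            URL successUrl = upload("your-bucket", "outbound/success.csv", new File("success.csv"));
            URL failureUrl = upload("your-bucket", "outbound/failure.csv", new File("failure.csv"));
            // persist successUrl / failureUrl (and the inbound URL) together with your database records
        }

        private URL upload(String bucket, String key, File file) {
            try (InputStream in = new FileInputStream(file)) {
                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(file.length());
                amazonS3.putObject(bucket, key, in, metadata);     // stores the CSV as an InputStream
                return amazonS3.generatePresignedUrl(              // pre-signed URL for the stored file
                        bucket, key, Date.from(Instant.now().plus(Duration.ofHours(1))));
            } catch (IOException e) {
                throw new IllegalStateException("Upload to ECS failed for " + key, e);
            }
        }
    }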

    // Finally, the job step
    @Bean
    public Step executionStep() throws Exception {
        return stepBuilderFactory.get("executionStep").<BaseCSVDTO, BaseCSVDTO>chunk(chunkSize)
                .reader(yourFlatFileItemReader()).processor(csvValidationProcessor()).writer(compositeWriter())
                //.faultTolerant()
                //.skipLimit(skipErrorCount).skip(Exception.class)//.noSkip(FileNotFoundException.class)
                //.listener(validationListener())
                //.noRetry(Exception.class)
                //.noRollback(Exception.class)
                .build();
    }
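
    To your Tasklet question: the read-validate-write flow itself fits the chunk-oriented step above, so you don't need a Tasklet for it (a Tasklet step is better suited to one-off work such as the upload). The job then just wraps the step; here jobBuilderFactory is assumed to be autowired the same way as stepBuilderFactory:

    @Bean
    public Job csvProcessingJob() throws Exception {
        return jobBuilderFactory.get("csvProcessingJob")
                .incrementer(new RunIdIncrementer())     // new job instance per run
                .start(executionStep())
                //.listener(ecsUploadListener())         // optional: upload the CSVs to ECS after the step
                .build();
    }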