Spring Batch - Read a byte stream, process it, write to 2 different CSV files, convert them to InputStreams, store them in ECS, and then write to the database
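I have a requirement where we receive a CSV file as a byte stream through an ECS S3 pre-signed URL. I have to validate the data and write the records that pass validation and the records that fail it to 2 different CSV files, then store those files in an ECS S3 bucket by converting them to InputStreams. The successful records also have to be written to the database, along with the pre-signed URLs of the inbound, success, and failure files.
I am new to Spring Batch. How should I approach this requirement?
If I choose a FlatFileItemReader to read and an ItemProcessor to process the data, how should I write to the different files and to the database?
Or
should I create the job using a Tasklet? TIA.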
Please find a sample code snippet below. Let me know if you run into any issues.
// Your input/output DTO. This is the key object.
class BaseCSVDTO {
    // Your CSV-mapped fields, plus getters and setters
    private SuccessCSVObject successObject;
    private FailureCSVObject failureObject;
}
// Read the file in the reader as usual. Create a custom reader if you want more control.
@Bean
public ItemReader<BaseCSVDTO> yourFlatFileItemReader() {
    // Build and return your FlatFileItemReader here.
    // Spring Batch populates the mapped fields automatically.
}
@Bean
public CSVProcessor csvValidationProcessor() {
    return new CSVProcessor();
}
class CSVProcessor implements ItemProcessor<BaseCSVDTO, BaseCSVDTO> {
    @Override
    public BaseCSVDTO process(BaseCSVDTO eachCSVitem) throws Exception {
        // Validate each item and populate either the success or the failure object.
        // Example of success:
        SuccessCSVObject successObject = new SuccessCSVObject();
        eachCSVitem.setSuccessObject(successObject);
        // Do the same for the failure object when validation fails.
        // Return the item (not null) so that it reaches both writers.
        return eachCSVitem;
    }
}
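To make the "validate each item and put it in the success or failure object" step a bit more concrete, here is a minimal plain-Java sketch with no Spring dependency. The `Row` class, its `id` and `amount` columns, and the two rules are all assumptions made up for illustration; your real DTO and rules will differ. The idea it tries to show is that the processor tags a row instead of dropping it, so both writers still see every row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BaseCSVDTO: two made-up CSV columns plus the validation outcome.
class Row {
    final String id;
    final String amount;
    String failureReason; // null means the row passed validation

    Row(String id, String amount) {
        this.id = id;
        this.amount = amount;
    }

    boolean isValid() {
        return failureReason == null;
    }
}

class RowValidator {
    // Mirrors what process() could do: tag the row and return it,
    // so that each downstream writer can pick the rows it cares about.
    static Row validate(Row row) {
        List<String> errors = new ArrayList<>();
        if (row.id == null || row.id.trim().isEmpty()) {
            errors.add("id is empty");
        }
        try {
            if (Double.parseDouble(row.amount) < 0) {
                errors.add("amount is negative");
            }
        } catch (NumberFormatException | NullPointerException e) {
            // parseDouble throws NumberFormatException for bad text and NullPointerException for null
            errors.add("amount is not a number");
        }
        row.failureReason = errors.isEmpty() ? null : String.join("; ", errors);
        return row;
    }
}
```

Inside an `ItemProcessor`, the same logic would set the success object when there are no errors and the failure object (with the reasons) otherwise.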
@Bean
public CompositeItemWriter<BaseCSVDTO> compositeWriter() throws Exception {
    // The composite writer passes every chunk to each delegate, in order.
    CompositeItemWriter<BaseCSVDTO> compositeItemWriter = new CompositeItemWriter<>();
    List<ItemWriter<? super BaseCSVDTO>> writers = new ArrayList<>();
    writers.add(successCSVWriter());
    writers.add(failureCSVWriter());
    compositeItemWriter.setDelegates(writers);
    return compositeItemWriter;
}
@Bean
public ItemWriter<BaseCSVDTO> successCSVWriter() {
    return new SuccessWriter();
}
@Bean
public ItemWriter<BaseCSVDTO> failureCSVWriter() {
    return new FailureWriter();
}
public class SuccessWriter implements ItemWriter<BaseCSVDTO> {
    @Override
    public void write(List<? extends BaseCSVDTO> items) {
        for (BaseCSVDTO baseCSVDTO : items) {
            SuccessCSVObject successObject = baseCSVDTO.getSuccessObject();
            if (successObject != null) {
                // Write successObject to the success CSV.
            }
        }
    }
}
public class FailureWriter implements ItemWriter<BaseCSVDTO> {
    @Override
    public void write(List<? extends BaseCSVDTO> items) {
        for (BaseCSVDTO baseCSVDTO : items) {
            FailureCSVObject failureObject = baseCSVDTO.getFailureObject();
            if (failureObject != null) {
                // Write failureObject to the failure CSV.
            }
        }
    }
}
// Finally, the job step
@Bean
public Step executionStep() throws Exception {
    return stepBuilderFactory.get("executionStep")
            .<BaseCSVDTO, BaseCSVDTO>chunk(chunkSize)
            .reader(yourFlatFileItemReader())
            .processor(csvValidationProcessor())
            .writer(compositeWriter())
            //.faultTolerant()
            //.skipLimit(skipErrorCount).skip(Exception.class)//.noSkip(FileNotFoundException.class)
            //.listener(validationListener())
            //.noRetry(Exception.class)
            //.noRollback(Exception.class)
            .build();
}
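The question also asks about turning the generated CSV files into InputStreams for the ECS S3 upload. One possible approach, sketched below under the assumption that each output file fits comfortably in memory, is to render the rows into a `ByteArrayOutputStream` and wrap the resulting bytes in a `ByteArrayInputStream`. The `CsvStreams` class and its method names are hypothetical helpers, not part of Spring Batch, and the upload call itself is left out because it depends on the S3 client you use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper: renders rows as CSV in memory and exposes them as an InputStream.
class CsvStreams {
    // Quote a field only when it contains a comma, a double quote, or a line break.
    static String escape(String field) {
        if (field == null) {
            return "";
        }
        if (field.contains(",") || field.contains("\"") || field.contains("\n") || field.contains("\r")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    static InputStream toCsvStream(List<String> header, List<List<String>> rows) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the writer flushes everything into the buffer.
        try (Writer out = new OutputStreamWriter(buffer, StandardCharsets.UTF_8)) {
            writeLine(out, header);
            for (List<String> row : rows) {
                writeLine(out, row);
            }
        }
        return new ByteArrayInputStream(buffer.toByteArray());
    }

    private static void writeLine(Writer out, List<String> fields) throws IOException {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                line.append(',');
            }
            line.append(escape(fields.get(i)));
        }
        out.write(line.append('\n').toString());
    }
}
```

The returned stream can then be handed to whichever S3 client you use. For files too large to buffer, writing to a temporary file and opening a stream on it would likely be the safer variant of the same idea.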