Spring 批处理:如何使用 FlatFileItemReader 读取 CSV 文件的页脚和验证
Spring Batch : How to read footer of CSV file and validation using FlatFileItemReader
我正在使用 Spring Batch 和 FlatFileItemReader 来读取 .CSV 文件。文件有 header(第一行)、详细信息和页脚(最后一行)。因此,我想通过页脚行验证详细信息总数。
这是我的示例 .csv 文件。
movie.csv
Name|Type|Year
Notting Hill|romantic comedy|1999
Toy Story 3|Animation|2010
Captain America: The First Avenger|Action|2011
3
来自示例文件
第一行是 header(我忽略它)。
第 2-4 行是详细信息行,最后是页脚。
我想阅读页脚并获取值(最后一行 = 3)
然后,获取详细信息的总数记录(在本例中我们有 3 行)
最后我将验证页脚 (3) 的总数和详细信息的总数记录 (3) 是否相等?
这是我的代码。
@Bean
@StepScope
public FlatFileItemReader<Movie> movieItemReader(String filePath) {
FlatFileItemReader<Movie> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1); //skip header line
reader.setResource(new PathResource(filePath));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("|");
DefaultLineMapper<Movie> movieLineMapper = new DefaultLineMapper<>();
FieldSetMapper<Movie> movieMapper = movieFieldSetMapper();
movieLineMapper.setLineTokenizer(tokenizer);
movieLineMapper.setFieldSetMapper(movieFieldSetMapper);
movieLineMapper.afterPropertiesSet();
reader.setLineMapper(movieLineMapper);
return reader;
}
public FieldSetMapper<Movie> movieFieldSetMapper() {
BeanWrapperFieldSetMapper<Movie> movieMapper = new BeanWrapperFieldSetMapper<>();
movieMapper.setTargetType(Movie.class);
return movieMapper;
}
您可以在作业的业务逻辑之前使用面向块的步骤作为验证步骤。此步骤将使用 ItemReadListener
保存最后一项,并使用 StepExecutionListener
进行验证。这是一个简单的例子:
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
@StepScope
public FlatFileItemReader<String> itemReader() {
FlatFileItemReader<String> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1); //skip header line
reader.setResource(new ByteArrayResource("header\nitem1\nitem2\n2".getBytes()));
reader.setLineMapper(new PassThroughLineMapper());
return reader;
}
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
for (String item : items) {
System.out.println("item = " + item);
}
};
}
@Bean
public Step step1() {
MyListener myListener = new MyListener();
return steps.get("step1")
.<String, String>chunk(5)
.reader(itemReader())
.writer(itemWriter())
.listener((ItemReadListener<String>) myListener)
.listener((StepExecutionListener) myListener)
.build();
}
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("Total count is ok as validated by step1");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.next(step2())
.build();
}
static class MyListener extends StepExecutionListenerSupport implements ItemReadListener<String> {
private String lastItem;
@Override
public void beforeRead() {
}
@Override
public void afterRead(String item) {
this.lastItem = item;
}
@Override
public void onReadError(Exception ex) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
int readCount = stepExecution.getReadCount();
int totalCountInFooter = Integer.valueOf(this.lastItem); // TODO sanity checks (number format, etc)
System.out.println("readCount = " + (readCount - 1)); // substract footer from the read count
System.out.println("totalCountInFooter = " + totalCountInFooter);
// TODO do validation on readCount vs totalCountInFooter
return ExitStatus.COMPLETED; // return appropriate exit status according to validation result
}
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
此示例打印:
item = item1
item = item2
item = 2
readCount = 2
totalCountInFooter = 2
Total count is ok as validated by step1
希望对您有所帮助。
我正在使用 Spring Batch 和 FlatFileItemReader 来读取 .CSV 文件。文件有 header(第一行)、详细信息和页脚(最后一行)。因此,我想通过页脚行验证详细信息总数。
这是我的示例 .csv 文件。
movie.csv
Name|Type|Year
Notting Hill|romantic comedy|1999
Toy Story 3|Animation|2010
Captain America: The First Avenger|Action|2011
3
来自示例文件
第一行是 header(我忽略它)。
第 2-4 行是详细信息行,最后是页脚。
我想阅读页脚并获取值(最后一行 = 3)
然后,获取详细信息的总数记录(在本例中我们有 3 行)
最后我将验证页脚 (3) 的总数和详细信息的总数记录 (3) 是否相等?
这是我的代码。
@Bean
@StepScope
public FlatFileItemReader<Movie> movieItemReader(String filePath) {
FlatFileItemReader<Movie> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1); //skip header line
reader.setResource(new PathResource(filePath));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("|");
DefaultLineMapper<Movie> movieLineMapper = new DefaultLineMapper<>();
FieldSetMapper<Movie> movieMapper = movieFieldSetMapper();
movieLineMapper.setLineTokenizer(tokenizer);
movieLineMapper.setFieldSetMapper(movieFieldSetMapper);
movieLineMapper.afterPropertiesSet();
reader.setLineMapper(movieLineMapper);
return reader;
}
public FieldSetMapper<Movie> movieFieldSetMapper() {
BeanWrapperFieldSetMapper<Movie> movieMapper = new BeanWrapperFieldSetMapper<>();
movieMapper.setTargetType(Movie.class);
return movieMapper;
}
您可以在作业的业务逻辑之前使用面向块的步骤作为验证步骤。此步骤将使用 ItemReadListener
保存最后一项,并使用 StepExecutionListener
进行验证。这是一个简单的例子:
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
@StepScope
public FlatFileItemReader<String> itemReader() {
FlatFileItemReader<String> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1); //skip header line
reader.setResource(new ByteArrayResource("header\nitem1\nitem2\n2".getBytes()));
reader.setLineMapper(new PassThroughLineMapper());
return reader;
}
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
for (String item : items) {
System.out.println("item = " + item);
}
};
}
@Bean
public Step step1() {
MyListener myListener = new MyListener();
return steps.get("step1")
.<String, String>chunk(5)
.reader(itemReader())
.writer(itemWriter())
.listener((ItemReadListener<String>) myListener)
.listener((StepExecutionListener) myListener)
.build();
}
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("Total count is ok as validated by step1");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.next(step2())
.build();
}
static class MyListener extends StepExecutionListenerSupport implements ItemReadListener<String> {
private String lastItem;
@Override
public void beforeRead() {
}
@Override
public void afterRead(String item) {
this.lastItem = item;
}
@Override
public void onReadError(Exception ex) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
int readCount = stepExecution.getReadCount();
int totalCountInFooter = Integer.valueOf(this.lastItem); // TODO sanity checks (number format, etc)
System.out.println("readCount = " + (readCount - 1)); // substract footer from the read count
System.out.println("totalCountInFooter = " + totalCountInFooter);
// TODO do validation on readCount vs totalCountInFooter
return ExitStatus.COMPLETED; // return appropriate exit status according to validation result
}
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
此示例打印:
item = item1
item = item2
item = 2
readCount = 2
totalCountInFooter = 2
Total count is ok as validated by step1
希望对您有所帮助。