在 SpringBatch 中使用 ClassifierCompositeItemProcessor 时,通过 StepBuilderFactory stream() 将文件名动态传递给 FlatFileItemWriter
Pass filenames dynamically to FlatFileItemWriter through StepBuilderFactory stream() when using ClassifierCompositeItemProcessor in SpringBatch
我正在使用 ClassifierCompositeItemProcessor
处理具有多格式行的多个输入文件。但是当使用 StepBuilderFactory
流写入文件时,我无法动态传递 Resource
文件名。文件名应该是相应的输入文件名。任何帮助将不胜感激。
输入文件 1 (data-111111-12323.txt)
1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3
输入文件 2 (data-22222-23244.txt)
1#435435#00002#2#RecordType1
2#345435#July#Statement#2021#9#RecordType2
3#645456#RecordType3
预期输出文件 1 (data-111111-12323.txt)
1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3
预期输出文件 2 (data-22222-23244.txt)
1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3
步骤
public Step partitionStep() throws Exception {
ItemReader reader = context.getBean(FlatFileItemReader.class);
ClassifierCompositeItemWriter writer = context.getBean(ClassifierCompositeItemWriter.class);
return stepBuilderFactory.get("statementProcessingStep.slave").<String, String>chunk(12).reader(reader).processor(processor()).writer(writer)
.stream(recordType0FlatFileItemWriter())
.stream(recordType1FlatFileItemWriter())
.build();
}
处理器
@Bean
@StepScope
public ItemProcessor processor() {
ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
SubclassClassifier classifier = new SubclassClassifier();
Map typeMap = new HashMap();
typeMap.put(RecordType0.class, recordType0Processor);
typeMap.put(RecordType1.class, recordType1Processor);
classifier.setTypeMap(typeMap);
processor.setClassifier(classifier);
return processor;
}
作家
@Bean
public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource("record1.txt")); //This filename should be dynamic
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
{
setNames(new String[] { "RecordType", "ID1", "ID2", "ID3"});
}
});
}});
return writer;
}
您可以使您的项目 reader/writer 步进范围并使用后期绑定从作业参数或 step/job 执行上下文中注入值。例如:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
.build();
}
您可以在参考文档的 Late Binding of Job and Step Attributes 部分找到更多详细信息。
我正在使用 ClassifierCompositeItemProcessor
处理具有多格式行的多个输入文件。但是当使用 StepBuilderFactory
流写入文件时,我无法动态传递 Resource
文件名。文件名应该是相应的输入文件名。任何帮助将不胜感激。
输入文件 1 (data-111111-12323.txt)
1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3
输入文件 2 (data-22222-23244.txt)
1#435435#00002#2#RecordType1
2#345435#July#Statement#2021#9#RecordType2
3#645456#RecordType3
预期输出文件 1 (data-111111-12323.txt)
1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3
预期输出文件 2 (data-22222-23244.txt)
1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3
步骤
public Step partitionStep() throws Exception {
ItemReader reader = context.getBean(FlatFileItemReader.class);
ClassifierCompositeItemWriter writer = context.getBean(ClassifierCompositeItemWriter.class);
return stepBuilderFactory.get("statementProcessingStep.slave").<String, String>chunk(12).reader(reader).processor(processor()).writer(writer)
.stream(recordType0FlatFileItemWriter())
.stream(recordType1FlatFileItemWriter())
.build();
}
处理器
@Bean
@StepScope
public ItemProcessor processor() {
ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
SubclassClassifier classifier = new SubclassClassifier();
Map typeMap = new HashMap();
typeMap.put(RecordType0.class, recordType0Processor);
typeMap.put(RecordType1.class, recordType1Processor);
classifier.setTypeMap(typeMap);
processor.setClassifier(classifier);
return processor;
}
作家
@Bean
public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource("record1.txt")); //This filename should be dynamic
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
{
setNames(new String[] { "RecordType", "ID1", "ID2", "ID3"});
}
});
}});
return writer;
}
您可以使您的项目 reader/writer 步进范围并使用后期绑定从作业参数或 step/job 执行上下文中注入值。例如:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
.build();
}
您可以在参考文档的 Late Binding of Job and Step Attributes 部分找到更多详细信息。