我可以使用 FlatFileItemWriter 在 Spring 批次中写入多格式文件吗?
Can I use FlatFileItemWriter to write multi-format file in Spring Batch?
我正在使用 FlatFileItemReader
读取多格式文件并将每一行映射到 ItemProcessor
中相应的 bean 类型并执行数据扩充。但是当我尝试使用 FlatFileItemWriter
将这些记录写入文件时,我无法为不同的记录类型分配单独的 BeanWrapperFieldExtractor
。我该如何解决这个问题?
输入文件格式
1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3
预期的输出文件格式
1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3
物品处理器
public class RecordTypeItemProcessor implements ItemProcessor<RecordType, RecordType> {
@Override
public RecordType process(RecordType recordType) throws Exception {
if (recordType instanceof RecordType1) {
RecordType1 recordType1 = (RecordType1) recordType;
//enrichment logic
return recordType1;
} else if (recordType instanceof RecordType2) {
RecordType2 recordType2 = (RecordType2) recordType;
//enrichment logic
return recordType2;
}
else
return null;
}
}```
您可以使用 ClassifierCompositeItemProcessor
和 ClassifierCompositeItemWriter
。这个想法是根据项目的类型使用 Classifier
对项目进行分类,并调用相应的项目 processor/writer。这种方法比在 processor/writer.
中使用多个 instance of
检查更干净。
有一些内置的 Classifier
实现(我认为 SubclassClassifier
对您来说是个不错的选择),但如果需要,您可以创建自己的实现。
您可以在 ClassifierCompositeItemProcessorTests and ClassifierCompositeItemWriterTests 中找到示例。
Mahmoud 建议的解决方案终于奏效了 :) 但是,有一些注意事项。 ClassifierCompositeItemWriter
旨在写入不同的文件。即,如果有 3 种不同的记录类型,将有 3 个不同的输出文件。但在我的用例中,我希望在单个文件中以相同的顺序输出。因此,我在每个 Writer bean 中提到了相同的文件名并添加了`writer.setAppendAllowed(true);但在这之后,不同的记录类型被组合在一起,排序顺序发生了变化。因此,我将块大小从 50 减少到 3。3 是记录类型的总数。但是,这会影响性能。通过这样的一些调整,我终于得到了想要的输出。这是我的实现(仅供参考,需要更多清理)
Configuration.java (StepBuilderFactory)
...chunk(3).reader(reader).processor(processor()).writer(writer).stream(recordType1FlatFileItemWriter()).stream(recordType2FlatFileItemWriter()).build();
处理器步骤
@Bean
@StepScope
public ItemProcessor processor() {
ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
ItemProcessor<RecordType1, RecordType1> recordType1Processor = new ItemProcessor<RecordType1, RecordType1>() {
@Nullable
@Override
public RecordType1 process(RecordType1 recordType1) throws Exception {
//Processing logic
return recordType1;
}
};
ItemProcessor<RecordType2, RecordType2> recordType2Processor = new ItemProcessor<RecordType2, RecordType2>() {
@Nullable
@Override
public RecordType2 process(RecordType2 recordType2) throws Exception {
//Processing logic
return recordType2;
}
};
SubclassClassifier classifier = new SubclassClassifier();
Map typeMap = new HashMap();
typeMap.put(RecordType1.class, recordType1Processor);
typeMap.put(RecordType2.class, recordType2Processor);
classifier.setTypeMap(typeMap);
processor.setClassifier(classifier);
return processor;
}
作者步骤
@Bean
@StepScope
public ClassifierCompositeItemWriter writer(@Value("#{stepExecutionContext[fileName]}") Resource file) throws Exception {
ClassifierCompositeItemWriter<String> writer = new ClassifierCompositeItemWriter<>();
SubclassClassifier classifier = new SubclassClassifier<>();
Map typeMap = new HashMap<>();
typeMap.put(RecordType1.class, recordType1FlatFileItemWriter());
typeMap.put(RecordType2.class, recordType2FlatFileItemWriter());
typeMap.put(RecordType3.class, recordType3FlatFileItemWriter());
classifier.setTypeMap(typeMap);
writer.setClassifier(classifier);
return writer;
}
作家
@Bean
public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
{
setNames(new String[] { "RecordType", "ID1", "ID2", "ID3");
}
});
}});
return writer;
}
@Bean
public FlatFileItemWriter<RecordType2> recordType2FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType2> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType2>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType2>() {
{
setNames(new String[] { "RecordType", "ID9", "ID8",);
}
});
}});
return writer;
}
我正在使用 FlatFileItemReader
读取多格式文件并将每一行映射到 ItemProcessor
中相应的 bean 类型并执行数据扩充。但是当我尝试使用 FlatFileItemWriter
将这些记录写入文件时,我无法为不同的记录类型分配单独的 BeanWrapperFieldExtractor
。我该如何解决这个问题?
输入文件格式
1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3
预期的输出文件格式
1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3
物品处理器
public class RecordTypeItemProcessor implements ItemProcessor<RecordType, RecordType> {
@Override
public RecordType process(RecordType recordType) throws Exception {
if (recordType instanceof RecordType1) {
RecordType1 recordType1 = (RecordType1) recordType;
//enrichment logic
return recordType1;
} else if (recordType instanceof RecordType2) {
RecordType2 recordType2 = (RecordType2) recordType;
//enrichment logic
return recordType2;
}
else
return null;
}
}```
您可以使用 ClassifierCompositeItemProcessor
和 ClassifierCompositeItemWriter
。这个想法是根据项目的类型使用 Classifier
对项目进行分类,并调用相应的项目 processor/writer。这种方法比在 processor/writer.
instance of
检查更干净。
有一些内置的 Classifier
实现(我认为 SubclassClassifier
对您来说是个不错的选择),但如果需要,您可以创建自己的实现。
您可以在 ClassifierCompositeItemProcessorTests and ClassifierCompositeItemWriterTests 中找到示例。
Mahmoud 建议的解决方案终于奏效了 :) 但是,有一些注意事项。 ClassifierCompositeItemWriter
旨在写入不同的文件。即,如果有 3 种不同的记录类型,将有 3 个不同的输出文件。但在我的用例中,我希望在单个文件中以相同的顺序输出。因此,我在每个 Writer bean 中提到了相同的文件名并添加了`writer.setAppendAllowed(true);但在这之后,不同的记录类型被组合在一起,排序顺序发生了变化。因此,我将块大小从 50 减少到 3。3 是记录类型的总数。但是,这会影响性能。通过这样的一些调整,我终于得到了想要的输出。这是我的实现(仅供参考,需要更多清理)
Configuration.java (StepBuilderFactory)
...chunk(3).reader(reader).processor(processor()).writer(writer).stream(recordType1FlatFileItemWriter()).stream(recordType2FlatFileItemWriter()).build();
处理器步骤
@Bean
@StepScope
public ItemProcessor processor() {
ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
ItemProcessor<RecordType1, RecordType1> recordType1Processor = new ItemProcessor<RecordType1, RecordType1>() {
@Nullable
@Override
public RecordType1 process(RecordType1 recordType1) throws Exception {
//Processing logic
return recordType1;
}
};
ItemProcessor<RecordType2, RecordType2> recordType2Processor = new ItemProcessor<RecordType2, RecordType2>() {
@Nullable
@Override
public RecordType2 process(RecordType2 recordType2) throws Exception {
//Processing logic
return recordType2;
}
};
SubclassClassifier classifier = new SubclassClassifier();
Map typeMap = new HashMap();
typeMap.put(RecordType1.class, recordType1Processor);
typeMap.put(RecordType2.class, recordType2Processor);
classifier.setTypeMap(typeMap);
processor.setClassifier(classifier);
return processor;
}
作者步骤
@Bean
@StepScope
public ClassifierCompositeItemWriter writer(@Value("#{stepExecutionContext[fileName]}") Resource file) throws Exception {
ClassifierCompositeItemWriter<String> writer = new ClassifierCompositeItemWriter<>();
SubclassClassifier classifier = new SubclassClassifier<>();
Map typeMap = new HashMap<>();
typeMap.put(RecordType1.class, recordType1FlatFileItemWriter());
typeMap.put(RecordType2.class, recordType2FlatFileItemWriter());
typeMap.put(RecordType3.class, recordType3FlatFileItemWriter());
classifier.setTypeMap(typeMap);
writer.setClassifier(classifier);
return writer;
}
作家
@Bean
public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
{
setNames(new String[] { "RecordType", "ID1", "ID2", "ID3");
}
});
}});
return writer;
}
@Bean
public FlatFileItemWriter<RecordType2> recordType2FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType2> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType2>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType2>() {
{
setNames(new String[] { "RecordType", "ID9", "ID8",);
}
});
}});
return writer;
}