我可以使用 FlatFileItemWriter 在 Spring 批次中写入多格式文件吗?

Can I use FlatFileItemWriter to write multi-format file in Spring Batch?

我正在使用 FlatFileItemReader 读取多格式文件并将每一行映射到 ItemProcessor 中相应的 bean 类型并执行数据扩充。但是当我尝试使用 FlatFileItemWriter 将这些记录写入文件时,我无法为不同的记录类型分配单独的 BeanWrapperFieldExtractor。我该如何解决这个问题?

输入文件格式

1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3

预期的输出文件格式

1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3

物品处理器

public class RecordTypeItemProcessor implements ItemProcessor<RecordType, RecordType> {

    @Override
    public RecordType process(RecordType recordType) throws Exception {
        if (recordType instanceof RecordType1) {
            RecordType1 recordType1 = (RecordType1) recordType;
            //enrichment logic
            return recordType1;
        } else if (recordType instanceof RecordType2) {
            RecordType2 recordType2 = (RecordType2) recordType;
            //enrichment logic
            return recordType2;
        }
        else
            return null;
    }
}```

您可以使用 ClassifierCompositeItemProcessorClassifierCompositeItemWriter。这个想法是根据项目的类型使用 Classifier 对项目进行分类,并调用相应的项目 processor/writer。这种方法比在 processor/writer.

中使用多个 instance of 检查更干净。

有一些内置的 Classifier 实现(我认为 SubclassClassifier 对您来说是个不错的选择),但如果需要,您可以创建自己的实现。

您可以在 ClassifierCompositeItemProcessorTests and ClassifierCompositeItemWriterTests 中找到示例。

Mahmoud 建议的解决方案终于奏效了 :) 但是,有一些注意事项。 ClassifierCompositeItemWriter 旨在写入不同的文件。即,如果有 3 种不同的记录类型,将有 3 个不同的输出文件。但在我的用例中,我希望在单个文件中以相同的顺序输出。因此,我在每个 Writer bean 中提到了相同的文件名并添加了`writer.setAppendAllowed(true);但在这之后,不同的记录类型被组合在一起,排序顺序发生了变化。因此,我将块大小从 50 减少到 3。3 是记录类型的总数。但是,这会影响性能。通过这样的一些调整,我终于得到了想要的输出。这是我的实现(仅供参考,需要更多清理)

Configuration.java (StepBuilderFactory)

...chunk(3).reader(reader).processor(processor()).writer(writer).stream(recordType1FlatFileItemWriter()).stream(recordType2FlatFileItemWriter()).build();

处理器步骤

@Bean
@StepScope
public ItemProcessor processor() {
            ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
            ItemProcessor<RecordType1, RecordType1> recordType1Processor = new ItemProcessor<RecordType1, RecordType1>() {
                @Nullable
                @Override
                public RecordType1 process(RecordType1 recordType1) throws Exception {
                    //Processing logic
                    return recordType1;
                }
            };

            ItemProcessor<RecordType2, RecordType2> recordType2Processor = new ItemProcessor<RecordType2, RecordType2>() {
                @Nullable
                @Override
                public RecordType2 process(RecordType2 recordType2) throws Exception {
                    //Processing logic
                    return recordType2;
                }
            };

        SubclassClassifier classifier = new SubclassClassifier();
        Map typeMap = new HashMap();
        typeMap.put(RecordType1.class, recordType1Processor);
        typeMap.put(RecordType2.class, recordType2Processor);
        classifier.setTypeMap(typeMap);
        processor.setClassifier(classifier);
        return processor;
    }

作者步骤

    @Bean
    @StepScope
    public ClassifierCompositeItemWriter writer(@Value("#{stepExecutionContext[fileName]}") Resource file) throws Exception {
        ClassifierCompositeItemWriter<String> writer = new ClassifierCompositeItemWriter<>();
        SubclassClassifier classifier = new SubclassClassifier<>();
        Map typeMap = new HashMap<>();
        typeMap.put(RecordType1.class, recordType1FlatFileItemWriter());
        typeMap.put(RecordType2.class, recordType2FlatFileItemWriter());
        typeMap.put(RecordType3.class, recordType3FlatFileItemWriter());
        classifier.setTypeMap(typeMap);
        writer.setClassifier(classifier);
        return writer;
    }

作家

    @Bean
    public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
        FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
        writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
            setDelimiter("#");
            setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
                {
                    setNames(new String[] { "RecordType", "ID1", "ID2", "ID3");
                }
            });
        }});
        return  writer;
    }

    @Bean
    public FlatFileItemWriter<RecordType2> recordType2FlatFileItemWriter() throws Exception{
        FlatFileItemWriter<RecordType2> writer = new FlatFileItemWriter<>();
        writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<RecordType2>() {{
            setDelimiter("#");
            setFieldExtractor(new BeanWrapperFieldExtractor<RecordType2>() {
                {
                    setNames(new String[] { "RecordType", "ID9", "ID8",);
                }
            });
        }});
        return  writer;
    }