FlatFileItemWriter - Writer must be open before it can be written to

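I have a Spring Batch job in which I skip all duplicate items and write them to a flat file.

However, whenever there is a duplicate, the FlatFileItemWriter throws the following error: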

Writer must be open before it can be written to

Below is the configuration of the writer and the SkipListener:

    @Bean(name = "duplicateItemWriter")
    public FlatFileItemWriter<InventoryFileItem> dupItemWriter(){

        return new FlatFileItemWriterBuilder<InventoryFileItem>()
                .name("duplicateItemWriter")
                .resource(new FileSystemResource("duplicateItem.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .append(true)
                .shouldDeleteIfExists(true)
                .build();
    }


public class StepSkipListener implements SkipListener<InventoryFileItem, InventoryItem> {

    private FlatFileItemWriter<InventoryFileItem> skippedItemsWriter;
    
    public StepSkipListener(FlatFileItemWriter<InventoryFileItem> skippedItemsWriter) {
        this.skippedItemsWriter = skippedItemsWriter;
    }
    @Override
    public void onSkipInProcess(InventoryFileItem item, Throwable t) {
        System.out.println(item.getBibNum() + " Process - " + t.getMessage());
        
        try {
            skippedItemsWriter.write(Collections.singletonList(item));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}


The whole job is defined as below; the duplicateItemWriter is used from the SkipListener.

    @Bean(name = "fileLoadJob")
    @Autowired
    public Job fileLoadJob(JobBuilderFactory jobs, StepBuilderFactory steps,
            FlatFileItemReader<InventoryFileItem> fileItemReader,
            CompositeItemProcessor compositeItemProcessor,
            @Qualifier(value = "itemWriter") ItemWriter<InventoryItem> itemWriter,
            StepSkipListener skipListener) {

        return jobs.get("libraryFileLoadJob")
                .start(steps.get("step").<InventoryFileItem, InventoryItem>chunk(chunkSize)
                        .reader(fileItemReader)
                        .processor(compositeItemProcessor)
                        .writer(itemWriter)
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(Integer.parseInt(skipLimit))
                        .listener(skipListener)
                        .build())
                .build();
    }

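I also tried writing all the data through a FlatFileItemWriter, and that did not work either. Writing to a database, however, works without any problem.

I am using Spring Batch 4.3.3. I have also referred to the following thread: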

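This was just a careless oversight: I missed that FlatFileItemWriter needs to be registered as a stream. I am a bit disappointed to have asked this question, but I am posting the answer in case it helps someone.

The solution is simple: add .stream(dupItemWriter) to the job definition. A step automatically opens only the reader, processor and writer that are set on it directly; a writer that is used only inside a listener is never opened unless it is registered as a stream.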

FlatfileItemWriter with Compositewriter example

    @Bean(name = "fileLoadJob")
    @Autowired
    public Job fileLoadJob(JobBuilderFactory jobs, StepBuilderFactory steps,
            FlatFileItemReader<InventoryFileItem> fileItemReader,
            CompositeItemProcessor compositeItemProcessor,
            @Qualifier(value = "itemWriter") ItemWriter<InventoryItem> itemWriter,
            @Qualifier(value = "duplicateItemWriter") FlatFileItemWriter<InventoryFileItem> dupItemWriter,
            StepSkipListener skipListener) {

        return jobs.get("libraryFileLoadJob")
                .start(steps.get("step").<InventoryFileItem, InventoryItem>chunk(chunkSize)
                        .reader(fileItemReader)
                        .processor(compositeItemProcessor)
                        .writer(itemWriter)
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(Integer.parseInt(skipLimit))
                        .listener(skipListener)
                        .stream(dupItemWriter)
                        .build())
                .build();
    }
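
To illustrate why registering the stream helps, here is a minimal plain-Java sketch of the lifecycle. It is a simplified model, not Spring Batch's actual implementation: SketchWriter, SketchStep and runWithListener are hypothetical stand-ins for FlatFileItemWriter, the step, and a skip listener writing to the file. The assumption it encodes is that the step opens and closes only the streams it has been told about.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamLifecycleSketch {

    /** Stand-in for a file writer: refuses to write until open() has been called. */
    static class SketchWriter {
        private boolean open = false;
        final List<String> lines = new ArrayList<>();

        void open() { open = true; }

        void write(String line) {
            if (!open) {
                throw new IllegalStateException("Writer must be open before it can be written to");
            }
            lines.add(line);
        }

        void close() { open = false; }
    }

    /** Stand-in for a step: it opens and closes only the writers registered via stream(). */
    static class SketchStep {
        private final List<SketchWriter> streams = new ArrayList<>();

        SketchStep stream(SketchWriter writer) {
            streams.add(writer);
            return this;
        }

        /** Opens the registered streams, runs the work, then closes them again. */
        void execute(Runnable work) {
            streams.forEach(SketchWriter::open);
            try {
                work.run();
            } finally {
                streams.forEach(SketchWriter::close);
            }
        }
    }

    /** Runs a step whose "listener" writes one item; returns the error message, or null on success. */
    static String runWithListener(SketchStep step, SketchWriter listenerWriter) {
        try {
            step.execute(() -> listenerWriter.write("duplicate item"));
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Writer used by the listener but never registered: the write fails.
        SketchWriter unregistered = new SketchWriter();
        System.out.println(runWithListener(new SketchStep(), unregistered));

        // Same writer type, registered as a stream: the write succeeds.
        SketchWriter registered = new SketchWriter();
        System.out.println(runWithListener(new SketchStep().stream(registered), registered));
        System.out.println(registered.lines);
    }
}
```

In this model the first run reproduces the error message from the question, while the second run writes the item because the step opened the writer before the listener used it.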

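It is not strictly necessary to include .stream(dupItemWriter); you can instead call the writer's .open() method yourself.

In my case I was creating the ItemWriters dynamically/programmatically, so registering each of them as a stream was not feasible: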

.stream(writer-1)
.stream(writer-2)
.stream(writer-N)

Instead, I called the .open() method myself:

FlatFileItemWriter<OutputContact> itemWriter = new FlatFileItemWriter<>();
itemWriter.setResource(outPutResource);
itemWriter.setAppendAllowed(true);
itemWriter.setLineAggregator(lineAggregator);
itemWriter.setHeaderCallback(writer -> writer.write("ACCT,MEMBER,SOURCE"));
itemWriter.open(new ExecutionContext());
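
One thing to keep in mind with this approach: a writer that the step does not know about is not closed by the step either, so whoever calls open() should also call close() once writing is finished. The plain-Java sketch below shows one way to keep track of dynamically created writers so they can all be closed together. ManualWriterRegistry and SketchWriter are hypothetical names; SketchWriter is only an in-memory stand-in for FlatFileItemWriter and is not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ManualWriterRegistry {

    /** In-memory stand-in for FlatFileItemWriter: tracks open state and collects lines. */
    static class SketchWriter {
        boolean open = false;
        final List<String> lines = new ArrayList<>();

        void open() { open = true; }

        void write(String line) {
            if (!open) {
                throw new IllegalStateException("Writer must be open before it can be written to");
            }
            lines.add(line);
        }

        void close() { open = false; }
    }

    private final Map<String, SketchWriter> writers = new LinkedHashMap<>();

    /** Returns the writer for a key, creating and opening it on first use. */
    SketchWriter writerFor(String key) {
        return writers.computeIfAbsent(key, k -> {
            SketchWriter writer = new SketchWriter();
            writer.open(); // opened by hand because no step knows about this writer
            return writer;
        });
    }

    /** Closes every writer that was opened by hand. */
    void closeAll() {
        writers.values().forEach(SketchWriter::close);
    }

    public static void main(String[] args) {
        ManualWriterRegistry registry = new ManualWriterRegistry();
        SketchWriter first = registry.writerFor("writer-1");
        try {
            first.write("ACCT,MEMBER,SOURCE");
            registry.writerFor("writer-2").write("another line");
        } finally {
            registry.closeAll(); // mirror of the manual open() calls
        }
        System.out.println(first.lines);
        System.out.println(first.open);
    }
}
```

With a real FlatFileItemWriter the same pattern would apply to its open(ExecutionContext) and close() methods. Note also that a writer opened this way does not take part in the step's restart bookkeeping, which may or may not matter for a given job.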