FlatFileItemWriter - Writer must be open before it can be written to
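I have a Spring Batch job in which every duplicate item that gets skipped is written to a flat file.
However, whenever there is a duplicate, the FlatFileItemWriter throws the following error:
Writer must be open before it can be written to
Below is the configuration of the writer and the SkipListener: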
@Bean(name = "duplicateItemWriter")
public FlatFileItemWriter<InventoryFileItem> dupItemWriter() {
    return new FlatFileItemWriterBuilder<InventoryFileItem>()
            .name("duplicateItemWriter")
            .resource(new FileSystemResource("duplicateItem.txt"))
            .lineAggregator(new PassThroughLineAggregator<>())
            .append(true)
            .shouldDeleteIfExists(true)
            .build();
}
public class StepSkipListener implements SkipListener<InventoryFileItem, InventoryItem> {

    private FlatFileItemWriter<InventoryFileItem> skippedItemsWriter;

    public StepSkipListener(FlatFileItemWriter<InventoryFileItem> skippedItemsWriter) {
        this.skippedItemsWriter = skippedItemsWriter;
    }

    @Override
    public void onSkipInProcess(InventoryFileItem item, Throwable t) {
        System.out.println(item.getBibNum() + " Process - " + t.getMessage());
        try {
            // Write the skipped (duplicate) item to the flat file
            skippedItemsWriter.write(Collections.singletonList(item));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    // onSkipInRead / onSkipInWrite not shown
}
The overall job is defined as below; the duplicateItemWriter is used from within the SkipListener.
@Bean(name = "fileLoadJob")
@Autowired
public Job fileLoadJob(JobBuilderFactory jobs, StepBuilderFactory steps,
        FlatFileItemReader<InventoryFileItem> fileItemReader,
        CompositeItemProcessor compositeItemProcessor,
        @Qualifier(value = "itemWriter") ItemWriter<InventoryItem> itemWriter,
        StepSkipListener skipListener) {
    return jobs.get("libraryFileLoadJob")
            .start(steps.get("step").<InventoryFileItem, InventoryItem>chunk(chunkSize)
                    .reader(fileItemReader)
                    .processor(compositeItemProcessor)
                    .writer(itemWriter)
                    .faultTolerant()
                    .skip(Exception.class)
                    .skipLimit(Integer.parseInt(skipLimit))
                    .listener(skipListener)
                    .build())
            .build();
}
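I also tried writing all of the data to a FlatFileItemWriter, and that did not work either. Writing to a database, however, works without any problem.
The Spring Batch version I'm using is 4.3.3.
I have also looked at the following threads:
- Spring Batch WriterNotOpenException
- FlatfileItemWriter with Compositewriter example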
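This was just a gross oversight on my part: I missed that the FlatFileItemWriter needs to be registered as a stream.
I'm a bit disappointed that I had to ask this question, but I'm posting the answer in case it helps someone.
The solution was as simple as adding stream(dupItemWriter) to the job definition, as described in this thread:
FlatfileItemWriter with Compositewriter example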
@Bean(name = "fileLoadJob")
@Autowired
public Job fileLoadJob(JobBuilderFactory jobs, StepBuilderFactory steps,
        FlatFileItemReader<InventoryFileItem> fileItemReader,
        CompositeItemProcessor compositeItemProcessor,
        @Qualifier(value = "itemWriter") ItemWriter<InventoryItem> itemWriter,
        @Qualifier(value = "duplicateItemWriter") FlatFileItemWriter<InventoryFileItem> dupItemWriter,
        StepSkipListener skipListener) {
    return jobs.get("libraryFileLoadJob")
            .start(steps.get("step").<InventoryFileItem, InventoryItem>chunk(chunkSize)
                    .reader(fileItemReader)
                    .processor(compositeItemProcessor)
                    .writer(itemWriter)
                    .faultTolerant()
                    .skip(Exception.class)
                    .skipLimit(Integer.parseInt(skipLimit))
                    .listener(skipListener)
                    // Register the skip writer so the step opens and closes it
                    .stream(dupItemWriter)
                    .build())
            .build();
}
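To make the reason a bit more concrete: as far as I understand it, the step only calls open() on the streams it knows about (the reader, processor and writer set directly on the step, plus anything passed to stream(...)). A writer that is only handed to a listener is never opened, so its first write fails. The following is a plain-Java sketch of that behaviour, not Spring Batch code; SketchWriter, SketchStep and StreamRegistrationSketch are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class StreamRegistrationSketch {
    // Runs a step whose "skip listener" writes one item to skipWriter.
    // Returns the error message if the write failed, or null if it succeeded.
    static String runStep(SketchWriter skipWriter, boolean registerAsStream) {
        SketchStep step = new SketchStep();
        if (registerAsStream) {
            step.stream(skipWriter);
        }
        try {
            step.run(() -> skipWriter.write(Collections.singletonList("duplicate-1")));
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("not registered: " + runStep(new SketchWriter(), false));
        System.out.println("registered:     " + runStep(new SketchWriter(), true));
    }
}

// Hypothetical stand-in for a stream-style writer (assumption: not the real FlatFileItemWriter).
class SketchWriter {
    private final List<String> sink = new ArrayList<>();
    private boolean open;

    void open() { open = true; }

    void write(List<String> items) {
        if (!open) {
            throw new IllegalStateException("Writer must be open before it can be written to");
        }
        sink.addAll(items);
    }

    void close() { open = false; }

    List<String> written() { return sink; }
}

// Hypothetical stand-in for a step: it opens and closes only the registered streams.
class SketchStep {
    private final List<SketchWriter> streams = new ArrayList<>();

    SketchStep stream(SketchWriter writer) {
        streams.add(writer);
        return this;
    }

    void run(Runnable body) {
        streams.forEach(SketchWriter::open);
        try {
            body.run();
        } finally {
            streams.forEach(SketchWriter::close);
        }
    }
}
```

In the sketch, the unregistered writer fails with the same message as in the question, while the registered one accepts the item because the step opened it before the body ran.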
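It is not strictly necessary to include .stream(dupItemWriter).
You can instead call the writer's .open() method yourself.
In my case I was creating the ItemWriters dynamically/programmatically, so adding each of them as a stream was not feasible: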
.stream(writer-1)
.stream(writer-2)
.stream(writer-N)
Instead, I called the .open() method myself:
FlatFileItemWriter<OutputContact> itemWriter = new FlatFileItemWriter<>();
itemWriter.setResource(outPutResource);
itemWriter.setAppendAllowed(true);
itemWriter.setLineAggregator(lineAggregator);
itemWriter.setHeaderCallback(writer -> writer.write("ACCT,MEMBER,SOURCE"));
itemWriter.open(new ExecutionContext());
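One caveat with this approach, as far as I can tell: a writer that you open by hand is not closed by the step either, so you probably want to call close() on it yourself once the step is done (for example from a StepExecutionListener's afterStep). The pattern of "open on first use, close everything at the end" can be sketched in plain Java as below. This uses only java.io/java.nio rather than Spring Batch, and DynamicWriters is a hypothetical helper name invented for this example.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: tracks writers that are created on demand so they can all be closed later.
class DynamicWriters implements AutoCloseable {
    private final Path dir;
    private final Map<String, BufferedWriter> open = new LinkedHashMap<>();

    DynamicWriters(Path dir) {
        this.dir = dir;
    }

    // Opens the writer for this key the first time it is needed, then appends one line.
    void write(String key, String line) throws IOException {
        BufferedWriter w = open.get(key);
        if (w == null) {
            w = Files.newBufferedWriter(dir.resolve(key + ".txt"));
            open.put(key, w);
        }
        w.write(line);
        w.newLine();
    }

    int openCount() {
        return open.size();
    }

    // Closes every writer that was opened; rethrows the first failure after trying all of them.
    @Override
    public void close() throws IOException {
        IOException first = null;
        for (BufferedWriter w : open.values()) {
            try {
                w.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                }
            }
        }
        open.clear();
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("dynamic-writers");
        // try-with-resources guarantees close() runs even if a write fails
        try (DynamicWriters writers = new DynamicWriters(dir)) {
            writers.write("source-a", "1001,42,A");
            writers.write("source-b", "1002,43,B");
            writers.write("source-a", "1003,44,A");
        }
        System.out.println(Files.readAllLines(dir.resolve("source-a.txt")));
    }
}
```

With real FlatFileItemWriter instances the same idea would mean keeping a reference to every writer you open and calling close() on each one when processing finishes, so that file handles are released.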