Spring Batch: FlatFileItemWriter header never called
I have a strange issue with my FlatFileItemWriter callbacks.
I have a custom ItemWriter that implements both FlatFileFooterCallback and FlatFileHeaderCallback. Consequently, I set the header and footer callbacks on my FlatFileItemWriter like this:
ItemWriter Bean
@Bean
@StepScope
public ItemWriter<CityItem> writer(FlatFileItemWriter<CityProcessed> flatWriter,
        @Value("#{jobExecutionContext[inputFile]}") String inputFile) {
    CityItemWriter itemWriter = new CityItemWriter();
    flatWriter.setHeaderCallback(itemWriter);
    flatWriter.setFooterCallback(itemWriter);
    itemWriter.setDelegate(flatWriter);
    itemWriter.setInputFileName(inputFile);
    return itemWriter;
}
FlatFileItemWriter Bean
@Bean
@StepScope
public FlatFileItemWriter<CityProcessed> flatFileWriterArchive(
        @Value("#{jobExecutionContext[outputFileArchive]}") String outputFile) {
    FlatFileItemWriter<CityProcessed> flatWriter = new FlatFileItemWriter<>();
    flatWriter.setResource(new FileSystemResource(new File(outputFile)));

    DelimitedLineAggregator<CityProcessed> aggregator = new DelimitedLineAggregator<>();
    aggregator.setDelimiter(";");

    BeanWrapperFieldExtractor<CityProcessed> beanWrapper = new BeanWrapperFieldExtractor<>();
    beanWrapper.setNames(new String[] {
        "country", "name", "population", "popUnder25", "pop25To50", "pop50to75", "popMoreThan75"
    });
    aggregator.setFieldExtractor(beanWrapper);

    flatWriter.setLineAggregator(aggregator);
    flatWriter.setEncoding("ISO-8859-1");
    return flatWriter;
}
Step Bean
@Bean
public Step stepImport(StepBuilderFactory stepBuilderFactory,
        ItemReader<CityFile> reader,
        ItemWriter<CityItem> writer,
        ItemProcessor<CityFile, CityItem> processor,
        @Qualifier("flatFileWriterArchive") FlatFileItemWriter<CityProcessed> flatFileWriterArchive,
        ExecutionContextPromotionListener executionContextListener) {
    return stepBuilderFactory.get("stepImport")
            .<CityFile, CityItem> chunk(10)
            .reader(reader(null))
            .processor(processor)
            .writer(writer)
            .stream(flatFileWriterArchive)
            .listener(executionContextListener)
            .build();
}
There is nothing unusual in my writeFooter, writeHeader and write methods.
ItemWriter code
public class CityItemWriter implements ItemWriter<CityItem>, FlatFileFooterCallback, FlatFileHeaderCallback, ItemStream {

    private FlatFileItemWriter<CityProcessed> writer;

    // Running counters, printed by writeFooter
    private static int nbUnknown = 0;
    private static int nbSup10000 = 0;
    private static int nbInf10000 = 0;

    private String inputFileName = "-";

    public void setDelegate(FlatFileItemWriter<CityProcessed> delegate) {
        writer = delegate;
    }

    public void setInputFileName(String name) {
        inputFileName = name;
    }

    private Predicate<String> isNullValue() {
        return p -> p == null;
    }

    @Override
    public void write(List<? extends CityItem> cities) throws Exception {
        List<CityProcessed> citiesCSV = new ArrayList<>();
        for (CityItem item : cities) {
            String populationAsString = "";
            String less25AsString = "";
            String more25AsString = "";
            /*
             * Some processing to get total Unknown/Sup 10000/Inf 10000
             * and other data
             */
            // Write in CSV file
            CityProcessed cre = new CityProcessed();
            cre.setCountry(item.getCountry());
            cre.setName(item.getName());
            cre.setPopulation(populationAsString);
            cre.setLess25(less25AsString);
            cre.setMore25(more25AsString);
            citiesCSV.add(cre);
        }
        writer.write(citiesCSV);
    }

    @Override
    public void writeFooter(Writer fileWriter) throws IOException {
        String newLine = "\r\n";
        String totalUnknown = "Subtotal:;Unknown;" + String.valueOf(nbUnknown) + newLine;
        String totalSup10000 = ";Sum Sup 10000;" + String.valueOf(nbSup10000) + newLine;
        String totalInf10000 = ";Sum Inf 10000;" + String.valueOf(nbInf10000) + newLine;
        String total = "Total:;;" + String.valueOf(nbSup10000 + nbInf10000 + nbUnknown) + newLine;
        fileWriter.write(newLine);
        fileWriter.write(totalUnknown);
        fileWriter.write(totalSup10000);
        fileWriter.write(totalInf10000);
        fileWriter.write(total);
    }

    @Override
    public void writeHeader(Writer fileWriter) throws IOException {
        String newLine = "\r\n";
        String firstLine = "FILE PROCESSED ON: ;" + new SimpleDateFormat("MM/dd/yyyy").format(new Date()) + newLine;
        String secondLine = "Filename: ;" + inputFileName + newLine;
        String colNames = "Country;Name;Population...;...having less than 25;...having more than 25";
        fileWriter.write(firstLine);
        fileWriter.write(secondLine);
        fileWriter.write(newLine);
        fileWriter.write(colNames);
    }

    // ItemStream methods simply delegate to the wrapped FlatFileItemWriter

    @Override
    public void close() throws ItemStreamException {
        writer.close();
    }

    @Override
    public void open(ExecutionContext context) throws ItemStreamException {
        writer.open(context);
    }

    @Override
    public void update(ExecutionContext context) throws ItemStreamException {
        writer.update(context);
    }
}
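When I run my batch, I only get the data for each city (the write method part) and the footer lines. If I comment out the entire content of the write method and the footer callback, I still don't get the header lines. I tried adding a System.out.println() to my header callback, and it looks like it is never called.
Here is an example of the CSV file produced by my batch: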
France;Paris;2240621;Unknown;Unknown
France;Toulouse;439553;Unknown;Unknown
Spain;Barcelona;1620943;Unknown;Unknown
Spain;Madrid;3207247;Unknown;Unknown
[...]
Subtotal:;Unknown;2
;Sum Sup 10000;81
;Sum Inf 10000;17
Total:;;100
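The strange thing is that my header used to work, back when I first added the footer and header callbacks. I haven't changed them, and I can't see what I did in my code that "broke" the header callback... and of course, I didn't save my first version of the code. Since I only just noticed that the header is missing (I checked my last few files, and it seems the header has been gone for a while without my noticing), I can't simply undo my modifications to see when/why it happened.
Do you have any idea how to solve this problem?
Thanks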
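When using Java configuration as you are, it's best to return the most specific type possible (the opposite of what you're usually told to do in Java programming). In this case, your writer bean method returns ItemWriter, but the bean is step scoped. Because of this, a proxy is created that can only see the type your Java configuration returns, which in this case is ItemWriter, and it does not expose the methods of the ItemStream interface. If you return CityItemWriter, I'd expect things to work.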
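To illustrate the proxy point, here is a minimal, self-contained sketch. It does not use Spring at all: SimpleWriter, SimpleStream, CityWriter, proxyFor and openIfStream are hypothetical stand-ins, and a plain JDK dynamic proxy is used as an analogy for the scoped proxy, not as a reproduction of Spring's actual machinery. It only shows that a proxy built from a narrow declared type does not look like a stream to a caller that checks for one.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyTypeDemo {

    // Hypothetical stand-ins for ItemWriter and ItemStream.
    interface SimpleWriter { void write(String item); }
    interface SimpleStream { void open(); }

    // Plays the role of CityItemWriter: implements both interfaces.
    static class CityWriter implements SimpleWriter, SimpleStream {
        boolean opened = false;
        @Override public void write(String item) { }
        @Override public void open() { opened = true; }
    }

    // Builds a proxy that exposes only the declared type, in the same spirit
    // as a scoped proxy that only knows the return type of the @Bean method.
    static <T> T proxyFor(Class<T> declaredType, Object target) {
        InvocationHandler handler = (p, method, methodArgs) -> method.invoke(target, methodArgs);
        Object instance = Proxy.newProxyInstance(
                declaredType.getClassLoader(), new Class<?>[] {declaredType}, handler);
        return declaredType.cast(instance);
    }

    // Mimics a caller that opens the writer only if it is recognisably a stream.
    static boolean openIfStream(Object writer) {
        if (writer instanceof SimpleStream) {
            ((SimpleStream) writer).open();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CityWriter target = new CityWriter();
        SimpleWriter narrow = proxyFor(SimpleWriter.class, target);
        System.out.println(openIfStream(narrow)); // false: the proxy hides SimpleStream
        System.out.println(openIfStream(target)); // true: the concrete type exposes it
    }
}
```

Applied to the configuration in the question, the suggestion above amounts to declaring the writer bean method with the return type CityItemWriter instead of ItemWriter<CityItem>; the rest of the method body can stay as it is.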