远程分块工作器中的 FlatFileItemWriter
FlatFileItemWriter in Remote Chunking worker
我正在寻找 spring-在 Worker 中使用 FlatFileItemWriter 进行批量远程分块的示例实现。
我正在使用 RabbitMQ,Spring批处理,Spring 集成。
与
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
当我尝试时,我得到 org.springframework.batch.integration.chunk.AsynchronousFailureException:在处理程序中检测到故障或中断:>org.springframework.batch.item.WriterNotOpenException:写入器必须打开才能写入
异常。
@Bean
public FlatFileItemWriter<MyResponse> flatWriter() {
FlatFileItemWriter<MyResponse> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("writer_queue_remote.csv"));
writer.setAppendAllowed(true);
writer.setHeaderCallback(new FlatFileHeaderCallback() {
@Override
public void writeHeader(Writer writer) throws IOException {
List<String> headersList = Arrays.asList("id","name");
String headers = headersList.stream().collect(Collectors.joining(","));
writer.write(headers);
}
});
writer.setLineAggregator(new DelimitedLineAggregator<MyResponse>() {
{
setDelimiter(",");
setFieldExtractor(new MyFieldExtractor());
}
});
return writer;
}
// Worker Integrationflow
@Bean
public IntegrationFlow chunkIntegrationFlow() {
// TODO Auto generated method stub
return this.remoteChunkingWorkerBuilder.itemProcessor(chunkProcessor()).itemWriter(flatWriter())
.inputChannel(requestChannel()).outputChannel(repliesChannel()).build();
}
我正在将 "MyResponse" 对象写入文件。
请通过指向示例实现或指导我纠正此异常来帮助我。 (异常原因是ItemWriter.OutputState没有初始化)
在远程分块设置中,工作端没有 Spring 批处理步骤来处理编写器的生命周期(open/close,等等)。所以你需要在你的 chunkIntegrationFlow
中使用它之前打开 writer。
我正在寻找 spring-在 Worker 中使用 FlatFileItemWriter 进行批量远程分块的示例实现。
我正在使用 RabbitMQ,Spring批处理,Spring 集成。
与
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
当我尝试时,我得到 org.springframework.batch.integration.chunk.AsynchronousFailureException:在处理程序中检测到故障或中断:>org.springframework.batch.item.WriterNotOpenException:写入器必须打开才能写入
异常。
@Bean
public FlatFileItemWriter<MyResponse> flatWriter() {
FlatFileItemWriter<MyResponse> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("writer_queue_remote.csv"));
writer.setAppendAllowed(true);
writer.setHeaderCallback(new FlatFileHeaderCallback() {
@Override
public void writeHeader(Writer writer) throws IOException {
List<String> headersList = Arrays.asList("id","name");
String headers = headersList.stream().collect(Collectors.joining(","));
writer.write(headers);
}
});
writer.setLineAggregator(new DelimitedLineAggregator<MyResponse>() {
{
setDelimiter(",");
setFieldExtractor(new MyFieldExtractor());
}
});
return writer;
}
// Worker Integrationflow
@Bean
public IntegrationFlow chunkIntegrationFlow() {
// TODO Auto generated method stub
return this.remoteChunkingWorkerBuilder.itemProcessor(chunkProcessor()).itemWriter(flatWriter())
.inputChannel(requestChannel()).outputChannel(repliesChannel()).build();
}
我正在将 "MyResponse" 对象写入文件。
请通过指向示例实现或指导我纠正此异常来帮助我。 (异常原因是ItemWriter.OutputState没有初始化)
在远程分块设置中,工作端没有 Spring 批处理步骤来处理编写器的生命周期(open/close,等等)。所以你需要在你的 chunkIntegrationFlow
中使用它之前打开 writer。