远程分块工作器中的 FlatFileItemWriter

FlatFileItemWriter in Remote Chunking worker

我正在寻找 spring-在 Worker 中使用 FlatFileItemWriter 进行批量远程分块的示例实现。

我正在使用 RabbitMQ,Spring批处理,Spring 集成。

<spring-cloud.version>Greenwich.SR1</spring-cloud.version>

当我尝试时,我得到 org.springframework.batch.integration.chunk.AsynchronousFailureException:在处理程序中检测到故障或中断:>org.springframework.batch.item.WriterNotOpenException:写入器必须打开才能写入
异常。


        @Bean
    public FlatFileItemWriter<MyResponse> flatWriter() {
        FlatFileItemWriter<MyResponse> writer = new FlatFileItemWriter<>();

        writer.setResource(new FileSystemResource("writer_queue_remote.csv"));
        writer.setAppendAllowed(true);

        writer.setHeaderCallback(new FlatFileHeaderCallback() {

            @Override
            public void writeHeader(Writer writer) throws IOException {
                List<String> headersList = Arrays.asList("id","name");
                String headers = headersList.stream().collect(Collectors.joining(","));
                writer.write(headers);
            }
        });

        writer.setLineAggregator(new DelimitedLineAggregator<MyResponse>() {
            {
                setDelimiter(",");
                setFieldExtractor(new MyFieldExtractor());
            }
        });

        return writer;
    }


    // Worker Integrationflow
    @Bean
    public IntegrationFlow chunkIntegrationFlow() {
        // TODO Auto generated method stub
        return this.remoteChunkingWorkerBuilder.itemProcessor(chunkProcessor()).itemWriter(flatWriter())
                .inputChannel(requestChannel()).outputChannel(repliesChannel()).build();
    }

我正在将 "MyResponse" 对象写入文件。

请通过指向示例实现或指导我纠正此异常来帮助我。 (异常原因是ItemWriter.OutputState没有初始化)

在远程分块设置中,工作端没有 Spring 批处理步骤来处理编写器的生命周期(open/close,等等)。所以你需要在你的 chunkIntegrationFlow 中使用它之前打开 writer。