Spring batch with Spring Boot terminates before children process with AsyncItemProcessor

I'm using Spring Batch with an AsyncItemProcessor and I'm seeing unexpected behavior. Let me show the code first:

I followed the simple example shown on the Spring Batch project:

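import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@EnableBatchProcessing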
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
    public static void main(String[] args) throws Exception {// NOSONAR
        System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        //SpringApplication.run(PerfilEletricoApp.class, args);
    }
}

-- Edit

If I just put the main thread to sleep, giving slf4j a few seconds to flush the logs, everything works as expected.

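import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;

@EnableBatchProcessing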
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {

    public static void main(String[] args) throws Exception {// NOSONAR
        //System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);

        // Workaround: give the processor threads time to finish and slf4j time to flush before exiting
        Thread.sleep(1000 * 5);
        System.exit(SpringApplication.exit(context));
    }

}
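A fixed five-second sleep only helps when every task still queued on the pool happens to finish within that window. In plain JDK terms, the deterministic way to "wait for the background threads" is to shut the pool down and await its termination. The sketch below is not Spring-specific and is only an illustration; the class `AwaitInsteadOfSleep`, the four-thread pool and the 50 ms simulated work are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: wait for the pool to drain instead of sleeping for a fixed time.
public class AwaitInsteadOfSleep {

    public static List<String> processAll(List<String> items) {
        List<String> results = new CopyOnWriteArrayList<>(); // thread-safe, order not guaranteed
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String item : items) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // simulated slow work (stands in for the HTTP GET)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                results.add(item.toUpperCase());
            });
        }
        pool.shutdown(); // stop accepting new tasks; already-submitted tasks still run
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish within one minute");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
        return results; // every submitted task has completed at this point
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a", "b", "c")).size());
    }
}
```

Unlike the sleep, `awaitTermination` returns as soon as the last task is done and fails loudly if the work takes longer than the timeout.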

-- End of edit

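I'm reading a text file with fields and then using an AsyncItemProcessor for multi-threaded processing, which includes an HTTP GET on a URL to fetch some data. I'm also using a NoOpWriter that does nothing in the write phase. I record the result of each GET in the processor part of the job (using log.trace / log.warn).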
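Conceptually, an async processor does not run the delegate inline: it hands each item to the task executor and immediately returns a Future. The following is a simplified, hypothetical stand-in, not Spring Batch code: `FutureReturningProcessor` is an invented name, `Function` replaces `ItemProcessor`, and a plain `ExecutorService` replaces the `ThreadPoolTaskExecutor`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the idea behind AsyncItemProcessor:
// submit the delegate call to a pool and return a Future right away.
public class FutureReturningProcessor<I, O> {
    private final Function<I, O> delegate;
    private final ExecutorService executor;

    public FutureReturningProcessor(Function<I, O> delegate, ExecutorService executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    // Returns immediately; the delegate runs later on a pool thread.
    public Future<O> process(I item) {
        return executor.submit(() -> delegate.apply(item));
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        FutureReturningProcessor<String, Integer> p =
                new FutureReturningProcessor<>(String::length, pool);
        Future<Integer> f = p.process("perfil");
        // The caller only sees the result (and only waits for it) if it calls get().
        System.out.println(f.get()); // 6
        pool.shutdown();
    }
}
```

Nothing in `process` blocks; whoever receives the Future decides whether to wait for it.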

@Configuration
public class HttpClientConfigurer {
    // [... property and configs omitted] 
    @Bean
    public CloseableHttpClient createHttpClient() {
      // ... creates and returns a poolable http client etc
    }
}

As for the job:

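import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration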
public class BatchJobConfigurer {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("${async.tps:10}")
    private Integer tps;

    @Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
    private String sourceDir;

    @Bean
    public ItemReader<String> reader() {
        MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
        reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
        reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
        return reader;
    }

    @Bean
    public ItemReader<String> flatItemReader() {
        FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(new DefaultLineMapper<String>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "sample-field-001"});
            }});
            setFieldSetMapper(new SimpleStringFieldSetMapper<>());
        }});
        return itemReader;
    }


    @Bean
    public ItemProcessor asyncItemProcessor(){
        AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(processor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<String,OiPaggoResponse> processor(){
        return new PerfilEletricoItemProcessor();
    }

    /**
     * Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
     * @return a NoOpItemWriter<OiPaggoResponse>
     */
    @Bean
    public ItemWriter<OiPaggoResponse> writer() {
        return new NoOpItemWriter<>();
    }

    @Bean
    protected Step step1() throws Exception {
        /*
         * Problem starts here: if I use processor(), everything ends nicely, but if I insist on
         * asyncItemProcessor(), the job ends and the logs from the processor are not stored on disk.
         */
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())   
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(tps);
        executor.setMaxPoolSize(tps);
        executor.setQueueCapacity(tps * 1000);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("AsyncExecutor-");
        return executor;
    }
}

-- Update with AsyncItemWriter (working version)

    /* Wrapped writer: AsyncItemWriter waits on the Futures before calling the delegate */
    @Bean
    public ItemWriter asyncItemWriter(){
        AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    /* AsyncItemWriter defined on the step */
    @Bean
    protected Step step1() throws Exception {
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

-- Any ideas why the AsyncItemProcessor doesn't wait for all of its children to finish before sending the OK-Completed signal to the context?

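The problem is that the AsyncItemProcessor creates Futures that nobody waits for. Wrap your NoOpItemWriter in an AsyncItemWriter so that something does wait on them: AsyncItemWriter calls get() on each Future before passing the results to its delegate, so a chunk, and therefore the step, cannot complete until its items have actually been processed. This makes the job finish as expected.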
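A minimal plain-Java sketch of the waiting described above, assuming a `Consumer` as the delegate. `FutureUnwrappingWriter` is a hypothetical class that imitates the role of `AsyncItemWriter`; it is not its actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for the role AsyncItemWriter plays:
// block on every Future of the chunk, then hand the plain results to the delegate.
public class FutureUnwrappingWriter<O> {
    private final Consumer<List<O>> delegate;

    public FutureUnwrappingWriter(Consumer<List<O>> delegate) {
        this.delegate = delegate;
    }

    public void write(List<? extends Future<O>> futures) throws InterruptedException, ExecutionException {
        List<O> results = new ArrayList<>();
        for (Future<O> future : futures) {
            results.add(future.get()); // waits until this item's async processing is done
        }
        delegate.accept(results); // even a no-op delegate is fine: the waiting already happened
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> chunk = new ArrayList<>();
        for (String line : List.of("a", "b", "c")) {
            chunk.add(pool.submit(() -> {
                Thread.sleep(100); // simulated HTTP GET
                return line.toUpperCase();
            }));
        }
        List<String> written = new ArrayList<>();
        new FutureUnwrappingWriter<String>(written::addAll).write(chunk);
        // By the time write() returns, every item in the chunk has been processed.
        System.out.println(written); // [A, B, C]
        pool.shutdown();
    }
}
```

Because `write` only returns after every `get()` has returned, a delegate that does nothing still forces the caller to wait for the HTTP calls to finish.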