Spring 批处理 Spring 引导在子进程使用 AsyncItemProcessor 之前终止
Spring batch with Spring Boot terminates before children process with AsyncItemProcessor
我正在使用 Spring 带有 AsyncItemProcessor 的批处理,但出现异常行为。让我先展示代码:
按照Spring Batch project上所示的简单示例进行操作:
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
//SpringApplication.run(PerfilEletricoApp.class, args);
}
}
-- 编辑
如果我只是让主进程休眠,请给 slf4j 几秒钟的时间来刷新日志,一切都会按预期进行。
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
//System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);
Thread.sleep(1000 * 5);
System.exit(SpringApplication.exit(context));
}
}
-- 编辑结束
我正在读取一个带有字段的文本文件,然后使用 AsyncItemProcessor 进行多线程处理,其中包括 URL 上的 Http GET 以获取一些数据,我还使用了NoOpWriter 在写入部分什么都不做。我将 GET 的结果保存在作业的处理器部分(使用 log.trace / log.warn)。
@Configuration
public class HttpClientConfigurer {
// [... property and configs omitted]
@Bean
public CloseableHttpClient createHttpClient() {
// ... creates and returns a poolable http client etc
}
}
至于工作:
@Configuration
public class BatchJobConfigurer {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Value("${async.tps:10}")
private Integer tps;
@Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
private String sourceDir;
@Bean
public ItemReader<String> reader() {
MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
return reader;
}
@Bean
public ItemReader<String> flatItemReader() {
FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new DefaultLineMapper<String>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "sample-field-001"});
}});
setFieldSetMapper(new SimpleStringFieldSetMapper<>());
}});
return itemReader;
}
@Bean
public ItemProcessor asyncItemProcessor(){
AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(processor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<String,OiPaggoResponse> processor(){
return new PerfilEletricoItemProcessor();
}
/**
* Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
* @return a NoOpItemWriter<OiPaggoResponse>
*/
@Bean
public ItemWriter<OiPaggoResponse> writer() {
return new NoOpItemWriter<>();
}
@Bean
protected Step step1() throws Exception {
/*
Problem starts here, If Use the processor() everything ends nicely, but if I insist on the asyncItemProcessor(), the job ends and the logs from processor are not stored on the disk.
*/
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.build();
}
@Bean
public Job job() throws Exception {
return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
}
@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(tps);
executor.setMaxPoolSize(tps);
executor.setQueueCapacity(tps * 1000);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("AsyncExecutor-");
return executor;
}
}
-- 使用 AsyncItemWriter 更新(工作版本)
/*Wrapped Writer*/
@Bean
public ItemWriter asyncItemWriter(){
AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}
/*AsyncItemWriter defined on the steps*/
@Bean
protected Step step1() throws Exception {
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
--
关于为什么 AsyncItemProcessor 在向上下文发送 OK-Completed 信号之前不等待所有子项完成的任何想法?
问题是 AsyncItemProcessor
正在创建没有人在等待的 Future
。将您的 NoOpItemWriter
包裹在 AsyncItemWriter
中,以便有人在等待 Future
。这将使作业按预期完成。
我正在使用 Spring 带有 AsyncItemProcessor 的批处理,但出现异常行为。让我先展示代码:
按照Spring Batch project上所示的简单示例进行操作:
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
//SpringApplication.run(PerfilEletricoApp.class, args);
}
}
-- 编辑
如果我只是让主进程休眠,请给 slf4j 几秒钟的时间来刷新日志,一切都会按预期进行。
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
//System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);
Thread.sleep(1000 * 5);
System.exit(SpringApplication.exit(context));
}
}
-- 编辑结束
我正在读取一个带有字段的文本文件,然后使用 AsyncItemProcessor 进行多线程处理,其中包括 URL 上的 Http GET 以获取一些数据,我还使用了NoOpWriter 在写入部分什么都不做。我将 GET 的结果保存在作业的处理器部分(使用 log.trace / log.warn)。
@Configuration
public class HttpClientConfigurer {
// [... property and configs omitted]
@Bean
public CloseableHttpClient createHttpClient() {
// ... creates and returns a poolable http client etc
}
}
至于工作:
@Configuration
public class BatchJobConfigurer {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Value("${async.tps:10}")
private Integer tps;
@Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
private String sourceDir;
@Bean
public ItemReader<String> reader() {
MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
return reader;
}
@Bean
public ItemReader<String> flatItemReader() {
FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new DefaultLineMapper<String>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "sample-field-001"});
}});
setFieldSetMapper(new SimpleStringFieldSetMapper<>());
}});
return itemReader;
}
@Bean
public ItemProcessor asyncItemProcessor(){
AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(processor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<String,OiPaggoResponse> processor(){
return new PerfilEletricoItemProcessor();
}
/**
* Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
* @return a NoOpItemWriter<OiPaggoResponse>
*/
@Bean
public ItemWriter<OiPaggoResponse> writer() {
return new NoOpItemWriter<>();
}
@Bean
protected Step step1() throws Exception {
/*
Problem starts here, If Use the processor() everything ends nicely, but if I insist on the asyncItemProcessor(), the job ends and the logs from processor are not stored on the disk.
*/
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.build();
}
@Bean
public Job job() throws Exception {
return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
}
@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(tps);
executor.setMaxPoolSize(tps);
executor.setQueueCapacity(tps * 1000);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("AsyncExecutor-");
return executor;
}
}
-- 使用 AsyncItemWriter 更新(工作版本)
/*Wrapped Writer*/
@Bean
public ItemWriter asyncItemWriter(){
AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}
/*AsyncItemWriter defined on the steps*/
@Bean
protected Step step1() throws Exception {
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
-- 关于为什么 AsyncItemProcessor 在向上下文发送 OK-Completed 信号之前不等待所有子项完成的任何想法?
问题是 AsyncItemProcessor
正在创建没有人在等待的 Future
。将您的 NoOpItemWriter
包裹在 AsyncItemWriter
中,以便有人在等待 Future
。这将使作业按预期完成。