运行 如果开始新的 运行 相同的作业 - Spring 批处理,作业将停止
Running job stops if start new running the same job - Spring Batch
问题
我有点困惑,因为当通过 HTTP 请求开始执行 Spring 批处理作业时,如果我收到另一个 HTTP 请求来启动同一个作业,但在作业执行时使用不同的参数, 正在执行的作业停止未完成并开始处理新作业。
上下文
我开发了一个 API REST 来加载和处理 Excel 文件的内容。 Web 服务公开两个端点,一个用于在数据库中加载、验证和存储 Excel 文件的内容,另一个用于开始处理存储在数据库中的记录。
它是如何工作的
POST /api/excel/upload
此端点接收 Excel 文件。收到请求后,每个文件都会分配一个唯一标识符,并验证其内容。如果内容正确,它会将其插入一个临时 table 等待处理。
GET /api/Excel/process?id=x
此端点接收要处理的文件的标识符。收到请求后,将启动 Spring 批处理作业以处理临时 table.
中的记录
一些代码
- 控制器
@PostMapping(produces = {APPLICATION_JSON_VALUE})
public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
{
return super.getResponse().returnPage(service.upload(multipartFile));
}
@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
{
DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));
ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
return response;
}
我使用DeferredResult在收到请求后向客户端发送响应,而无需等待作业完成
- 服务
public void startJob(int idCarga)
{
JobParameters params = new JobParametersBuilder()
.addString("mainJob", String.valueOf(System.currentTimeMillis()))
.addString("idCarga", String.valueOf(idCarga))
.toJobParameters();
try
{
jobLauncher.run(job, params);
}
catch (JobExecutionException e)
{
log.error("---ERROR: {}", e.getMessage());
}
}
- 批量
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
return stepBuilderFactory.get("step")
.<List<ExcelLoad>, Invoice>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant().skipPolicy(new ExceptionSkipPolicy())
.listener(stepSkipListener)
.build();
}
@Bean
public Job mainJob(Step mainStep)
{
return jobBuilderFactory.get("mainJob")
.listener(mainJobExecutionListener)
.incrementer(new RunIdIncrementer())
.start(mainStep)
.build();
}
执行一些测试我观察到以下行为:
如果我请求端点/进程在不同的时间处理每个文件:在这种情况下,存储在临时table中的所有记录都被处理:
- 记录已处理文件 1:3606(预期 3606)。
- 记录已处理 file2:1776(预计 1776)。
如果我向端点 /process 发出请求以首先处理 file1,并且在它完成之前我发出另一个请求来处理 file2:在这种情况下,并非所有记录都存储在临时 table 被处理:
- 记录处理文件1:1080(预计3606)
- 记录处理文件 2:1774(预计 1776)
JobLauncher
不会停止作业执行,它只会启动它们。 Spring Batch 提供的默认作业启动器是 SimpleJobLauncher
,它将作业启动委托给 TaskExecutor
。现在,根据您使用的任务执行器实现及其启动并发任务的配置方式,您可以看到不同的行为。例如,当你启动一个新的作业执行并向任务执行器提交一个新任务时,任务执行器可以决定如果所有工作人员都忙则拒绝本次提交,或者将其放入等待队列,或者停止另一个任务并提交新的那一个。这些策略取决于几个参数(TaskExecutor
实施、幕后使用的队列类型、RejectedExecutionHandler
实施等)。
在你的情况下,你似乎使用了以下内容:
ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
因此您需要检查此池的行为,了解它如何处理新任务提交(我想这就是停止您的工作的原因,但您需要确认这一点)。也就是说,我不明白你为什么需要这个。如果您的要求如下:
I use DeferredResult to send a response to the client after receiving the request without waiting for the job to finish
然后您可以在作业启动器中使用异步任务执行器实现(如 ThreadPoolTaskExecutor
),请参阅 Running Jobs from within a Web Container。
感谢@Mahmoud Ben Hassine 的帮助,我得以解决问题。为了帮助实施,以防万一有人提出这个问题,我分享了代码,就我而言,它已经解决了这个问题:
- 控制器
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
{
JobParameters params = new JobParametersBuilder()
.addString("mainJob", String.valueOf(System.currentTimeMillis()))
.addString("idCarga", String.valueOf(idCarga))
.toJobParameters();
jobLauncher.run(job, params);
}
- 批量配置、作业和步骤
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer
{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private StepSkipListener stepSkipListener;
@Autowired
private MainJobExecutionListener mainJobExecutionListener;
@Bean
public TaskExecutor taskExecutor()
{
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.setThreadNamePrefix("batch-thread-");
return taskExecutor;
}
@Bean
public JobLauncher jobLauncher() throws Exception
{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(taskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
return stepBuilderFactory.get("step")
.<List<ExcelLoad>, Invoice>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant().skipPolicy(new ExceptionSkipPolicy())
.listener(stepSkipListener)
.build();
}
@Bean
public Job mainJob(Step mainStep)
{
return jobBuilderFactory.get("mainJob")
.listener(mainJobExecutionListener)
.incrementer(new RunIdIncrementer())
.start(mainStep)
.build();
}
}
如果在应用此代码后,正如我遇到的那样,您在将记录插入数据库时也遇到了问题,您可以查看此 question,我也将适用于我的代码放在这里。
问题
我有点困惑,因为当通过 HTTP 请求开始执行 Spring 批处理作业时,如果我收到另一个 HTTP 请求来启动同一个作业,但在作业执行时使用不同的参数, 正在执行的作业停止未完成并开始处理新作业。
上下文
我开发了一个 API REST 来加载和处理 Excel 文件的内容。 Web 服务公开两个端点,一个用于在数据库中加载、验证和存储 Excel 文件的内容,另一个用于开始处理存储在数据库中的记录。
它是如何工作的
POST /api/excel/upload 此端点接收 Excel 文件。收到请求后,每个文件都会分配一个唯一标识符,并验证其内容。如果内容正确,它会将其插入一个临时 table 等待处理。
GET /api/Excel/process?id=x 此端点接收要处理的文件的标识符。收到请求后,将启动 Spring 批处理作业以处理临时 table.
中的记录
一些代码
- 控制器
@PostMapping(produces = {APPLICATION_JSON_VALUE})
public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
{
return super.getResponse().returnPage(service.upload(multipartFile));
}
@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
{
DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));
ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
return response;
}
我使用DeferredResult在收到请求后向客户端发送响应,而无需等待作业完成
- 服务
public void startJob(int idCarga)
{
JobParameters params = new JobParametersBuilder()
.addString("mainJob", String.valueOf(System.currentTimeMillis()))
.addString("idCarga", String.valueOf(idCarga))
.toJobParameters();
try
{
jobLauncher.run(job, params);
}
catch (JobExecutionException e)
{
log.error("---ERROR: {}", e.getMessage());
}
}
- 批量
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
return stepBuilderFactory.get("step")
.<List<ExcelLoad>, Invoice>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant().skipPolicy(new ExceptionSkipPolicy())
.listener(stepSkipListener)
.build();
}
@Bean
public Job mainJob(Step mainStep)
{
return jobBuilderFactory.get("mainJob")
.listener(mainJobExecutionListener)
.incrementer(new RunIdIncrementer())
.start(mainStep)
.build();
}
执行一些测试我观察到以下行为:
如果我请求端点/进程在不同的时间处理每个文件:在这种情况下,存储在临时table中的所有记录都被处理:
- 记录已处理文件 1:3606(预期 3606)。
- 记录已处理 file2:1776(预计 1776)。
如果我向端点 /process 发出请求以首先处理 file1,并且在它完成之前我发出另一个请求来处理 file2:在这种情况下,并非所有记录都存储在临时 table 被处理:
- 记录处理文件1:1080(预计3606)
- 记录处理文件 2:1774(预计 1776)
JobLauncher
不会停止作业执行,它只会启动它们。 Spring Batch 提供的默认作业启动器是 SimpleJobLauncher
,它将作业启动委托给 TaskExecutor
。现在,根据您使用的任务执行器实现及其启动并发任务的配置方式,您可以看到不同的行为。例如,当你启动一个新的作业执行并向任务执行器提交一个新任务时,任务执行器可以决定如果所有工作人员都忙则拒绝本次提交,或者将其放入等待队列,或者停止另一个任务并提交新的那一个。这些策略取决于几个参数(TaskExecutor
实施、幕后使用的队列类型、RejectedExecutionHandler
实施等)。
在你的情况下,你似乎使用了以下内容:
ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
因此您需要检查此池的行为,了解它如何处理新任务提交(我想这就是停止您的工作的原因,但您需要确认这一点)。也就是说,我不明白你为什么需要这个。如果您的要求如下:
I use DeferredResult to send a response to the client after receiving the request without waiting for the job to finish
然后您可以在作业启动器中使用异步任务执行器实现(如 ThreadPoolTaskExecutor
),请参阅 Running Jobs from within a Web Container。
感谢@Mahmoud Ben Hassine
- 控制器
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
{
JobParameters params = new JobParametersBuilder()
.addString("mainJob", String.valueOf(System.currentTimeMillis()))
.addString("idCarga", String.valueOf(idCarga))
.toJobParameters();
jobLauncher.run(job, params);
}
- 批量配置、作业和步骤
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer
{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private StepSkipListener stepSkipListener;
@Autowired
private MainJobExecutionListener mainJobExecutionListener;
@Bean
public TaskExecutor taskExecutor()
{
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.setThreadNamePrefix("batch-thread-");
return taskExecutor;
}
@Bean
public JobLauncher jobLauncher() throws Exception
{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(taskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
return stepBuilderFactory.get("step")
.<List<ExcelLoad>, Invoice>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant().skipPolicy(new ExceptionSkipPolicy())
.listener(stepSkipListener)
.build();
}
@Bean
public Job mainJob(Step mainStep)
{
return jobBuilderFactory.get("mainJob")
.listener(mainJobExecutionListener)
.incrementer(new RunIdIncrementer())
.start(mainStep)
.build();
}
}
如果在应用此代码后,正如我遇到的那样,您在将记录插入数据库时也遇到了问题,您可以查看此 question,我也将适用于我的代码放在这里。