运行 如果开始新的 运行 相同的作业 - Spring 批处理,作业将停止

Running job stops if start new running the same job - Spring Batch

问题

我有点困惑,因为当通过 HTTP 请求开始执行 Spring 批处理作业时,如果我收到另一个 HTTP 请求来启动同一个作业,但在作业执行时使用不同的参数, 正在执行的作业停止未完成并开始处理新作业。

上下文

我开发了一个 API REST 来加载和处理 Excel 文件的内容。 Web 服务公开两个端点,一个用于在数据库中加载、验证和存储 Excel 文件的内容,另一个用于开始处理存储在数据库中的记录。

它是如何工作的

一些代码

@PostMapping(produces = {APPLICATION_JSON_VALUE})
public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
{
    return super.getResponse().returnPage(service.upload(multipartFile));
}

@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
{
    DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
    response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));

    ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));

    return response;
}

我使用DeferredResult在收到请求后向客户端发送响应,而无需等待作业完成

public void startJob(int idCarga)
{
    JobParameters params = new JobParametersBuilder()
            .addString("mainJob", String.valueOf(System.currentTimeMillis()))
            .addString("idCarga", String.valueOf(idCarga))
            .toJobParameters();

    try
    {
        jobLauncher.run(job, params);
    }
    catch (JobExecutionException e)
    {
        log.error("---ERROR: {}", e.getMessage());
    }
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
    return stepBuilderFactory.get("step")
            .<List<ExcelLoad>, Invoice>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
            .listener(stepSkipListener)
            .build();
}

@Bean
public Job mainJob(Step mainStep)
{
    return jobBuilderFactory.get("mainJob")
                            .listener(mainJobExecutionListener)
                            .incrementer(new RunIdIncrementer())
                            .start(mainStep)
                            .build();
}

执行一些测试我观察到以下行为:

  1. 如果我请求端点/进程在不同的时间处理每个文件:在这种情况下,存储在临时table中的所有记录都被处理:

    • 记录已处理文件 1:3606(预期 3606)。
    • 记录已处理 file2:1776(预计 1776)。
  2. 如果我向端点 /process 发出请求以首先处理 file1,并且在它完成之前我发出另一个请求来处理 file2:在这种情况下,并非所有记录都存储在临时 table 被处理:

    • 记录处理文件1:1080(预计3606)
    • 记录处理文件 2:1774(预计 1776)

JobLauncher 不会停止作业执行,它只会启动它们。 Spring Batch 提供的默认作业启动器是 SimpleJobLauncher,它将作业启动委托给 TaskExecutor。现在,根据您使用的任务执行器实现及其启动并发任务的配置方式,您可以看到不同的行为。例如,当你启动一个新的作业执行并向任务执行器提交一个新任务时,任务执行器可以决定如果所有工作人员都忙则拒绝本次提交,或者将其放入等待队列,或者停止另一个任务并提交新的那一个。这些策略取决于几个参数(TaskExecutor 实施、幕后使用的队列类型、RejectedExecutionHandler 实施等)。

在你的情况下,你似乎使用了以下内容:

ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));

因此您需要检查此池的行为,了解它如何处理新任务提交(我想这就是停止您的工作的原因,但您需要确认这一点)。也就是说,我不明白你为什么需要这个。如果您的要求如下:

I use DeferredResult to send a response to the client after receiving the request without waiting for the job to finish

然后您可以在作业启动器中使用异步任务执行器实现(如 ThreadPoolTaskExecutor),请参阅 Running Jobs from within a Web Container

感谢@Mahmoud Ben Hassine 的帮助,我得以解决问题。为了帮助实施,以防万一有人提出这个问题,我分享了代码,就我而言,它已经解决了这个问题:

  • 控制器
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
{
    JobParameters params = new JobParametersBuilder()
            .addString("mainJob", String.valueOf(System.currentTimeMillis()))
            .addString("idCarga", String.valueOf(idCarga))
            .toJobParameters();

    jobLauncher.run(job, params);
}
  • 批量配置、作业和步骤
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer
{
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private StepSkipListener stepSkipListener;

    @Autowired
    private MainJobExecutionListener mainJobExecutionListener;

    @Bean
    public TaskExecutor taskExecutor()
    {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setThreadNamePrefix("batch-thread-");

        return taskExecutor;
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception
    {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(taskExecutor());
        jobLauncher.afterPropertiesSet();

        return jobLauncher;
    }

    @Bean
    public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
    {
        return stepBuilderFactory.get("step")
                .<List<ExcelLoad>, Invoice>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                .listener(stepSkipListener)
                .build();
    }

    @Bean
    public Job mainJob(Step mainStep)
    {
        return jobBuilderFactory.get("mainJob")
                                .listener(mainJobExecutionListener)
                                .incrementer(new RunIdIncrementer())
                                .start(mainStep)
                                .build();
    }
}

如果在应用此代码后,正如我遇到的那样,您在将记录插入数据库时​​也遇到了问题,您可以查看此 question,我也将适用于我的代码放在这里。