Spring Boot:How 我们可以实现多个@Scheduled 任务,每个任务都有自己的线程池吗?

Spring Boot:How can we implement multiple @Scheduled tasks with each having its own thread pool?

我想实现多个@Scheduled(固定延迟)任务,每个任务都有自己的线程池。

@Scheduled(fixedDelayString = "30000")
public void createOrderSchedule() {
    //create 10 orders concurrently; wait for all to be finished
    createOrder(10);
}

@Scheduled(fixedDelayString = "30000")
public void processOrderSchedule() {
    //process 10 orders concurrently; wait for all to be finished
}

@Scheduled(fixedDelayString = "30000")
public void notifySchedule() {
    //send notification for 10 orders concurrently; wait for all to be finished
}

我设法为每个调度程序创建了不同的 ThreadPoolTaskExecutor,如下所示:

@Bean("orderPool")
public ThreadPoolTaskExecutor createOrderTaskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(5);
    pool.setMaxPoolSize(10);
    pool.setThreadNamePrefix("order-thread-pool-");
    pool.setWaitForTasksToCompleteOnShutdown(true);
    return pool;
}

..

我为每个任务提供了 @Async

@Async("orderPool")
public void createOrder(Integer noOforders) {..}

和任务调度程序配置

@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    threadPoolTaskScheduler.setPoolSize(3);
    return threadPoolTaskScheduler;
}

我用 CompletableFuture.allOf(..).join(); 等待每个任务完成,但它会阻塞其他 @Scheduled 个任务。

综上所述,我想实现以下目标:

  1. 每个 @Scheduled 任务应该 运行 独立而不阻塞其他 @Scheduled 任务。
  2. 每个 @Scheduled 任务都应该有自己的线程池,这样它就可以同时 运行 多个子任务(比如 10 个)。
  3. 每个 @Scheduled 任务必须等待每个触发器完成而不被再次调用。

我怎样才能做到这一点?

在连续将近 18 小时后,我能够实现我在​​上述问题中提出的问题。抱歉来晚了。

因此,流 API 提供了 IntStream 等接口来并行传输元素。这导致我并行创建了 n 个订单。 (同时,在不同的调度程序中并行处理 k 订单。依此类推。)

IntStream.range(0, inputIds.size())
        .parallel().forEach(index -> createOrder(inputIds.get(index)));

就是这么简单。解决了 1 个用例。现在我希望这个调度程序有它自己的池。发现 IntStream.parallel() 使用 ForkJoinPool,它是我们自己的 ExecutorService 的继承者,令我惊讶的是,spring 为 ForkJoinPool 提供了一个预配置的工厂 bean,即, ForkJoinPoolFactoryBean。所以我创建了一个名为 createOrderExecutor.

的 bean
@Bean("createOrderExecutor")
public ForkJoinPoolFactoryBean createOrderExecutor() {
    ForkJoinPoolFactoryBean createOrderPoolFactoryBean = new ForkJoinPoolFactoryBean();
    createOrderPoolFactoryBean.setParallelism(10);
    createOrderPoolFactoryBean.setAsyncMode(true);
    createOrderPoolFactoryBean.setUncaughtExceptionHandler(null);
    createOrderPoolFactoryBean.setThreadFactory(p -> {
        final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
        worker.setName("create-order-pool-" + worker.getPoolIndex());
        return worker;
    });
    return createOrderPoolFactoryBean;
}

我在我的调度程序中自动装配了这个 bean class,并同时提交了所有订单,如下所示。

createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
                .parallel().forEach(index -> createOrder(inputIds.get(index))));

那里。第二个用例解决了。现在这不会等待所有并行任务完成,它只会异步触发它们。现在,ForkJoinTask(that submit() returns) 提供了一个get()方法,它等待计算完成& returns结果。 (但我不需要结果,我宁愿用try-catch包围它们。而且,我会等待完成。)

@Scheduled(fixedDelayString = "5000")
public void createOrderScheduler() {
        createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
                .parallel().forEach(index -> createOrder(inputIds.get(index)))).get();
}

这解决了我的最后一个用例。我为应用程序中的所有调度程序执行了此操作。

相信我,我尝试了在线提供的几乎所有 CompletableFuture 的实现,但无法实现所有这些。