Spring Boot:How 我们可以实现多个@Scheduled 任务,每个任务都有自己的线程池吗?
Spring Boot:How can we implement multiple @Scheduled tasks with each having its own thread pool?
我想实现多个@Scheduled(固定延迟)任务,每个任务都有自己的线程池。
@Scheduled(fixedDelayString = "30000")
public void createOrderSchedule() {
//create 10 orders concurrently; wait for all to be finished
createOrder(10);
}
@Scheduled(fixedDelayString = "30000")
public void processOrderSchedule() {
//process 10 orders concurrently; wait for all to be finished
}
@Scheduled(fixedDelayString = "30000")
public void notifySchedule() {
//send notification for 10 orders concurrently; wait for all to be finished
}
我设法为每个调度程序创建了不同的 ThreadPoolTaskExecutor
,如下所示:
@Bean("orderPool")
public ThreadPoolTaskExecutor createOrderTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);
pool.setMaxPoolSize(10);
pool.setThreadNamePrefix("order-thread-pool-");
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
..
我为每个任务提供了 @Async
。
@Async("orderPool")
public void createOrder(Integer noOforders) {..}
和任务调度程序配置
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(3);
return threadPoolTaskScheduler;
}
我用 CompletableFuture.allOf(..).join();
等待每个任务完成,但它会阻塞其他 @Scheduled
个任务。
综上所述,我想实现以下目标:
- 每个
@Scheduled
任务应该 运行 独立而不阻塞其他 @Scheduled
任务。
- 每个
@Scheduled
任务都应该有自己的线程池,这样它就可以同时 运行 多个子任务(比如 10 个)。
- 每个
@Scheduled
任务必须等待每个触发器完成而不被再次调用。
我怎样才能做到这一点?
在连续将近 18 小时后,我能够实现我在上述问题中提出的问题。抱歉来晚了。
因此,流 API 提供了 IntStream
等接口来并行传输元素。这导致我并行创建了 n
个订单。 (同时,在不同的调度程序中并行处理 k
订单。依此类推。)
IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)));
就是这么简单。解决了 1 个用例。现在我希望这个调度程序有它自己的池。发现 IntStream.parallel()
使用 ForkJoinPool
,它是我们自己的 ExecutorService
的继承者,令我惊讶的是,spring 为 ForkJoinPool
提供了一个预配置的工厂 bean,即, ForkJoinPoolFactoryBean
。所以我创建了一个名为 createOrderExecutor
.
的 bean
@Bean("createOrderExecutor")
public ForkJoinPoolFactoryBean createOrderExecutor() {
ForkJoinPoolFactoryBean createOrderPoolFactoryBean = new ForkJoinPoolFactoryBean();
createOrderPoolFactoryBean.setParallelism(10);
createOrderPoolFactoryBean.setAsyncMode(true);
createOrderPoolFactoryBean.setUncaughtExceptionHandler(null);
createOrderPoolFactoryBean.setThreadFactory(p -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
worker.setName("create-order-pool-" + worker.getPoolIndex());
return worker;
});
return createOrderPoolFactoryBean;
}
我在我的调度程序中自动装配了这个 bean class,并同时提交了所有订单,如下所示。
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index))));
那里。第二个用例解决了。现在这不会等待所有并行任务完成,它只会异步触发它们。现在,ForkJoinTask
(that submit()
returns) 提供了一个get()
方法,它等待计算完成& returns结果。 (但我不需要结果,我宁愿用try-catch
包围它们。而且,我会等待完成。)
@Scheduled(fixedDelayString = "5000")
public void createOrderScheduler() {
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)))).get();
}
这解决了我的最后一个用例。我为应用程序中的所有调度程序执行了此操作。
相信我,我尝试了在线提供的几乎所有 CompletableFuture
的实现,但无法实现所有这些。
我想实现多个@Scheduled(固定延迟)任务,每个任务都有自己的线程池。
@Scheduled(fixedDelayString = "30000")
public void createOrderSchedule() {
//create 10 orders concurrently; wait for all to be finished
createOrder(10);
}
@Scheduled(fixedDelayString = "30000")
public void processOrderSchedule() {
//process 10 orders concurrently; wait for all to be finished
}
@Scheduled(fixedDelayString = "30000")
public void notifySchedule() {
//send notification for 10 orders concurrently; wait for all to be finished
}
我设法为每个调度程序创建了不同的 ThreadPoolTaskExecutor
,如下所示:
@Bean("orderPool")
public ThreadPoolTaskExecutor createOrderTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);
pool.setMaxPoolSize(10);
pool.setThreadNamePrefix("order-thread-pool-");
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
..
我为每个任务提供了 @Async
。
@Async("orderPool")
public void createOrder(Integer noOforders) {..}
和任务调度程序配置
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(3);
return threadPoolTaskScheduler;
}
我用 CompletableFuture.allOf(..).join();
等待每个任务完成,但它会阻塞其他 @Scheduled
个任务。
综上所述,我想实现以下目标:
- 每个
@Scheduled
任务应该 运行 独立而不阻塞其他@Scheduled
任务。 - 每个
@Scheduled
任务都应该有自己的线程池,这样它就可以同时 运行 多个子任务(比如 10 个)。 - 每个
@Scheduled
任务必须等待每个触发器完成而不被再次调用。
我怎样才能做到这一点?
在连续将近 18 小时后,我能够实现我在上述问题中提出的问题。抱歉来晚了。
因此,流 API 提供了 IntStream
等接口来并行传输元素。这导致我并行创建了 n
个订单。 (同时,在不同的调度程序中并行处理 k
订单。依此类推。)
IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)));
就是这么简单。解决了 1 个用例。现在我希望这个调度程序有它自己的池。发现 IntStream.parallel()
使用 ForkJoinPool
,它是我们自己的 ExecutorService
的继承者,令我惊讶的是,spring 为 ForkJoinPool
提供了一个预配置的工厂 bean,即, ForkJoinPoolFactoryBean
。所以我创建了一个名为 createOrderExecutor
.
@Bean("createOrderExecutor")
public ForkJoinPoolFactoryBean createOrderExecutor() {
ForkJoinPoolFactoryBean createOrderPoolFactoryBean = new ForkJoinPoolFactoryBean();
createOrderPoolFactoryBean.setParallelism(10);
createOrderPoolFactoryBean.setAsyncMode(true);
createOrderPoolFactoryBean.setUncaughtExceptionHandler(null);
createOrderPoolFactoryBean.setThreadFactory(p -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
worker.setName("create-order-pool-" + worker.getPoolIndex());
return worker;
});
return createOrderPoolFactoryBean;
}
我在我的调度程序中自动装配了这个 bean class,并同时提交了所有订单,如下所示。
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index))));
那里。第二个用例解决了。现在这不会等待所有并行任务完成,它只会异步触发它们。现在,ForkJoinTask
(that submit()
returns) 提供了一个get()
方法,它等待计算完成& returns结果。 (但我不需要结果,我宁愿用try-catch
包围它们。而且,我会等待完成。)
@Scheduled(fixedDelayString = "5000")
public void createOrderScheduler() {
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)))).get();
}
这解决了我的最后一个用例。我为应用程序中的所有调度程序执行了此操作。
相信我,我尝试了在线提供的几乎所有 CompletableFuture
的实现,但无法实现所有这些。