处理线程池中的优先级
Handling priority in ThreadPools
目前,我们的一个应用程序中有 2 个线程池:
- 第一个是用来处理定时任务的
- 第二个处理每个计划任务的并行处理 运行
需要设置两个不同的池来自以下想法:如果多个计划任务在主(第一个)池中排队,并且它触发了同一个池中的子任务(并行处理),这将导致竞争条件也会让其他计划任务排队"behind",所以实际上什么都不会结束并且会发生死锁。
如果子任务的优先级高于计划任务怎么办?他们会 "jump" 排队并暂停计划任务以完成吗?或者那不会发生?有什么办法可以强制这种行为吗?或者当 ThreadPoolExecutor 已经 运行 正在执行任务时不能暂停任务?
池 1 在 Spring 的应用程序上下文 XML 配置文件中定义为:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.0.xsd">
<context:annotation-config />
<context:component-scan base-package="cl.waypoint.mailer.reportes" />
<task:annotation-driven scheduler="myScheduler" />
<task:scheduler id="myScheduler" pool-size="2" />
<aop:aspectj-autoproxy />
</beans>
池2在代码中定义如下:
public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(), new ThreadFactory() {
final AtomicLong count = new AtomicLong(0);
private String namePreffix = "TempAndDoor";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setName(MessageFormat.format("{0}-{1}", namePreffix, count.getAndIncrement()));
return t;
}
});
如果我理解正确 myScheduler (pool1) 用于安排任务,一旦时间开始,它将任务提交给 执行者(pool2)。
提出问题
如果子任务的优先级高于计划任务怎么办?
不清楚您如何区分它们或您的真正意思,但我的理解是子任务是计划任务。
我想你应该有一些像这样的代码:
@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() {
executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
}
其中 executor 是您要在其中创建的静态对象。(不应该全部大写和结尾 ;))
根据它的长名称,你提交给执行者的是"sub tasks"或者执行者醒来提交的任务。在这种情况下,您的 myScheduler 将始终及时唤醒,因为执行的是非阻塞的。
另一方面,因为你有一个 LinkedBlockingDeque 这意味着按顺序执行,你的 executor 可能会得到备份。
另一个问题:
他们会"jump"排队并暂停计划任务以完成吗?
它们不会跳转,调度程序启动,提交新任务并再次进入休眠状态。队列开始填满
下一题:
有什么方法可以强迫这种行为吗?或者当 ThreadPoolExecutor 已经是 运行 任务时不能暂停任务?
您可以一起取消任务,但您要跟踪所有提交的任务
你可以抓住未来
Future aFuture = executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
并且您需要以某种方式知道您实际上想要取消任务,您需要调用
aFuture.cancel();
看你需要的东西比较复杂,建议看JMS,比较成熟,或者AKKA,可能比较容易掌握。
以下一段代码可能适合您。
com.job.Job
package com.job;
import java.util.concurrent.CountDownLatch;
public class Job implements Runnable {
public enum JobPriority {
HIGH, MEDIUM, LOW
}
private String jobName;
private JobPriority jobPriority;
private CountDownLatch cdl;
public Job(String jobName, JobPriority jobPriority) {
super();
this.jobName = jobName;
this.jobPriority = jobPriority;
}
@Override
public void run() {
try {
System.out.println("Job:" + jobName + " Priority:" + jobPriority);
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
cdl.countDown();
}
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public JobPriority getJobPriority() {
return jobPriority;
}
public void setJobPriority(JobPriority jobPriority) {
this.jobPriority = jobPriority;
}
public void initCdl(CountDownLatch countDownLatch) {
this.cdl = countDownLatch;
}
// standard setters and getters
}
com.scheduler.PriorityJobScheduler
package com.scheduler;
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import com.job.Job;
public class PriorityJobScheduler {
private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor();
private PriorityBlockingQueue<Job> priorityQueue;
private CountDownLatch countDownLatch;
private int jobCount = 0;
public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
countDownLatch = new CountDownLatch(poolSize);
priorityQueue = new PriorityBlockingQueue<Job>(queueSize, Comparator.comparing(Job::getJobPriority));
priorityJobScheduler.execute(() -> {
while (true) {
try {
Job j = priorityQueue.take();
j.initCdl(countDownLatch);
priorityJobPoolExecutor.execute(j);
jobCount++;
if (jobCount >= poolSize) {
countDownLatch.await();
jobCount = 0 ;
countDownLatch = new CountDownLatch(poolSize);
}
} catch (InterruptedException e) {
// exception needs special handling
break;
}
}
});
}
public void scheduleJob(Job job) {
priorityQueue.add(job);
}
public void cleanUp() {
priorityJobScheduler.shutdown();
}
}
测试
import java.util.ArrayList;
import java.util.List;
import com.job.Job;
import com.job.Job.JobPriority;
import com.scheduler.PriorityJobScheduler;
public class Test {
private static int POOL_SIZE = 3;
private static int QUEUE_SIZE = 10000;
public static void main(String[] args) {
PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);
for (int i = 0; i < 100; i++) {
Job job1 = new Job("Job" + i + "low", JobPriority.LOW);
Job job2 = new Job("Job" + i + "medium", JobPriority.MEDIUM);
Job job3 = new Job("Job" + i + "high", JobPriority.HIGH);
if (i < 30)
pjs.scheduleJob(job1);
else if (i < 60)
pjs.scheduleJob(job2);
else
pjs.scheduleJob(job3);
try {
Thread.sleep(5);
} catch (InterruptedException e) { // TODO
e.printStackTrace();
}
}
pjs.cleanUp();
}
}
来源:https://www.baeldung.com/java-priority-job-schedule
Baeldung 的原始示例在此代码段中没有使用 CountDownLatch。您也可以浏览 Baeldung 网站上的教程。如果您对在这种情况下使用 CountDownLatch 有任何疑问,请随时在评论部分提问。
目前,我们的一个应用程序中有 2 个线程池:
- 第一个是用来处理定时任务的
- 第二个处理每个计划任务的并行处理 运行
需要设置两个不同的池来自以下想法:如果多个计划任务在主(第一个)池中排队,并且它触发了同一个池中的子任务(并行处理),这将导致竞争条件也会让其他计划任务排队"behind",所以实际上什么都不会结束并且会发生死锁。
如果子任务的优先级高于计划任务怎么办?他们会 "jump" 排队并暂停计划任务以完成吗?或者那不会发生?有什么办法可以强制这种行为吗?或者当 ThreadPoolExecutor 已经 运行 正在执行任务时不能暂停任务?
池 1 在 Spring 的应用程序上下文 XML 配置文件中定义为:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.0.xsd">
<context:annotation-config />
<context:component-scan base-package="cl.waypoint.mailer.reportes" />
<task:annotation-driven scheduler="myScheduler" />
<task:scheduler id="myScheduler" pool-size="2" />
<aop:aspectj-autoproxy />
</beans>
池2在代码中定义如下:
public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(), new ThreadFactory() {
final AtomicLong count = new AtomicLong(0);
private String namePreffix = "TempAndDoor";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setName(MessageFormat.format("{0}-{1}", namePreffix, count.getAndIncrement()));
return t;
}
});
如果我理解正确 myScheduler (pool1) 用于安排任务,一旦时间开始,它将任务提交给 执行者(pool2)。
提出问题
如果子任务的优先级高于计划任务怎么办?
不清楚您如何区分它们或您的真正意思,但我的理解是子任务是计划任务。
我想你应该有一些像这样的代码:
@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() {
executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
}
其中 executor 是您要在其中创建的静态对象。(不应该全部大写和结尾 ;))
根据它的长名称,你提交给执行者的是"sub tasks"或者执行者醒来提交的任务。在这种情况下,您的 myScheduler 将始终及时唤醒,因为执行的是非阻塞的。
另一方面,因为你有一个 LinkedBlockingDeque 这意味着按顺序执行,你的 executor 可能会得到备份。
另一个问题:
他们会"jump"排队并暂停计划任务以完成吗?
它们不会跳转,调度程序启动,提交新任务并再次进入休眠状态。队列开始填满
下一题:
有什么方法可以强迫这种行为吗?或者当 ThreadPoolExecutor 已经是 运行 任务时不能暂停任务?
您可以一起取消任务,但您要跟踪所有提交的任务
你可以抓住未来
Future aFuture = executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
并且您需要以某种方式知道您实际上想要取消任务,您需要调用
aFuture.cancel();
看你需要的东西比较复杂,建议看JMS,比较成熟,或者AKKA,可能比较容易掌握。
以下一段代码可能适合您。
com.job.Job
package com.job;
import java.util.concurrent.CountDownLatch;
public class Job implements Runnable {
public enum JobPriority {
HIGH, MEDIUM, LOW
}
private String jobName;
private JobPriority jobPriority;
private CountDownLatch cdl;
public Job(String jobName, JobPriority jobPriority) {
super();
this.jobName = jobName;
this.jobPriority = jobPriority;
}
@Override
public void run() {
try {
System.out.println("Job:" + jobName + " Priority:" + jobPriority);
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
cdl.countDown();
}
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public JobPriority getJobPriority() {
return jobPriority;
}
public void setJobPriority(JobPriority jobPriority) {
this.jobPriority = jobPriority;
}
public void initCdl(CountDownLatch countDownLatch) {
this.cdl = countDownLatch;
}
// standard setters and getters
}
com.scheduler.PriorityJobScheduler
package com.scheduler;
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import com.job.Job;
public class PriorityJobScheduler {
private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor();
private PriorityBlockingQueue<Job> priorityQueue;
private CountDownLatch countDownLatch;
private int jobCount = 0;
public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
countDownLatch = new CountDownLatch(poolSize);
priorityQueue = new PriorityBlockingQueue<Job>(queueSize, Comparator.comparing(Job::getJobPriority));
priorityJobScheduler.execute(() -> {
while (true) {
try {
Job j = priorityQueue.take();
j.initCdl(countDownLatch);
priorityJobPoolExecutor.execute(j);
jobCount++;
if (jobCount >= poolSize) {
countDownLatch.await();
jobCount = 0 ;
countDownLatch = new CountDownLatch(poolSize);
}
} catch (InterruptedException e) {
// exception needs special handling
break;
}
}
});
}
public void scheduleJob(Job job) {
priorityQueue.add(job);
}
public void cleanUp() {
priorityJobScheduler.shutdown();
}
}
测试
import java.util.ArrayList;
import java.util.List;
import com.job.Job;
import com.job.Job.JobPriority;
import com.scheduler.PriorityJobScheduler;
public class Test {
private static int POOL_SIZE = 3;
private static int QUEUE_SIZE = 10000;
public static void main(String[] args) {
PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);
for (int i = 0; i < 100; i++) {
Job job1 = new Job("Job" + i + "low", JobPriority.LOW);
Job job2 = new Job("Job" + i + "medium", JobPriority.MEDIUM);
Job job3 = new Job("Job" + i + "high", JobPriority.HIGH);
if (i < 30)
pjs.scheduleJob(job1);
else if (i < 60)
pjs.scheduleJob(job2);
else
pjs.scheduleJob(job3);
try {
Thread.sleep(5);
} catch (InterruptedException e) { // TODO
e.printStackTrace();
}
}
pjs.cleanUp();
}
}
来源:https://www.baeldung.com/java-priority-job-schedule
Baeldung 的原始示例在此代码段中没有使用 CountDownLatch。您也可以浏览 Baeldung 网站上的教程。如果您对在这种情况下使用 CountDownLatch 有任何疑问,请随时在评论部分提问。