如何实现阻塞ThreadPoolExecutor
How to implement blocking ThreadPoolExecutor
我需要实施屏蔽 ThreadPoolExecutor
。
这是我们企业应用中非常关键的需求。
如果 ThreadPoolExecutor.submit()
或 ThreadPoolExecutor.execute()
方法会阻塞,直到线程被释放以执行新任务。
但在当前实现中,如果所有池线程都忙,ThreadPoolExecutor.submit()
和 ThreadPoolExecutor.execute()
方法将抛出 RejectedExecutionException 异常。
例如下面的代码抛出 RejectedExecutionException
:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BlockingTPE {
public static void main(String[] args) {
ArrayBlockingQueue queue = new ArrayBlockingQueue(3);
ThreadPoolExecutor tpExe = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
int numJobs = 50;
for (int i = 1; i <= numJobs; i++) {
try {
tpExe.submit(new WorkerThread(i));
System.out.println("Added#" + (i));
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
}
}
class WorkerThread implements Runnable {
int jobId;
public WorkerThread(int jobId) {
this.jobId = jobId;
}
public void run() {
try {
Thread.sleep(1000);
}
catch (Exception excep) {
}
}
}
正如 ThreadPoolExecutor
的 javadoc 所述:
Creates a new ThreadPoolExecutor with the given initial parameters and
default thread factory and rejected execution handler.
被拒绝的执行程序处理程序是 AbortPolicy
的一个实例,如果队列不接受另一个任务,它将被调用。 javadoc 的行为:
Always throws RejectedExecutionException.
因此阻塞队列对您没有任何影响。我以这种方式更改了您的代码,它运行没有任何问题:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(3));
try {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
int numJobs = 50;
for (int i = 1; i <= numJobs; i++) {
try {
executor.submit(new WorkerThread(i));
System.out.println("Added#" + (i));
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
} finally {
executor.shutdown();
}
}
您必须做出的决定是:
- 使用未绑定队列来支持所有延迟任务。例如
LinkedBlockingQueue
.
- 使用绑定队列,让当前线程执行不适合整个队列的任务。例如,请参阅我在回答中发布的代码。
- 如果有界队列已满则丢弃任务。例如,使用
ThreadPoolExecutor.DiscardPolicy
作为被拒绝的执行处理程序。
我需要实施屏蔽 ThreadPoolExecutor
。
这是我们企业应用中非常关键的需求。
如果 ThreadPoolExecutor.submit()
或 ThreadPoolExecutor.execute()
方法会阻塞,直到线程被释放以执行新任务。
但在当前实现中,如果所有池线程都忙,ThreadPoolExecutor.submit()
和 ThreadPoolExecutor.execute()
方法将抛出 RejectedExecutionException 异常。
例如下面的代码抛出 RejectedExecutionException
:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BlockingTPE {
public static void main(String[] args) {
ArrayBlockingQueue queue = new ArrayBlockingQueue(3);
ThreadPoolExecutor tpExe = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
int numJobs = 50;
for (int i = 1; i <= numJobs; i++) {
try {
tpExe.submit(new WorkerThread(i));
System.out.println("Added#" + (i));
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
}
}
class WorkerThread implements Runnable {
int jobId;
public WorkerThread(int jobId) {
this.jobId = jobId;
}
public void run() {
try {
Thread.sleep(1000);
}
catch (Exception excep) {
}
}
}
正如 ThreadPoolExecutor
的 javadoc 所述:
Creates a new ThreadPoolExecutor with the given initial parameters and default thread factory and rejected execution handler.
被拒绝的执行程序处理程序是 AbortPolicy
的一个实例,如果队列不接受另一个任务,它将被调用。 javadoc 的行为:
Always throws RejectedExecutionException.
因此阻塞队列对您没有任何影响。我以这种方式更改了您的代码,它运行没有任何问题:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(3));
try {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
int numJobs = 50;
for (int i = 1; i <= numJobs; i++) {
try {
executor.submit(new WorkerThread(i));
System.out.println("Added#" + (i));
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
} finally {
executor.shutdown();
}
}
您必须做出的决定是:
- 使用未绑定队列来支持所有延迟任务。例如
LinkedBlockingQueue
. - 使用绑定队列,让当前线程执行不适合整个队列的任务。例如,请参阅我在回答中发布的代码。
- 如果有界队列已满则丢弃任务。例如,使用
ThreadPoolExecutor.DiscardPolicy
作为被拒绝的执行处理程序。