如何覆盖 executorService 关闭方法
how to override executorService shutdown method
我正在创建自己的线程池和可以并行执行可调用接口的未来对象。 Executor 提供了 shutdown 方法来停止所有来自 运行 的工作线程。如果我正在创建如下所示的线程池,我应该如何实现关闭方法以在所有线程执行完毕后停止?
我的自定义线程池看起来像
class MyThreadPool implements java.util.concurrent.Executor
{
private final java.util.concurrent.BlockingQueue<Callable> queue;
public MyThreadPool(int numThreads) {
queue = new java.util.concurrent.LinkedBlockingQueue<>();
for (int i=0 ; i<numThreads ; i++) {
new Thread(new Runnable(){
@Override
public void run() {
while(true) {
queue.take().call();
}
}
}).start();
}
}
@Override
public <T> Future<T> submit(Callable<T> callable) {
FutureTask<T> future = new FutureTask(callable);
queue.put(future);
return future;
}
public void shutdown(){ }
}
我想不出一种方法来保留线程列表,然后检查它们是否空闲?
您绝对应该持有对您正在创建的线程的引用。例如,设置类型为 List<Thread>
的字段 threads
并从构造函数中将线程添加到此列表。
之后,您可以在 Thread#join()
的帮助下实现 shutdown()
:
public void shutdown() {
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) { /* NOP */ }
}
}
不要忘记将 while (true)
替换为适当的条件(您在 shutdown()
中切换)并考虑使用 BlockingQueue#poll(long, TimeUnit)
而不是 take()
.
编辑: 类似于:
public class MyThreadPool implements Executor {
private List<Thread> threads = new ArrayList<>();
private BlockingDeque<Callable> tasks = new LinkedBlockingDeque<>();
private volatile boolean running = true;
public MyThreadPool(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
Thread t = new Thread(() -> {
while (running) {
try {
Callable c = tasks.poll(5L, TimeUnit.SECONDS);
if (c != null) {
c.call();
}
} catch (Exception e) { /* NOP */ }
}
});
t.start();
threads.add(t);
}
}
public void shutdown() {
running = false;
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) { /* NOP */ }
}
}
// ...
}
我正在创建自己的线程池和可以并行执行可调用接口的未来对象。 Executor 提供了 shutdown 方法来停止所有来自 运行 的工作线程。如果我正在创建如下所示的线程池,我应该如何实现关闭方法以在所有线程执行完毕后停止?
我的自定义线程池看起来像
class MyThreadPool implements java.util.concurrent.Executor
{
private final java.util.concurrent.BlockingQueue<Callable> queue;
public MyThreadPool(int numThreads) {
queue = new java.util.concurrent.LinkedBlockingQueue<>();
for (int i=0 ; i<numThreads ; i++) {
new Thread(new Runnable(){
@Override
public void run() {
while(true) {
queue.take().call();
}
}
}).start();
}
}
@Override
public <T> Future<T> submit(Callable<T> callable) {
FutureTask<T> future = new FutureTask(callable);
queue.put(future);
return future;
}
public void shutdown(){ }
}
我想不出一种方法来保留线程列表,然后检查它们是否空闲?
您绝对应该持有对您正在创建的线程的引用。例如,设置类型为 List<Thread>
的字段 threads
并从构造函数中将线程添加到此列表。
之后,您可以在 Thread#join()
的帮助下实现 shutdown()
:
public void shutdown() {
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) { /* NOP */ }
}
}
不要忘记将 while (true)
替换为适当的条件(您在 shutdown()
中切换)并考虑使用 BlockingQueue#poll(long, TimeUnit)
而不是 take()
.
编辑: 类似于:
public class MyThreadPool implements Executor {
private List<Thread> threads = new ArrayList<>();
private BlockingDeque<Callable> tasks = new LinkedBlockingDeque<>();
private volatile boolean running = true;
public MyThreadPool(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
Thread t = new Thread(() -> {
while (running) {
try {
Callable c = tasks.poll(5L, TimeUnit.SECONDS);
if (c != null) {
c.call();
}
} catch (Exception e) { /* NOP */ }
}
});
t.start();
threads.add(t);
}
}
public void shutdown() {
running = false;
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) { /* NOP */ }
}
}
// ...
}