当 ThreadPoolExecutor 没有活跃的 worker 时执行动作
Execute action when ThreadPoolExecutor has no active workers
我有一个缓存线程池,其中以相当不可预测的方式生成新任务。这些任务不会生成任何结果(它们是 Runnables
而不是 Callables
)。
我想要一个动作在池中没有活跃的工作人员时执行。
但是我不想关闭池(显然使用 awaitTermination
),因为当新任务到达时我将不得不再次重新初始化它(因为它可能无法预测地到达,即使在关闭期间) .
我想到了以下可能的方法:
有一个额外的线程(在池外),每当产生新任务并且 ThreadPoolExecutor
没有活跃的工作人员时就会产生该线程。然后它应该不断检查 getActiveWorkers()
直到它 returns 0 如果是,则执行所需的操作。
有一些线程安全队列(哪个?),其中添加了每个新生成的任务的 Future
。每当队列中至少有一个条目时,产生一个额外的线程(池外)等待队列为空并执行所需的操作。
实施 PriorityBlockingQueue
以与池一起使用,并为工作线程分配比执行所需操作的线程(现在来自池内)更高的优先级。
我的问题:
我想知道是否有一些更干净的解决方案,它使用一些不错的同步对象(比如 CountDownLatch
,但是不能在这里使用,因为我事先不知道任务的数量)?
如果我是你,我会为你的线程池实现一个 装饰器 来跟踪计划任务并轻微修改 运行 的任务。这样,无论何时调度 Runnable
,您都可以调度另一个能够跟踪其自身进程的 decoarated Runnable
。
这个装饰器看起来像:
class RunnableDecorator implements Runnable {
private final Runnable delegate;
// this task counter must be increased on any
// scheduling of a task by the thread pool
private final AtomicInteger taskCounter;
// Constructor omitted
@Override
public void run() {
try {
delegate.run();
} finally {
if (taskCounter.decrementAndGet() == 0) {
// spawn idle action
}
}
}
}
当然,线程池必须在每次调度任务时增加计数器。因此,此逻辑必须 而不是 添加到 Runnable
,而是添加到 ThreadPool
。最后,由您决定是否要 运行 同一线程中的 空闲操作 或者是否要提供对正在执行的线程池的引用 运行一个新线程。如果您选择后者,请注意空闲操作的完成将触发另一个空闲操作。但是,您也可以提供一种 原始调度 的方法。您还可以将修饰添加到线程队列,但这会使提供这种原始调度变得更加困难。
这种方法是非阻塞的,不会过多地干扰您的代码库。请注意,线程池在创建时不会启动操作,因此根据定义为空。
如果您查看 Executors.newCachedThreadPool()
背后的源代码,您可以了解它是如何使用 ThreadPoolExecutor
创建的。使用它,覆盖 execute
和 afterExecute
方法以添加计数器。这样递增和递减逻辑被隔离在一个位置。例如:
ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()) {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void execute(Runnable r) {
counter.incrementAndGet();
super.execute(r);
}
@Override
public void afterExecute(Runnable r, Throwable t) {
if (counter.decrementAndGet() == 0) {
// thread pool is idle - do something
}
super.afterExecute(r, t);
}
};
我有一个缓存线程池,其中以相当不可预测的方式生成新任务。这些任务不会生成任何结果(它们是 Runnables
而不是 Callables
)。
我想要一个动作在池中没有活跃的工作人员时执行。
但是我不想关闭池(显然使用 awaitTermination
),因为当新任务到达时我将不得不再次重新初始化它(因为它可能无法预测地到达,即使在关闭期间) .
我想到了以下可能的方法:
有一个额外的线程(在池外),每当产生新任务并且
ThreadPoolExecutor
没有活跃的工作人员时就会产生该线程。然后它应该不断检查getActiveWorkers()
直到它 returns 0 如果是,则执行所需的操作。有一些线程安全队列(哪个?),其中添加了每个新生成的任务的
Future
。每当队列中至少有一个条目时,产生一个额外的线程(池外)等待队列为空并执行所需的操作。实施
PriorityBlockingQueue
以与池一起使用,并为工作线程分配比执行所需操作的线程(现在来自池内)更高的优先级。
我的问题:
我想知道是否有一些更干净的解决方案,它使用一些不错的同步对象(比如 CountDownLatch
,但是不能在这里使用,因为我事先不知道任务的数量)?
如果我是你,我会为你的线程池实现一个 装饰器 来跟踪计划任务并轻微修改 运行 的任务。这样,无论何时调度 Runnable
,您都可以调度另一个能够跟踪其自身进程的 decoarated Runnable
。
这个装饰器看起来像:
class RunnableDecorator implements Runnable {
private final Runnable delegate;
// this task counter must be increased on any
// scheduling of a task by the thread pool
private final AtomicInteger taskCounter;
// Constructor omitted
@Override
public void run() {
try {
delegate.run();
} finally {
if (taskCounter.decrementAndGet() == 0) {
// spawn idle action
}
}
}
}
当然,线程池必须在每次调度任务时增加计数器。因此,此逻辑必须 而不是 添加到 Runnable
,而是添加到 ThreadPool
。最后,由您决定是否要 运行 同一线程中的 空闲操作 或者是否要提供对正在执行的线程池的引用 运行一个新线程。如果您选择后者,请注意空闲操作的完成将触发另一个空闲操作。但是,您也可以提供一种 原始调度 的方法。您还可以将修饰添加到线程队列,但这会使提供这种原始调度变得更加困难。
这种方法是非阻塞的,不会过多地干扰您的代码库。请注意,线程池在创建时不会启动操作,因此根据定义为空。
如果您查看 Executors.newCachedThreadPool()
背后的源代码,您可以了解它是如何使用 ThreadPoolExecutor
创建的。使用它,覆盖 execute
和 afterExecute
方法以添加计数器。这样递增和递减逻辑被隔离在一个位置。例如:
ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()) {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void execute(Runnable r) {
counter.incrementAndGet();
super.execute(r);
}
@Override
public void afterExecute(Runnable r, Throwable t) {
if (counter.decrementAndGet() == 0) {
// thread pool is idle - do something
}
super.afterExecute(r, t);
}
};