WorkerThread:等待处理完成(BlockingQueue)
WorkerThread: Wait for processing done (BlockingQueue)
我正在构建一个多线程应用程序,使用 WorkerThreads 处理来自 BlockingQueues 的任务。工作人员看起来如下(作为抽象 class。subclasses 实现 processItem())。
abstract class WorkerThread extends Thread {
BlockingQueue<Task> q;
int tasksInSystem; // globally available
public void run() {
while(!interrupted()) {
Task t = q.take();
process(t);
tasksInSystem--;
}
}
abstract void process(Task t);
}
特别的是我想等待所有任务完成。
我的第一个想法是:
- 计算每个添加的任务
- 处理完成后减少计数器。
但是:
但是有不同类型的任务和不同的工作人员实现和多个队列。所以我将不得不维护大量不同的计数器。
我想要的:
q.waitForEmptyAndCompleted()
这将需要队列跟踪任务 "in flight" 并要求工作进程在完成时发出信号(而不是 tasksInsystem---;
)。
工作人员无法增加该计数器,因为他必须在从队列中取出任务后对其进行计数。但是另一个线程可能会在调用 take() 之后变为 运行,这样工作人员就无法事先增加计数器。
因此,counter increase 和 take() 必须绑定在一起 (atomar)。这让我找到了一个专门的 BlockingQueue。
我没有找到预制的解决方案。所以我最好的猜测是实现我自己的 BlockingQueue。有什么我可以改用的东西吗(避免自己实现和测试线程安全的阻塞队列)?或者你有什么想法以不同的方式实现等待调用吗?
好的,因为一般的 ExecutorService
还不够,也许 ForkJoinPool
会起作用,它不会显式公开队列,但根据您的描述应该非常容易使用。
关键方法是awaitQuiescence(long timeout, TimeUnit unit)
,它将等待所有提交的任务执行完毕。
我正在构建一个多线程应用程序,使用 WorkerThreads 处理来自 BlockingQueues 的任务。工作人员看起来如下(作为抽象 class。subclasses 实现 processItem())。
abstract class WorkerThread extends Thread {
BlockingQueue<Task> q;
int tasksInSystem; // globally available
public void run() {
while(!interrupted()) {
Task t = q.take();
process(t);
tasksInSystem--;
}
}
abstract void process(Task t);
}
特别的是我想等待所有任务完成。 我的第一个想法是:
- 计算每个添加的任务
- 处理完成后减少计数器。
但是: 但是有不同类型的任务和不同的工作人员实现和多个队列。所以我将不得不维护大量不同的计数器。
我想要的:
q.waitForEmptyAndCompleted()
这将需要队列跟踪任务 "in flight" 并要求工作进程在完成时发出信号(而不是 tasksInsystem---;
)。
工作人员无法增加该计数器,因为他必须在从队列中取出任务后对其进行计数。但是另一个线程可能会在调用 take() 之后变为 运行,这样工作人员就无法事先增加计数器。
因此,counter increase 和 take() 必须绑定在一起 (atomar)。这让我找到了一个专门的 BlockingQueue。
我没有找到预制的解决方案。所以我最好的猜测是实现我自己的 BlockingQueue。有什么我可以改用的东西吗(避免自己实现和测试线程安全的阻塞队列)?或者你有什么想法以不同的方式实现等待调用吗?
好的,因为一般的 ExecutorService
还不够,也许 ForkJoinPool
会起作用,它不会显式公开队列,但根据您的描述应该非常容易使用。
关键方法是awaitQuiescence(long timeout, TimeUnit unit)
,它将等待所有提交的任务执行完毕。