ExecutorService:在线程中完成同步障碍时如何防止线程饥饿
ExecutorService: how to prevent thread starvation when synchronization barriers are done in the threads
我遇到的情况很难找到干净的解决方案。我会尽量详细解释。
我有一个树状结构:
NODE A
NODE A.1
NODE A.2
NODE A.2.a
NODE A.2.b
NODE A.3
NODE A.3.a
NODE A.3.b
NODE A.3.c
NODE B
NODE B.1
NODE B.2
我需要处理根节点:
public void process(final Node node) { ... }
一个节点的进程涉及两件事:
- some database queries
- the process of all children of these nodes
也就是说,一旦处理完NODE.2.a
和NODE.2.b
,就可以处理NODE.2
。我正在以递归方式处理节点,没什么了不起的。
到目前为止,还不错。现在我想声明一个全局执行器服务,具有固定数量的线程。我想并行处理一个节点的子节点。因此,NODE.2.a
和 NODE.2.b
可以分别在各自的线程中处理。代码看起来像这样:
// global executor service, shared between all process(Node) calls
final ExecutorService service = Executors.newFixedThreadPool(4);
public void process(final Node node) {
// database queries
...
// wait for children to be processed
final CountDownLatch latch = new CountDownLatch(node.children().size());
for (final Node child : node.children()) {
service.execute(() -> {
process(child);
latch.countDown();
});
}
latch.await();
}
这里的问题是:当达到一定深度时,所有线程都在 latch.await()
中停止。我们已经到了线程饥饿的情况。
这可以通过使执行程序服务不受限制来轻松解决,但我不喜欢该选项。我想控制 active
线程的数量。在我的例子中,active
线程的数量将等于内核的数量。拥有更多 active
个线程会导致从一个线程到另一个线程的交换,我想避免这种情况。
如何解决?
这在技术上是一个僵局。
我们首先将死锁视为两个或多个进程正在等待锁,但在这种情况下,线程正在相互等待。
有趣的是,这是一种产生自我僵局的怪兽的方式。如果你有一个线程池,它将提交任务并等待它们但永远不会执行,因为它正在等待自己完成执行它们!
标准答案叫做'Work Stealing Thread Pool'。
我遇到了完全相同的问题,如果没有那部分词汇,我花了很长时间才找到有关我快速解决的任何信息,这是在完全并发递归中执行的任何递归算法中的常见问题。
好的,这是其工作原理的草图。
用一个技巧创建一个相当标准的线程池。
当一个线程到达一个没有排队项目的结果就无法继续的点时,检查该项目是否已被另一个线程启动,如果没有则在当前线程中执行它(而不是等待),否则等待执行该项目以完成它的线程。
它非常简单,但可能需要您构建自己的线程池class,因为现成的解决方案通常不支持它。
当然要注意,典型的非平凡递归算法会分解成比并行处理单元更多的子任务。
因此,将子任务排入某个级别然后仅在单个线程中执行其余部分可能是有意义的。
也就是说,除非将物品放入队列和取消排队很便宜,否则您可以花时间将物品放入队列中以取回它们。这些操作可以在线程之间串行化并减少并行性。很难使用无锁队列,因为它们通常不允许从队列中间提取(如此处所要求的)。你应该限制并行的深度。
从好的方面来说,请注意在当前执行的线程中执行任务涉及的任务交换开销比停放当前线程和(在 OS 级别)在另一个工作线程中交换更少。
这是一个 Java 资源,它提供 WorkStealingThreadPool
。
请注意,我没有努力评估实施情况,也没有提供任何建议。在这种情况下,我一直在使用 C++,或者很乐意分享我的 template
.
也可以参考维基百科的说法:https://en.wikipedia.org/wiki/Work_stealing
您真正想要的可以使用
实现
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(cores);
还请记住,getActiveCount() 的 javadoc 表示它是一个近似值。
我不会使用容易出错的多个 CountDownLatch-es。据我了解,您想将树从叶节点并行遍历到父节点。
我会使用简单的 BFS/DFS 遍历树(那里不需要递归算法),一旦找到没有子节点的叶子,就将此节点放入阻塞队列。
队列可以被第二个线程轮询,这个线程会把新的任务调度到固定线程数的executor服务
一旦执行线程完成处理从队列中取出的当前节点,执行线程检查当前节点是否有父节点。如果有父节点,则重新放入队列中。您还应该检查 parent 是否已被处理。
像这样:
BlockingQueue blockingQueue = new BlockingQueue();
ExecutorService service =
new Thread(new Runnable(){
while (true) {
Task task = blockingQueue.poll();
service.execute(new Runnable(){
if (task.isProcessed()) {
return ;
}
... do you job
task.setProcessed(true);
Node node = task.getNode();
boolean allChildrenProcessed = true;
for (Node childeNode: node.getChildren()) {
allChildrenProcessed &= childeNode.isProcessed();
}
if (node.hasParent() && allChildrenProcessed) {
blockingQueue.put(node.getParent())
}
});
}
}).start();
Stack stack = new StackImpl();
stack.put(root);
while (node = stack.pop() != null) {
for (Node child: node.getChildren()) {
stack.push(child);
}
if (node.getChildren().isEmpty()) {
// add leaf node for processing
blockinQueue.add(node);
}
}
您似乎想要递归的替代方法。该替代方案是分散聚集。提交一个分叉所有任务的请求。当 NODE.2 执行时,它很简单 returns 因为子节点还没有完成。当最后一个 Task 完成时(2.1,2.b),完成处理开始处理 NODE.2。
我维护的 Data Parallel 开源产品完全可以做到这一点。无需获取整个产品,只需下载文档 (http://coopsoft.com/JavaDoc.html ) 并查看文件:Manual/DynamicJoin.html。如果这能解决您的问题,那么您就可以获取产品。
我遇到的情况很难找到干净的解决方案。我会尽量详细解释。
我有一个树状结构:
NODE A
NODE A.1
NODE A.2
NODE A.2.a
NODE A.2.b
NODE A.3
NODE A.3.a
NODE A.3.b
NODE A.3.c
NODE B
NODE B.1
NODE B.2
我需要处理根节点:
public void process(final Node node) { ... }
一个节点的进程涉及两件事:
- some database queries
- the process of all children of these nodes
也就是说,一旦处理完NODE.2.a
和NODE.2.b
,就可以处理NODE.2
。我正在以递归方式处理节点,没什么了不起的。
到目前为止,还不错。现在我想声明一个全局执行器服务,具有固定数量的线程。我想并行处理一个节点的子节点。因此,NODE.2.a
和 NODE.2.b
可以分别在各自的线程中处理。代码看起来像这样:
// global executor service, shared between all process(Node) calls
final ExecutorService service = Executors.newFixedThreadPool(4);
public void process(final Node node) {
// database queries
...
// wait for children to be processed
final CountDownLatch latch = new CountDownLatch(node.children().size());
for (final Node child : node.children()) {
service.execute(() -> {
process(child);
latch.countDown();
});
}
latch.await();
}
这里的问题是:当达到一定深度时,所有线程都在 latch.await()
中停止。我们已经到了线程饥饿的情况。
这可以通过使执行程序服务不受限制来轻松解决,但我不喜欢该选项。我想控制 active
线程的数量。在我的例子中,active
线程的数量将等于内核的数量。拥有更多 active
个线程会导致从一个线程到另一个线程的交换,我想避免这种情况。
如何解决?
这在技术上是一个僵局。 我们首先将死锁视为两个或多个进程正在等待锁,但在这种情况下,线程正在相互等待。 有趣的是,这是一种产生自我僵局的怪兽的方式。如果你有一个线程池,它将提交任务并等待它们但永远不会执行,因为它正在等待自己完成执行它们!
标准答案叫做'Work Stealing Thread Pool'。 我遇到了完全相同的问题,如果没有那部分词汇,我花了很长时间才找到有关我快速解决的任何信息,这是在完全并发递归中执行的任何递归算法中的常见问题。
好的,这是其工作原理的草图。 用一个技巧创建一个相当标准的线程池。 当一个线程到达一个没有排队项目的结果就无法继续的点时,检查该项目是否已被另一个线程启动,如果没有则在当前线程中执行它(而不是等待),否则等待执行该项目以完成它的线程。
它非常简单,但可能需要您构建自己的线程池class,因为现成的解决方案通常不支持它。
当然要注意,典型的非平凡递归算法会分解成比并行处理单元更多的子任务。 因此,将子任务排入某个级别然后仅在单个线程中执行其余部分可能是有意义的。
也就是说,除非将物品放入队列和取消排队很便宜,否则您可以花时间将物品放入队列中以取回它们。这些操作可以在线程之间串行化并减少并行性。很难使用无锁队列,因为它们通常不允许从队列中间提取(如此处所要求的)。你应该限制并行的深度。
从好的方面来说,请注意在当前执行的线程中执行任务涉及的任务交换开销比停放当前线程和(在 OS 级别)在另一个工作线程中交换更少。
这是一个 Java 资源,它提供 WorkStealingThreadPool
。
请注意,我没有努力评估实施情况,也没有提供任何建议。在这种情况下,我一直在使用 C++,或者很乐意分享我的 template
.
也可以参考维基百科的说法:https://en.wikipedia.org/wiki/Work_stealing
您真正想要的可以使用
实现int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(cores);
还请记住,getActiveCount() 的 javadoc 表示它是一个近似值。
我不会使用容易出错的多个 CountDownLatch-es。据我了解,您想将树从叶节点并行遍历到父节点。
我会使用简单的 BFS/DFS 遍历树(那里不需要递归算法),一旦找到没有子节点的叶子,就将此节点放入阻塞队列。
队列可以被第二个线程轮询,这个线程会把新的任务调度到固定线程数的executor服务
一旦执行线程完成处理从队列中取出的当前节点,执行线程检查当前节点是否有父节点。如果有父节点,则重新放入队列中。您还应该检查 parent 是否已被处理。
像这样:
BlockingQueue blockingQueue = new BlockingQueue();
ExecutorService service =
new Thread(new Runnable(){
while (true) {
Task task = blockingQueue.poll();
service.execute(new Runnable(){
if (task.isProcessed()) {
return ;
}
... do you job
task.setProcessed(true);
Node node = task.getNode();
boolean allChildrenProcessed = true;
for (Node childeNode: node.getChildren()) {
allChildrenProcessed &= childeNode.isProcessed();
}
if (node.hasParent() && allChildrenProcessed) {
blockingQueue.put(node.getParent())
}
});
}
}).start();
Stack stack = new StackImpl();
stack.put(root);
while (node = stack.pop() != null) {
for (Node child: node.getChildren()) {
stack.push(child);
}
if (node.getChildren().isEmpty()) {
// add leaf node for processing
blockinQueue.add(node);
}
}
您似乎想要递归的替代方法。该替代方案是分散聚集。提交一个分叉所有任务的请求。当 NODE.2 执行时,它很简单 returns 因为子节点还没有完成。当最后一个 Task 完成时(2.1,2.b),完成处理开始处理 NODE.2。
我维护的 Data Parallel 开源产品完全可以做到这一点。无需获取整个产品,只需下载文档 (http://coopsoft.com/JavaDoc.html ) 并查看文件:Manual/DynamicJoin.html。如果这能解决您的问题,那么您就可以获取产品。