为什么 ParallelStream 不会在递归中使用所有 commonPool 的线程?
Why ParallelStream won't use all commonPool's thread in recursion?
当我运行以下代码时,8 个可用线程中只有 2 个在运行。谁能解释一下为什么会这样?我应该怎样修改代码才能利用全部 8 个线程?
Tree.java
:
package il.co.roy;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
 * A generic n-ary tree node carrying a payload of type {@code T} and a set of
 * child subtrees.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} are structural: they compare
 * the payload and, recursively, the whole child set. Because {@link #getSubTrees()}
 * exposes the live child set, mutating it after a node has been placed in a
 * hash-based collection can make that node unreachable there.
 *
 * @param <T> payload type
 */
public class Tree<T>
{
	private final T data;
	private final Set<Tree<T>> subTrees;

	/**
	 * Creates a node with the given payload and children.
	 *
	 * @param data     payload of this node; may be {@code null}
	 * @param subTrees child subtrees; the set is stored as given, not copied
	 */
	public Tree(T data, Set<Tree<T>> subTrees)
	{
		this.data = data;
		this.subTrees = subTrees;
	}

	/**
	 * Creates a leaf node with the given payload and an empty, mutable
	 * {@link HashSet} of children.
	 *
	 * @param data payload of this node; may be {@code null}
	 */
	public Tree(T data)
	{
		this(data, new HashSet<>());
	}

	/** Creates a leaf node whose payload is {@code null}. */
	public Tree()
	{
		this(null);
	}

	/** @return the payload of this node, possibly {@code null} */
	public T getData()
	{
		return data;
	}

	/** @return the live (not copied) set of child subtrees */
	public Set<Tree<T>> getSubTrees()
	{
		return subTrees;
	}

	@Override
	public boolean equals(Object o)
	{
		if (this == o)
			return true;
		if (o == null || getClass() != o.getClass())
			return false;
		Tree<?> tree = (Tree<?>) o;
		return Objects.equals(data, tree.data) &&
				Objects.equals(subTrees, tree.subTrees);
	}

	@Override
	public int hashCode()
	{
		return Objects.hash(data, subTrees);
	}

	@Override
	public String toString()
	{
		return "Tree{" +
				"data=" + data +
				", subTrees=" + subTrees +
				'}';
	}

	/**
	 * Simulates sending a command to this node and then to every node below it.
	 *
	 * <p>This node's own work (log line, 5 s sleep, log line) runs first on the
	 * calling thread. Only afterwards are the children processed, in parallel,
	 * each recursively doing the same. Nothing is logged for a {@code null} payload.
	 *
	 * <p>NOTE: a parallel stream over a {@code HashSet} is split along the backing
	 * hash table. With only a few children, many splits are empty and only a couple
	 * of common-pool threads get work. Streaming a copy,
	 * {@code new ArrayList<>(subTrees).parallelStream()}, splits evenly.
	 *
	 * <p>If the sleep is interrupted, the stack trace is printed and the thread's
	 * interrupt status is restored before continuing.
	 */
	public void sendCommandAll()
	{
		if (data != null)
			System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
		try
		{
			Thread.sleep(5000);
		} catch (InterruptedException e)
		{
			// Thread.sleep clears the interrupt flag when it throws; restore it so
			// callers can still observe that an interruption was requested.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		if (data != null)
			System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
		// forEach and the commented-out map/reduce variant behave the same here.
		subTrees.parallelStream()
//				.map(Tree::sendCommandAll)
				.forEach(Tree::sendCommandAll);
//				.reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
	}
}
(我用 `forEach` 还是 `reduce` 都无所谓)
Main.java
:
package il.co.roy;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Entry point for the experiment. It builds a two-level tree and sends the
 * command to every node: a root with {@code null} data, one child holding 1,
 * and five grandchildren holding 2..6.
 */
public class Main
{
	/**
	 * Prints the processor count, builds the sample tree and runs
	 * {@code sendCommandAll()} on its root.
	 *
	 * @param args ignored
	 */
	public static void main(String... args)
	{
		int processors = Runtime.getRuntime().availableProcessors();
		System.out.println("Processors: " + processors);

		// Grandchildren 2, 3, 4, 5, 6 (the range end is exclusive).
		Set<Tree<Integer>> grandchildren = IntStream.range(2, 7)
				.boxed()
				.map(Tree::new)
				.collect(Collectors.toSet());
		Tree<Integer> child = new Tree<>(1, grandchildren);
		final Tree<Integer> root = new Tree<>(null, Set.of(child));

		root.sendCommandAll();

		// Comparison experiment mentioned in the question: this endless parallel
		// IntStream was observed to occupy all commonPool workers.
//		IntStream.generate(() -> 1)
//				.parallel()
//				.forEach(i ->
//				{
//					System.out.println(Thread.currentThread().getName());
//					try
//					{
//						Thread.sleep(5000);
//					} catch (InterruptedException e)
//					{
//						e.printStackTrace();
//					}
//				});
	}
}
在 `main` 方法中,我创建了一个具有以下结构的树:
root (data is `null`)
|- 1
   |- 2
   |- 3
   |- 4
   |- 5
   |- 6
`sendCommandAll` 函数只在父树处理完成之后,才(并行地)处理每个子树。
但结果如下:
Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true
(顺便一提,当我执行 `Main.java` 中被注释掉的代码时,JVM 会使用 `commonPool` 的全部 7(+1)个可用线程。)
如何改进我的代码?
如(另一个回答的)后半部分所述,处理 `HashMap` 或 `HashSet` 时,线程利用率取决于元素在后备数组中的分布,而这又取决于哈希码。尤其是当元素数量相对于(默认)容量较少时,这可能导致很差的工作拆分。
一个简单的解决方法是使用 `new ArrayList<>(subTrees).parallelStream()` 代替 `subTrees.parallelStream()`。
但请注意,你的方法会先执行当前节点的实际工作(示例中用 `sleep` 模拟),然后才处理子节点,这同样会降低潜在的并行度。
您可以使用
/**
 * Sends the command to this node and, recursively, to every node below it.
 * A leaf just does its own work. An inner node streams an ArrayList holding its
 * children followed by itself, so its own work runs concurrently with its
 * subtrees instead of before them.
 */
public void sendCommandAll() {
    if (!subTrees.isEmpty()) {
        List<Tree<T>> jobs = new ArrayList<>(subTrees.size() + 1);
        jobs.addAll(subTrees);
        jobs.add(this);
        jobs.parallelStream().forEach(node -> {
            if (node == this) {
                node.actualSendCommand();
            } else {
                node.sendCommandAll();
            }
        });
    } else {
        actualSendCommand();
    }
}
/**
 * Performs this node's own simulated work: logs the start, sleeps for five
 * seconds, then logs completion. Nothing is logged when {@code data} is null.
 * If the sleep is interrupted, the stack trace is printed and the thread's
 * interrupt status is restored so callers can still observe the interruption.
 */
private void actualSendCommand() {
    if (data != null) {
        System.out.println("[" + Thread.currentThread().getName()
            + "] sending command to " + data);
    }
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt flag when it throws; re-assert it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    if (data != null) {
        System.out.println("[" + Thread.currentThread().getName()
            + "] tree with data " + data + " got " + true);
    }
}
这允许在处理子节点的同时处理当前节点。
当我运行以下代码时,8 个可用线程中只有 2 个在运行。谁能解释一下为什么会这样?我应该怎样修改代码才能利用全部 8 个线程?
Tree.java
:
package il.co.roy;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
 * A generic n-ary tree node carrying a payload of type {@code T} and a set of
 * child subtrees.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} are structural: they compare
 * the payload and, recursively, the whole child set. Because {@link #getSubTrees()}
 * exposes the live child set, mutating it after a node has been placed in a
 * hash-based collection can make that node unreachable there.
 *
 * @param <T> payload type
 */
public class Tree<T>
{
	private final T data;
	private final Set<Tree<T>> subTrees;

	/**
	 * Creates a node with the given payload and children.
	 *
	 * @param data     payload of this node; may be {@code null}
	 * @param subTrees child subtrees; the set is stored as given, not copied
	 */
	public Tree(T data, Set<Tree<T>> subTrees)
	{
		this.data = data;
		this.subTrees = subTrees;
	}

	/**
	 * Creates a leaf node with the given payload and an empty, mutable
	 * {@link HashSet} of children.
	 *
	 * @param data payload of this node; may be {@code null}
	 */
	public Tree(T data)
	{
		this(data, new HashSet<>());
	}

	/** Creates a leaf node whose payload is {@code null}. */
	public Tree()
	{
		this(null);
	}

	/** @return the payload of this node, possibly {@code null} */
	public T getData()
	{
		return data;
	}

	/** @return the live (not copied) set of child subtrees */
	public Set<Tree<T>> getSubTrees()
	{
		return subTrees;
	}

	@Override
	public boolean equals(Object o)
	{
		if (this == o)
			return true;
		if (o == null || getClass() != o.getClass())
			return false;
		Tree<?> tree = (Tree<?>) o;
		return Objects.equals(data, tree.data) &&
				Objects.equals(subTrees, tree.subTrees);
	}

	@Override
	public int hashCode()
	{
		return Objects.hash(data, subTrees);
	}

	@Override
	public String toString()
	{
		return "Tree{" +
				"data=" + data +
				", subTrees=" + subTrees +
				'}';
	}

	/**
	 * Simulates sending a command to this node and then to every node below it.
	 *
	 * <p>This node's own work (log line, 5 s sleep, log line) runs first on the
	 * calling thread. Only afterwards are the children processed, in parallel,
	 * each recursively doing the same. Nothing is logged for a {@code null} payload.
	 *
	 * <p>NOTE: a parallel stream over a {@code HashSet} is split along the backing
	 * hash table. With only a few children, many splits are empty and only a couple
	 * of common-pool threads get work. Streaming a copy,
	 * {@code new ArrayList<>(subTrees).parallelStream()}, splits evenly.
	 *
	 * <p>If the sleep is interrupted, the stack trace is printed and the thread's
	 * interrupt status is restored before continuing.
	 */
	public void sendCommandAll()
	{
		if (data != null)
			System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
		try
		{
			Thread.sleep(5000);
		} catch (InterruptedException e)
		{
			// Thread.sleep clears the interrupt flag when it throws; restore it so
			// callers can still observe that an interruption was requested.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		if (data != null)
			System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
		// forEach and the commented-out map/reduce variant behave the same here.
		subTrees.parallelStream()
//				.map(Tree::sendCommandAll)
				.forEach(Tree::sendCommandAll);
//				.reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
	}
}
(我用 `forEach` 还是 `reduce` 都无所谓)
Main.java
:
package il.co.roy;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Entry point for the experiment. It builds a two-level tree and sends the
 * command to every node: a root with {@code null} data, one child holding 1,
 * and five grandchildren holding 2..6.
 */
public class Main
{
	/**
	 * Prints the processor count, builds the sample tree and runs
	 * {@code sendCommandAll()} on its root.
	 *
	 * @param args ignored
	 */
	public static void main(String... args)
	{
		int processors = Runtime.getRuntime().availableProcessors();
		System.out.println("Processors: " + processors);

		// Grandchildren 2, 3, 4, 5, 6 (the range end is exclusive).
		Set<Tree<Integer>> grandchildren = IntStream.range(2, 7)
				.boxed()
				.map(Tree::new)
				.collect(Collectors.toSet());
		Tree<Integer> child = new Tree<>(1, grandchildren);
		final Tree<Integer> root = new Tree<>(null, Set.of(child));

		root.sendCommandAll();

		// Comparison experiment mentioned in the question: this endless parallel
		// IntStream was observed to occupy all commonPool workers.
//		IntStream.generate(() -> 1)
//				.parallel()
//				.forEach(i ->
//				{
//					System.out.println(Thread.currentThread().getName());
//					try
//					{
//						Thread.sleep(5000);
//					} catch (InterruptedException e)
//					{
//						e.printStackTrace();
//					}
//				});
	}
}
在 `main` 方法中,我创建了一个具有以下结构的树:
root (data is `null`)
|- 1
   |- 2
   |- 3
   |- 4
   |- 5
   |- 6
`sendCommandAll` 函数只在父树处理完成之后,才(并行地)处理每个子树。
但结果如下:
Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true
(顺便一提,当我执行 `Main.java` 中被注释掉的代码时,JVM 会使用 `commonPool` 的全部 7(+1)个可用线程。)
如何改进我的代码?
如(另一个回答的)后半部分所述,处理 `HashMap` 或 `HashSet` 时,线程利用率取决于元素在后备数组中的分布,而这又取决于哈希码。尤其是当元素数量相对于(默认)容量较少时,这可能导致很差的工作拆分。
一个简单的解决方法是使用 `new ArrayList<>(subTrees).parallelStream()` 代替 `subTrees.parallelStream()`。
但请注意,你的方法会先执行当前节点的实际工作(示例中用 `sleep` 模拟),然后才处理子节点,这同样会降低潜在的并行度。
您可以使用
/**
 * Sends the command to this node and, recursively, to every node below it.
 * A leaf just does its own work. An inner node streams an ArrayList holding its
 * children followed by itself, so its own work runs concurrently with its
 * subtrees instead of before them.
 */
public void sendCommandAll() {
    if (!subTrees.isEmpty()) {
        List<Tree<T>> jobs = new ArrayList<>(subTrees.size() + 1);
        jobs.addAll(subTrees);
        jobs.add(this);
        jobs.parallelStream().forEach(node -> {
            if (node == this) {
                node.actualSendCommand();
            } else {
                node.sendCommandAll();
            }
        });
    } else {
        actualSendCommand();
    }
}
/**
 * Performs this node's own simulated work: logs the start, sleeps for five
 * seconds, then logs completion. Nothing is logged when {@code data} is null.
 * If the sleep is interrupted, the stack trace is printed and the thread's
 * interrupt status is restored so callers can still observe the interruption.
 */
private void actualSendCommand() {
    if (data != null) {
        System.out.println("[" + Thread.currentThread().getName()
            + "] sending command to " + data);
    }
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt flag when it throws; re-assert it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    if (data != null) {
        System.out.println("[" + Thread.currentThread().getName()
            + "] tree with data " + data + " got " + true);
    }
}
这允许在处理子节点的同时处理当前节点。