为什么 ParallelStream 不会在递归中使用所有 commonPool 的线程?

Why ParallelStream won't use all commonPool's thread in recursion?

当我 运行 以下代码时,8 个线程中只有 2 个可用 运行,谁能解释为什么会这样?我怎样才能更改代码以利用所有 8 个线程?

Tree.java:

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}

(我用forEach还是reduce都无所谓)

Main.java:

package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}

main 方法中,我创建了一个具有以下结构的树:\

root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6

sendCommandAll 函数仅在父树完成处理后才处理每个子树(并行)。 但结果如下:

Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true

(作为记录,当我在 Main.java 中执行注释代码时,JVM 使用所有 7(+ 1)个可用线程 commonPool

如何改进我的代码?

后半部分所述,处理HashMaps或HashSets时的线程利用率取决于后备数组中元素的分布,这取决于在哈希码上。尤其是在元素数量较少的情况下,与(默认)容量相比,这可能会导致糟糕的工作拆分。

一个简单的解决方法是使用 new ArrayList<>(subTrees).parallelStream() 而不是 subTrees.parallelStream()

但请注意,您的方法在处理子节点之前会执行当前节点的实际工作(在使用 sleep 模拟的示例中),这也会降低潜在的并行性。

您可以使用

public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] tree with data " + data + " got " + true);
}

这允许在处理子节点的同时处理当前节点。