为什么 Hashmap.values().parallelStream() 不能并行 运行 而将它们包装在 ArrayList 中可以工作?
Why Hashmap.values().parallelStream() does not run in parallel while wrap them in ArrayList could work?
hashmap 有两个键值对,它们不是由不同线程并行处理的。
import java.util.stream.Stream;
import java.util.Map;
import java.util.HashMap;
class Ideone
{
public static void main (String[] args) throws java.lang.Exception
{
Map<String, Integer> map = new HashMap<>();
map.put("a", 1);
map.put("b", 2);
map.values().parallelStream()
.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.forEach(System.out::println);
}
}
输出:
processing 1 in Thread[main,5,main]
1
processing 2 in Thread[main,5,main]
2
URL: https://ideone.com/Hkxkoz
ValueSpliterator 应该尝试将 HashMap 的数组拆分到大小为 1 的槽中,这意味着两个元素应该在不同的线程中处理。
来源:https://www.codota.com/code/java/methods/java8.util.HMSpliterators$ValueSpliterator/%3Cinit%3E
将它们包装在 ArrayList
中后,它按预期工作。
new ArrayList(map.values()).parallelStream()
.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.forEach(System.out::println);
输出:
processing 1 in Thread[ForkJoinPool.commonPool-worker-3,5,main]
1
processing 2 in Thread[main,5,main]
2
如 中所述,问题与以下事实有关:HashMap
的容量可能大于其大小,并且实际值根据其哈希分布在支持数组中代码。
对于所有基于数组的拆分器,拆分逻辑基本相同,无论您流过数组、ArrayList
还是 HashMap
。为了在最大努力的基础上获得平衡的拆分,每个拆分将减半(索引)范围,但在 HashMap
的情况下,范围内的实际元素数量与范围大小不同。
原则上,每个基于范围的拆分器都可以拆分为单个元素,但是,客户端代码,即 Stream API 实现,到目前为止可能不会拆分。甚至尝试拆分的决定是由预期的元素数量和 CPU 核心数量决定的。
参加以下节目
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
map.put("a", 1);
map.put("b", 2);
for(int depth: new int[] { 1, 2, Integer.MAX_VALUE }) {
System.out.println("With max depth: "+depth);
Tree<Spliterator<Map.Entry<String, Integer>>> spTree
= split(map.entrySet().spliterator(), depth);
Tree<String> valueTree = spTree.map(sp -> "estimated: "+sp.estimateSize()+" "
+StreamSupport.stream(sp, false).collect(Collectors.toList()));
System.out.println(valueTree);
}
}
private static <T> Tree<Spliterator<T>> split(Spliterator<T> sp, int depth) {
Spliterator<T> prefix = depth-- > 0? sp.trySplit(): null;
return prefix == null?
new Tree<>(sp): new Tree<>(null, split(prefix, depth), split(sp, depth));
}
public static class Tree<T> {
final T value;
List<Tree<T>> children;
public Tree(T value) {
this.value = value;
children = Collections.emptyList();
}
public Tree(T value, Tree<T>... ch) {
this.value = value;
children = Arrays.asList(ch);
}
public <U> Tree<U> map(Function<? super T, ? extends U> f) {
Tree<U> t = new Tree<>(value == null? null: f.apply(value));
if(!children.isEmpty()) {
t.children = new ArrayList<>(children.size());
for(Tree<T> ch: children) t.children.add(ch.map(f));
}
return t;
}
public @Override String toString() {
if(children.isEmpty()) return value == null? "": value.toString();
final StringBuilder sb = new StringBuilder(100);
toString(sb, 0, 0);
return sb.toString();
}
public void toString(StringBuilder sb, int preS, int preEnd) {
final int myHandle = sb.length() - 2;
sb.append(value == null? "": value).append('\n');
final int num = children.size() - 1;
if (num >= 0) {
if (num != 0) {
for (int ix = 0; ix < num; ix++) {
int nPreS = sb.length();
sb.append(sb, preS, preEnd);
sb.append("\u2502 ");
int nPreE = sb.length();
children.get(ix).toString(sb, nPreS, nPreE);
}
}
int nPreS = sb.length();
sb.append(sb, preS, preEnd);
final int lastItemHandle = sb.length();
sb.append(" ");
int nPreE = sb.length();
children.get(num).toString(sb, nPreS, nPreE);
sb.setCharAt(lastItemHandle, '\u2514');
}
if (myHandle > 0) {
sb.setCharAt(myHandle, '\u251c');
sb.setCharAt(myHandle + 1, '\u2500');
}
}
}
您将获得:
With max depth: 1
├─estimated: 1 [a=1, b=2]
└─estimated: 1 []
With max depth: 2
├─
│ ├─estimated: 0 [a=1, b=2]
│ └─estimated: 0 []
└─
├─estimated: 0 []
└─estimated: 0 []
With max depth: 2147483647
├─
│ ├─
│ │ ├─
│ │ │ ├─estimated: 0 []
│ │ │ └─estimated: 0 [a=1]
│ │ └─
│ │ ├─estimated: 0 [b=2]
│ │ └─estimated: 0 []
│ └─
│ ├─
│ │ ├─estimated: 0 []
│ │ └─estimated: 0 []
│ └─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─
│ ├─
│ │ ├─estimated: 0 []
│ │ └─estimated: 0 []
│ └─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─estimated: 0 []
└─estimated: 0 []
因此,如前所述,如果我们拆分得足够深,拆分器可以拆分为单个元素,但是,两个元素的估计大小并不表明这样做是值得的。在每次拆分时,它会将估计值减半,虽然您可能会说这对于您感兴趣的元素是错误的,但对于这里的大多数拆分器来说它实际上是正确的,因为当下降到最大级别时,大多数拆分器都代表一个空范围将它们拆分出来是一种资源浪费。
如另一个答案所述,决定是关于平衡拆分工作(或一般准备工作)和预期并行化工作,Stream 实现无法提前知道。如果你事先知道每个元素的工作量会非常高,为了证明更多的准备工作是合理的,你可以使用,例如new ArrayList<>(map.[keySet|entrySet|values]()) .parallelStream()
强制执行平衡拆分。通常,无论如何,对于更大的地图,问题会小得多。
感谢,我会在这里添加更多细节。
根本原因来自 HashMap.values()
的 sizeEstimate 不准确。
默认情况下,HashMap 的容量为 16,有 2 个元素,由数组支持。 Spliterator 的估计大小为 2.
每一次,每次拆分都会将数组减半。在这种情况下,16 长度的数组被分成两半,每半 8 个,每半的估计大小为 1。由于元素是根据哈希码放置的,不幸的是,两个元素位于同一半。
那么forkjoin框架认为1是below the sizeThreshold,就会停止拆分,开始处理任务。
同时arrayList没有这个问题,因为estimatedSize总是准确的。
hashmap 有两个键值对,它们不是由不同线程并行处理的。
import java.util.stream.Stream;
import java.util.Map;
import java.util.HashMap;
class Ideone
{
public static void main (String[] args) throws java.lang.Exception
{
Map<String, Integer> map = new HashMap<>();
map.put("a", 1);
map.put("b", 2);
map.values().parallelStream()
.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.forEach(System.out::println);
}
}
输出:
processing 1 in Thread[main,5,main]
1
processing 2 in Thread[main,5,main]
2
URL: https://ideone.com/Hkxkoz
ValueSpliterator 应该尝试将 HashMap 的数组拆分到大小为 1 的槽中,这意味着两个元素应该在不同的线程中处理。
来源:https://www.codota.com/code/java/methods/java8.util.HMSpliterators$ValueSpliterator/%3Cinit%3E
将它们包装在 ArrayList
中后,它按预期工作。
new ArrayList(map.values()).parallelStream()
.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.forEach(System.out::println);
输出:
processing 1 in Thread[ForkJoinPool.commonPool-worker-3,5,main]
1
processing 2 in Thread[main,5,main]
2
如 HashMap
的容量可能大于其大小,并且实际值根据其哈希分布在支持数组中代码。
对于所有基于数组的拆分器,拆分逻辑基本相同,无论您流过数组、ArrayList
还是 HashMap
。为了在最大努力的基础上获得平衡的拆分,每个拆分将减半(索引)范围,但在 HashMap
的情况下,范围内的实际元素数量与范围大小不同。
原则上,每个基于范围的拆分器都可以拆分为单个元素,但是,客户端代码,即 Stream API 实现,到目前为止可能不会拆分。甚至尝试拆分的决定是由预期的元素数量和 CPU 核心数量决定的。
参加以下节目
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
map.put("a", 1);
map.put("b", 2);
for(int depth: new int[] { 1, 2, Integer.MAX_VALUE }) {
System.out.println("With max depth: "+depth);
Tree<Spliterator<Map.Entry<String, Integer>>> spTree
= split(map.entrySet().spliterator(), depth);
Tree<String> valueTree = spTree.map(sp -> "estimated: "+sp.estimateSize()+" "
+StreamSupport.stream(sp, false).collect(Collectors.toList()));
System.out.println(valueTree);
}
}
private static <T> Tree<Spliterator<T>> split(Spliterator<T> sp, int depth) {
Spliterator<T> prefix = depth-- > 0? sp.trySplit(): null;
return prefix == null?
new Tree<>(sp): new Tree<>(null, split(prefix, depth), split(sp, depth));
}
public static class Tree<T> {
final T value;
List<Tree<T>> children;
public Tree(T value) {
this.value = value;
children = Collections.emptyList();
}
public Tree(T value, Tree<T>... ch) {
this.value = value;
children = Arrays.asList(ch);
}
public <U> Tree<U> map(Function<? super T, ? extends U> f) {
Tree<U> t = new Tree<>(value == null? null: f.apply(value));
if(!children.isEmpty()) {
t.children = new ArrayList<>(children.size());
for(Tree<T> ch: children) t.children.add(ch.map(f));
}
return t;
}
public @Override String toString() {
if(children.isEmpty()) return value == null? "": value.toString();
final StringBuilder sb = new StringBuilder(100);
toString(sb, 0, 0);
return sb.toString();
}
public void toString(StringBuilder sb, int preS, int preEnd) {
final int myHandle = sb.length() - 2;
sb.append(value == null? "": value).append('\n');
final int num = children.size() - 1;
if (num >= 0) {
if (num != 0) {
for (int ix = 0; ix < num; ix++) {
int nPreS = sb.length();
sb.append(sb, preS, preEnd);
sb.append("\u2502 ");
int nPreE = sb.length();
children.get(ix).toString(sb, nPreS, nPreE);
}
}
int nPreS = sb.length();
sb.append(sb, preS, preEnd);
final int lastItemHandle = sb.length();
sb.append(" ");
int nPreE = sb.length();
children.get(num).toString(sb, nPreS, nPreE);
sb.setCharAt(lastItemHandle, '\u2514');
}
if (myHandle > 0) {
sb.setCharAt(myHandle, '\u251c');
sb.setCharAt(myHandle + 1, '\u2500');
}
}
}
您将获得:
With max depth: 1
├─estimated: 1 [a=1, b=2]
└─estimated: 1 []
With max depth: 2
├─
│ ├─estimated: 0 [a=1, b=2]
│ └─estimated: 0 []
└─
├─estimated: 0 []
└─estimated: 0 []
With max depth: 2147483647
├─
│ ├─
│ │ ├─
│ │ │ ├─estimated: 0 []
│ │ │ └─estimated: 0 [a=1]
│ │ └─
│ │ ├─estimated: 0 [b=2]
│ │ └─estimated: 0 []
│ └─
│ ├─
│ │ ├─estimated: 0 []
│ │ └─estimated: 0 []
│ └─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─
│ ├─
│ │ ├─estimated: 0 []
│ │ └─estimated: 0 []
│ └─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─estimated: 0 []
└─estimated: 0 []
因此,如前所述,如果我们拆分得足够深,拆分器可以拆分为单个元素,但是,两个元素的估计大小并不表明这样做是值得的。在每次拆分时,它会将估计值减半,虽然您可能会说这对于您感兴趣的元素是错误的,但对于这里的大多数拆分器来说它实际上是正确的,因为当下降到最大级别时,大多数拆分器都代表一个空范围将它们拆分出来是一种资源浪费。
如另一个答案所述,决定是关于平衡拆分工作(或一般准备工作)和预期并行化工作,Stream 实现无法提前知道。如果你事先知道每个元素的工作量会非常高,为了证明更多的准备工作是合理的,你可以使用,例如new ArrayList<>(map.[keySet|entrySet|values]()) .parallelStream()
强制执行平衡拆分。通常,无论如何,对于更大的地图,问题会小得多。
感谢
根本原因来自 HashMap.values()
的 sizeEstimate 不准确。
默认情况下,HashMap 的容量为 16,有 2 个元素,由数组支持。 Spliterator 的估计大小为 2.
每一次,每次拆分都会将数组减半。在这种情况下,16 长度的数组被分成两半,每半 8 个,每半的估计大小为 1。由于元素是根据哈希码放置的,不幸的是,两个元素位于同一半。
那么forkjoin框架认为1是below the sizeThreshold,就会停止拆分,开始处理任务。
同时arrayList没有这个问题,因为estimatedSize总是准确的。