computeIfAbsent 如何使 ConcurrentHashMap 随机失败?
How does computeIfAbsent fail ConcurrentHashMap randomly?
我有以下代码,它是一个玩具代码,但可以重现问题:
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;
public class TestClass3 {
public static void main(String[] args) throws InterruptedException {
// Setup data that we will be playing with concurrently
List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
HashMap<String, List<Integer>> keyValueMap = new HashMap<>();
for (String key : keys) {
int[] randomInts = new Random().ints(10000, 0, 10000).toArray();
keyValueMap.put(key, stream(randomInts).boxed().collect(toList()));
}
// Entering danger zone, concurrently transforming our data to another shape
ExecutorService es = Executors.newFixedThreadPool(10);
Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
for (String key : keys) {
es.submit(() -> {
for (Integer value : keyValueMap.get(key)) {
valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}
});
}
// Wait for all tasks in executorservice to finish
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
// Danger zone ends..
// We should be in a single-thread environment now and safe
StringBuilder stringBuilder = new StringBuilder();
for (Integer integer : valueKeyMap.keySet()) {
String collect = valueKeyMap
.get(integer)
.stream()
.sorted() // This will blow randomly
.collect(Collectors.joining());
stringBuilder.append(collect); // just to print something..
}
System.out.println(stringBuilder.length());
}
}
当我一遍又一遍地运行这段代码时,它通常会运行毫无例外地打印一些数字..但是从时间开始(大约10次尝试中的1次)我将得到类似于以下的异常:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:369)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at biz.tugay.TestClass3.main(TestClass3.java:40)
我很确定它与
有关
valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
如果我按如下方式更改这部分,我永远不会得到异常:
synchronized (valueKeyMap) {
valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}
我想 computeIfAbsent
即使在所有线程都完成后仍在修改 valueKeyMap
。
谁能解释一下为什么这段代码会随机失败,原因是什么?还是有一个完全不同的原因我可能无法看到并且我认为 computeIfAbsent
是罪魁祸首的假设是错误的?
使用 synchronized
时没有出现异常这一事实应该已经说明了问题出在哪里。如前所述,问题确实是 HashSet
因为它不是线程安全的。这在集合的文档中也有说明。
Note that this implementation is not synchronized. If multiple threads access a hash set concurrently, and at least one of the threads modifies the set, it must be synchronized externally. This is typically accomplished by synchronizing on some object that naturally encapsulates the set.
解决方案是使用 synchronized
块或使用线程安全的 CollectionView
,例如 KeySetView
,您可以使用 ConcurrentHashMap.newKeySet()
.
问题不在 computeIfAbsent
调用中,而是在最后的 .add(key)
中:您可以让多个线程尝试将元素添加到同一个 HashSet,但无法确保安全的并发访问。由于 HashSet 不是线程安全的,因此无法正常工作,并且 HashSet 有时会以损坏状态结束。稍后,当您尝试遍历 HashSet 以获取字符串时,它会由于这种损坏状态而崩溃。 (从你的异常来看,HashSet 认为它的后备数组比实际长,所以它试图访问越界数组元素。)
即使在您 没有 得到异常的运行中,您有时可能最终会得到 "dropping" 本应添加的元素,但并发更新意味着一些更新丢失了。
ConcurrentHashMap.computeIfAbsent
以原子方式执行,也就是说,一次只有一个线程可以访问与给定键关联的值。
但是,一旦返回值,就没有这样的保证了。 HashSet
可以被多个写入线程访问,因此不能安全地访问。
相反,您可以这样做:
valueKeyMap.compute(value, (k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(key);
return v;
});
之所以有效,是因为 compute
也是原子的。
我有以下代码,它是一个玩具代码,但可以重现问题:
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;
public class TestClass3 {
public static void main(String[] args) throws InterruptedException {
// Setup data that we will be playing with concurrently
List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
HashMap<String, List<Integer>> keyValueMap = new HashMap<>();
for (String key : keys) {
int[] randomInts = new Random().ints(10000, 0, 10000).toArray();
keyValueMap.put(key, stream(randomInts).boxed().collect(toList()));
}
// Entering danger zone, concurrently transforming our data to another shape
ExecutorService es = Executors.newFixedThreadPool(10);
Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
for (String key : keys) {
es.submit(() -> {
for (Integer value : keyValueMap.get(key)) {
valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}
});
}
// Wait for all tasks in executorservice to finish
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
// Danger zone ends..
// We should be in a single-thread environment now and safe
StringBuilder stringBuilder = new StringBuilder();
for (Integer integer : valueKeyMap.keySet()) {
String collect = valueKeyMap
.get(integer)
.stream()
.sorted() // This will blow randomly
.collect(Collectors.joining());
stringBuilder.append(collect); // just to print something..
}
System.out.println(stringBuilder.length());
}
}
当我一遍又一遍地运行这段代码时,它通常会运行毫无例外地打印一些数字..但是从时间开始(大约10次尝试中的1次)我将得到类似于以下的异常:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:369)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at biz.tugay.TestClass3.main(TestClass3.java:40)
我很确定它与
有关valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
如果我按如下方式更改这部分,我永远不会得到异常:
synchronized (valueKeyMap) {
valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}
我想 computeIfAbsent
即使在所有线程都完成后仍在修改 valueKeyMap
。
谁能解释一下为什么这段代码会随机失败,原因是什么?还是有一个完全不同的原因我可能无法看到并且我认为 computeIfAbsent
是罪魁祸首的假设是错误的?
使用 synchronized
时没有出现异常这一事实应该已经说明了问题出在哪里。如前所述,问题确实是 HashSet
因为它不是线程安全的。这在集合的文档中也有说明。
Note that this implementation is not synchronized. If multiple threads access a hash set concurrently, and at least one of the threads modifies the set, it must be synchronized externally. This is typically accomplished by synchronizing on some object that naturally encapsulates the set.
解决方案是使用 synchronized
块或使用线程安全的 CollectionView
,例如 KeySetView
,您可以使用 ConcurrentHashMap.newKeySet()
.
问题不在 computeIfAbsent
调用中,而是在最后的 .add(key)
中:您可以让多个线程尝试将元素添加到同一个 HashSet,但无法确保安全的并发访问。由于 HashSet 不是线程安全的,因此无法正常工作,并且 HashSet 有时会以损坏状态结束。稍后,当您尝试遍历 HashSet 以获取字符串时,它会由于这种损坏状态而崩溃。 (从你的异常来看,HashSet 认为它的后备数组比实际长,所以它试图访问越界数组元素。)
即使在您 没有 得到异常的运行中,您有时可能最终会得到 "dropping" 本应添加的元素,但并发更新意味着一些更新丢失了。
ConcurrentHashMap.computeIfAbsent
以原子方式执行,也就是说,一次只有一个线程可以访问与给定键关联的值。
但是,一旦返回值,就没有这样的保证了。 HashSet
可以被多个写入线程访问,因此不能安全地访问。
相反,您可以这样做:
valueKeyMap.compute(value, (k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(key);
return v;
});
之所以有效,是因为 compute
也是原子的。