java.util.ConcurrentModificationException 将 Future 的结果组合到 HashMap 中时

java.util.ConcurrentModificationException when combing results from Future's into HashMap

我 运行 遇到一个问题,我在合并期货结果时间歇性地遇到 java.util.ConcurrentModificationException 错误。我已经尝试使我所有的 HashMap 并发,并且我尝试使用迭代器,但错误仍然存​​在,我完全不知道这个错误是如何发生的。我也没有遍历我正在插入的同一个 HashMap(正如在类似问题中所做的那样,出现了这个错误)。

对于我的代码,首先我构建了一堆 return 的 HashMap 任务,然后使用 .invokeAll 我的任务所有 return 上面的 HashMap 其中然后我尝试合并在一起(我正在读取一个大的 CSV 以获得每一列的所有唯一结果)。

我从定义所有键开始。

HashMap<String, HashSet<String>> headersHashset = new HashMap<String, HashSet<String>>();
headers = Arrays.asList(br.readLine().split(delimiter));
for (String headerID : headers) {
    headersHashset.put(headerID, new HashSet<>());
}

然后我执行我的任务,克隆密钥,进行处理,然后return它是结果

tasks.add(() -> {
    HashMap<String, HashSet<String>> localHeadersHashset = (HashMap<String, HashSet<String>>) headersHashset.clone();
    
    for (String[] values : sampleSet.values()) {  // sampleSet is a SortedMap<Integer, String[]>
        int headerAsINT = 0;
        for (String value : values) {
            localHeadersHashset.get(headers.get(headerAsINT)).add(value);
            headerAsINT++;
        }
    }
        
    return localHeadersHashset;
});

我调用了所有的任务,其中的结果被放入了 Future 的列表中

ExecutorService es = Executors.newCachedThreadPool();
List<Future<HashMap<String, HashSet<String>>>> futures = null;
try {
    futures = es.invokeAll(tasks);
    es.shutdown();
} catch (InterruptedException e) {
    e.printStackTrace();
}

然后,为了安全起见,我制作了一个 ConruentHashMap,其中包含所有预定义键的副本。然后,我遍历任务的所有结果,将它们也放入 ConcurrentHashMap 中,并将所有结果添加到带有预定义键的原始复制的 ConcurrentHashMap 中。这是错误发生的时候(尽管只是有时,这表明它与线程有关,但我认为此时所有的线程工作都已完成?)。

ConcurrentHashMap<String, HashSet<String>> headersHashsetConcurrent = new ConcurrentHashMap<>(headersHashset);

try {
    for (Future<HashMap<String, HashSet<String>>> f : futures) {
        ConcurrentHashMap<String, HashSet<String>> threadSafeItems = new ConcurrentHashMap<>(f.get());
        for (Map.Entry<String, HashSet<String>> items : threadSafeItems.entrySet()) {
            headersHashsetConcurrent.get(items.getKey()).addAll(items.getValue());  // ERROR SHOWS HERE 
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

这里是完整的错误:

java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
    at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607)
    at java.base/java.util.AbstractCollection.addAll(AbstractCollection.java:335)
    at project/project.parseCSV.processFile(parseCSV.java:101)
    at project/project.parseCSV.call(parseCSV.java:126)
    at project/project.parseCSV.call(parseCSV.java:11)
    at javafx.graphics/javafx.concurrent.Task$TaskCallable.call(Task.java:1425)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:832)

我真的不确定为什么这会导致错误,因为 addAll 仅在主线程上 运行,而且它也在插入到 ConCurrent HashMap 中 - 任何关于为什么会发生这种情况的想法都会不胜感激!

问题是 headersHashset.clone() 没有在值中克隆 HashSets。

来自docs:

Returns a shallow copy of this HashMap instance: the keys and values themselves are not cloned.

这意味着 localHeadersHashset 在您的任务中,您的 headersHashsetConcurrentfutures 返回的 threadSafeItems — 所有这些都使用相同的 HashSet相同键的对象。

由于任务是在并行线程中执行的,所以完全有可能在主线程遍历同一个HashSet里面的元素时,某个任务同时执行HashSet.add()

            headersHashsetConcurrent.get(items.getKey()).addAll(items.getValue());

这就是导致您 ConcurrentModificationException 的原因。