ConcurrentHashMap 中 entrySet().removeIf 的行为

Behavior of entrySet().removeIf in ConcurrentHashMap

我想使用 ConcurrentHashMap 让一个线程定期从地图中删除一些项目,同时让其他线程从地图中放置和获取项目。

我在删除线程中使用 map.entrySet().removeIf(lambda)。我想知道我可以对其行为做出哪些假设。我可以看到 removeIf 方法使用迭代器遍历映射中的元素,检查给定条件,然后在需要时使用 iterator.remove() 删除它们。

文档提供了一些关于 ConcurrentHashMap 迭代器行为的信息:

Similarly, Iterators, Spliterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. hey do not throw ConcurrentModificationException. However, iterators are designed to be used by only one thread at a time.

由于整个 removeIf 调用发生在一个线程中,因此我可以确定迭代器不会同时被多个线程使用。我仍然想知道下面描述的事件过程是否可能:

  1. 地图包含映射:'A'->0
  2. 删除线程开始执行map.entrySet().removeIf(entry->entry.getValue()==0)
  3. Deleting Thread 调用 .iteratator() inside removeIf 调用并获取反映集合当前状态的迭代器
  4. 另一个线程执行map.put('A', 1)
  5. 删除线程仍然看到 'A'->0 映射(迭代器反映旧状态)并且因为 0==0 为真,它决定从映射中删除 A 键。
  6. 地图现在包含 'A'->1 但删除线程会看到 0 的旧值并且 'A' ->1 条目被删除,即使它不应该被删除。地图是空的。

我可以想象,实施可以通过多种方式阻止该行为。例如:也许迭代器不反映 put/remove 操作但总是反映值更新,或者迭代器的 remove 方法在调用 remove 之前检查整个映射(键和值)是否仍然存在于映射中钥匙。我找不到有关发生的任何事情的信息,我想知道是否有什么东西可以使该用例安全。

在 Zielu 的回答下方的评论中与用户 Zielu 讨论后,我深入研究了 ConcurrentHashMap 代码并发现:

  • ConcurrentHashMap 实现提供 remove(key, value) 调用 replaceNode(key, null, value) 的方法
  • replaceNode 在删除之前检查键和值是否仍然存在于地图中,因此使用它应该没问题。文档说它

Replaces node value with v, conditional upon match of cv if * non-null.

  • 在问题中提到的情况下ConcurrentHashMap的.entrySet()被调用了whichreturnsEntrySetViewclass。然后 removeIf 方法调用 .iterator() 其中 returns EntryIterator
  • EntryIterator 扩展 BaseIterator 并继承 remove 调用 map.replaceNode(p.key, null, null) 的实现,它禁用有条件的删除并且总是删除密钥。

如果迭代器总是在 'current' 值上迭代并且在某些值被修改时从不返回旧值,那么仍然可以防止负面事件的发生。我仍然不知道是否会发生这种情况,但下面提到的测试用例似乎验证了整个事情。

我认为已经创建了一个测试用例,它表明我的问题中描述的行为确实会发生。如果代码中有任何错误,请纠正我。

代码启动了两个线程。其中之一 (DELETING_THREAD) 删除映射到 'false' 布尔值的所有条目。另一个 (ADDING_THREAD) 随机将 (1, true)(1,false) 值放入地图中。如果它把 true 放在值中,它期望条目在检查时仍然存在,如果不存在则抛出异常。当我在本地 运行 它时,它会迅速抛出异常。

package test;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class MainClass {

    private static final Random RANDOM = new Random();

    private static final ConcurrentHashMap<Integer, Boolean> MAP = new ConcurrentHashMap<Integer, Boolean>();

    private static final Integer KEY = 1;

    private static final Thread DELETING_THREAD = new Thread() {

        @Override
        public void run() {
            while (true) {
                MAP.entrySet().removeIf(entry -> entry.getValue() == false);
            }
        }

    };

    private static final Thread ADDING_THREAD = new Thread() {

        @Override
        public void run() {
            while (true) {
                boolean val = RANDOM.nextBoolean();

                MAP.put(KEY, val);
                if (val == true && !MAP.containsKey(KEY)) {
                    throw new RuntimeException("TRUE value was removed");
                }

            }
        }

    };

    public static void main(String[] args) throws InterruptedException {
        DELETING_THREAD.setDaemon(true);
        ADDING_THREAD.start();
        DELETING_THREAD.start();
        ADDING_THREAD.join();
    }
}

我也设法在我的机器上重现了这种情况。 我认为,问题在于 EntrySetView(由 ConcurrentHashMap.entrySet() 返回)从 Collection 继承其 removeIf 实现,它看起来像:

    default boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        boolean removed = false;
        final Iterator<E> each = iterator();
        while (each.hasNext()) {
            // `test` returns `true` for some entry
            if (filter.test(each.next())) { 
               // entry has been just changed, `test` would return `false` now
               each.remove(); // ...but we still remove
               removed = true;
            }
        }
        return removed;
    }

以我的愚见,这不能被视为 ConcurrentHashMap 的正确实施。