枚举 ConcurrentDictionary 时是否可能遗漏初始项?

Is it possible to miss an initial item when enumerating ConcurrentDictionary?

我正在枚举一个 ConcurrentDictionary,需要确保不会遗漏任何初始项。换句话说,我需要确保所有初始项都会被枚举到。

初始项:枚举开始时字典中的所有项。

文档说:

The enumerator returned from the dictionary is safe to use concurrently with reads and writes to the dictionary, however it does not represent a moment-in-time snapshot of the dictionary. The contents exposed through the enumerator may contain modifications made to the dictionary after GetEnumerator was called.

但不清楚是否枚举了所有初始项。所以我用下面的代码测试了它:

/// <summary>
/// Experiment: seeds a <see cref="ConcurrentDictionary{TKey, TValue}"/> with a known set of
/// "initial" items, enumerates it while worker threads keep adding new items, and reports how
/// many of the initial items were never yielded by the enumerator.
/// </summary>
/// <remarks>
/// The result is an empirical observation of the running implementation only; it does not
/// prove any documented guarantee.
/// </remarks>
public class Program
{
    // Set by the main thread after the enumeration loop ends; tells the workers to stop adding.
    static volatile bool enumerationCompleted;

    // Set by the main thread when the enumerator yields its first item; releases the workers.
    static volatile bool enumerationStarted;

    // Number of items successfully added by the worker threads (updated via Interlocked).
    static int itemsAddedInWorkerThread;

    // The dictionary under test, shared between the main thread and the workers.
    static ConcurrentDictionary<Guid, object> concurrentDic = new ConcurrentDictionary<Guid, object>();

    /// <summary>
    /// Seeds the dictionaries, starts the workers, enumerates the concurrent dictionary once
    /// and prints the counters.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        // Plain dictionary tracking which initial keys have not been enumerated yet.
        var dic = new Dictionary<Guid, object>();
        const int initialItems = 100_000;
        const int workerThreadCount = 4;

        // Start at 0 so that exactly initialItems entries are seeded; starting at 1 would
        // seed one item fewer than the count reported below.
        for (int i = 0; i < initialItems; i++)
        {
            var key = Guid.NewGuid();
            var value = new object();
            dic.Add(key, value);
            concurrentDic.TryAdd(key, value);
        }

        var workerThreads = new Thread[workerThreadCount];
        for (var i = 0; i < workerThreadCount; i++)
        {
            workerThreads[i] = new Thread(AddItemsToConcurrentDicWhileEnumerating);
            workerThreads[i].Start();
        }

        int enumeratedItems = 0;
        foreach (var kv in concurrentDic)
        {
            // Flip the flag on the first yielded item so workers only add during enumeration.
            if (!enumerationStarted)
            {
                enumerationStarted = true;
            }

            enumeratedItems++;

            // Removing a key added by a worker is a no-op; only initial keys are tracked here.
            dic.Remove(kv.Key);
        }

        enumerationCompleted = true;
        for (var i = 0; i < workerThreadCount; i++)
        {
            workerThreads[i].Join();
        }

        Console.WriteLine($"Initial items {initialItems}");
        Console.WriteLine($"Initial items not enumerated: {dic.Count}");
        Console.WriteLine($"Items enumerated: {enumeratedItems}");
        Console.WriteLine($"Items added in worker thread: {itemsAddedInWorkerThread}");
    }

    /// <summary>
    /// Worker body: waits until enumeration has started, then keeps adding fresh items to the
    /// shared dictionary until the main thread signals that enumeration is complete.
    /// </summary>
    static void AddItemsToConcurrentDicWhileEnumerating()
    {
        // Busy-wait until the main thread has seen the first enumerated item.
        while (!enumerationStarted)
        {
        }

        while (!enumerationCompleted)
        {
            var key = Guid.NewGuid();
            var value = new object();

            // Count only items that were actually inserted.
            if (concurrentDic.TryAdd(key, value))
            {
                Interlocked.Increment(ref itemsAddedInWorkerThread);
            }
        }
    }
}

它输出如下内容:

Initial items 100000
Initial items not enumerated: 0
Items enumerated: 108301
Items added in worker thread: 136729

所以似乎所有初始项都被枚举了。请您确认是否有保证?

希望这会让它更清楚。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    /// <summary>
    /// Experiment: enumerates a <see cref="ConcurrentDictionary{TKey, TValue}"/> while worker
    /// threads add items, removing every "initial" item as it is encountered, then drains the
    /// remaining items in a second enumeration and prints the counters of each phase.
    /// </summary>
    /// <remarks>
    /// The printed numbers describe the behavior of the running implementation only; they do
    /// not prove any documented guarantee.
    /// </remarks>
    public class Program
    {
        // Set by the main thread after the first enumeration; tells the workers to stop adding.
        static volatile bool enumerationCompleted;

        // Set by the main thread when the enumerator yields its first item; releases the workers.
        static volatile bool enumerationStarted;

        // Number of items successfully added by the worker threads (updated via Interlocked).
        static int itemsAddedInWorkerThread;

        // The dictionary under test, shared between the main thread and the workers.
        static ConcurrentDictionary<Guid, object> concurrentDic = new ConcurrentDictionary<Guid, object>();

        /// <summary>
        /// Seeds the dictionaries, starts the workers, runs both enumerations and prints the
        /// counters.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args)
        {
            // Plain dictionary tracking which initial keys have not been enumerated yet.
            var dic = new Dictionary<Guid, object>();
            const int initialItems = 100_000;
            const int workerThreadCount = 4;

            for (int i = 0; i < initialItems; i++)
            {
                var key = Guid.NewGuid();
                var value = new object();
                dic.Add(key, value);
                concurrentDic.TryAdd(key, value);
            }

            Console.WriteLine($"Initial items {initialItems}");

            var workerThreads = new Thread[workerThreadCount];
            for (var i = 0; i < workerThreadCount; i++)
            {
                workerThreads[i] = new Thread(AddItemsToConcurrentDicWhileEnumerating);
                workerThreads[i].Start(i + 1); // 1-based thread number, used only for logging
            }

            Console.WriteLine($"Number of items in concurrent dictionary right now: {concurrentDic.Count}");
            int itemsremovedduring1stiteration = 0;
            int enumeratedItems = 0;
            foreach (var kv in concurrentDic)
            {
                // Flip the flag on the first yielded item so workers only add during enumeration.
                if (!enumerationStarted)
                {
                    enumerationStarted = true;
                }

                // Remove returns true only for keys from the "initial" set, so a single lookup
                // both answers "is it an initial item?" and removes it from the tracking set.
                if (dic.Remove(kv.Key))
                {
                    concurrentDic.TryRemove(kv.Key, out _);

                    itemsremovedduring1stiteration++;
                }

                enumeratedItems++;
            }

            // Workers are still running here, so this first "added" figure is a moving value.
            Console.WriteLine($"Items added in worker thread: {itemsAddedInWorkerThread}");
            Console.WriteLine($"Number of items in concurrent dictionary right now: {concurrentDic.Count}");
            Console.WriteLine($"Items removed from concurrent dictionary: {itemsremovedduring1stiteration}");
            Console.WriteLine($"Items enumerated in 1st enumeration: {enumeratedItems}");

            enumerationCompleted = true;
            for (var i = 0; i < workerThreadCount; i++)
            {
                workerThreads[i].Join();
            }

            // All workers have stopped: the counters below are final.
            Console.WriteLine($"Items added in worker thread: {itemsAddedInWorkerThread}");
            Console.WriteLine($"Items still left in concurrent dictionary: {concurrentDic.Count}");

            int enumeratedItems2nd = 0;
            foreach (var kv in concurrentDic)
            {
                if (concurrentDic.TryRemove(kv.Key, out _))
                {
                    enumeratedItems2nd++;
                }
                else
                {
                    // No other thread is running at this point, so a failed removal of a key
                    // that was just yielded is unexpected; stop in the debugger to inspect it.
                    System.Diagnostics.Debugger.Break();
                }
            }

            Console.WriteLine($"Items enumerated in 2nd enumeration: {enumeratedItems2nd}");
        }

        /// <summary>
        /// Worker body: waits until enumeration has started, then keeps adding fresh items to
        /// the shared dictionary until the main thread signals that enumeration is complete.
        /// </summary>
        /// <param name="data">The boxed <see cref="int"/> thread number passed to <c>Thread.Start</c>.</param>
        static void AddItemsToConcurrentDicWhileEnumerating(object data)
        {
            int threadno = (int)data;

            Console.WriteLine($"Adding items from thread {threadno} ...");

            // Busy-wait until the main thread has seen the first enumerated item.
            while (!enumerationStarted)
            {
            }

            while (!enumerationCompleted)
            {
                var key = Guid.NewGuid();
                var value = new object();

                if (concurrentDic.TryAdd(key, value))
                {
                    // Count only items that were actually inserted.
                    Interlocked.Increment(ref itemsAddedInWorkerThread);
                }
                else
                {
                    // A failed add means the freshly generated key already existed, which is
                    // not expected; stop in the debugger to inspect it.
                    System.Diagnostics.Debugger.Break();
                }
            }
        }
    }
}

不。除了保证枚举是“安全的”之外,Microsoft 对 ConcurrentDictionary<K,V> 的枚举不作任何其他保证,仅此而已。理论上,这个集合即使总是返回一个空序列,也仍然符合当前的规范。如需深入了解,可以查看这个 GitHub issue。但在实践中,这个集合的行为是合理的,与任何人的预期一致。您有以下几种选择:

  1. 改用普通的 Dictionary<K,V>,并用 lock 保护。这适用于更新频繁、枚举很少的场景。
  2. 改用不太常见的 ImmutableDictionary<K,V>,并使用 ImmutableInterlocked 类以无锁方式更新它。这适用于更新很少、枚举频繁的场景。
  3. 继续使用 ConcurrentDictionary<K,V>,并依赖 Microsoft 普遍不愿意做出可能破坏现有代码的更改¹这一点。您也可以编写几个单元测试,以便尽早发现这类变化,并留出时间及时应对。

¹ 这些人在设想未来要对自己的代码库做哪些重大更改时往往很激进,因此他们喜欢发布含糊的规范,以免被当前的实现束缚。而当真正出现关于更好实现的具体想法时,他们又往往趋于保守,因为担心已有代码依赖于当前未写入文档的行为。