并发用于 AddOrUpdate 的 ConcurrentDictionary 和 ConcurrentBag

ConcurrentDictionary and ConcurrentBag for AddOrUpdate on parallel

使用 ConcurrentDictionary 和 ConcurrentBag 来 AddOrUpdate 值的方法是否正确。

基本上试过如下,

  1. 拥有包含数百万条记录的文件并尝试处理和提取到对象。

  2. 条目就像键值对,键=WBAN 和值作为对象

     var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>();
     int count = 0;
    
     foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
     {
         var sInfo = line.Split(new char[] { ',' });
         cd.AddOrUpdate(sInfo[0], new ConcurrentBag<Data>(){ new Data()
         {
             WBAN =  sInfo[0],
                 Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                 time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
     }
         }
         ,
         (oldKey, oldValue) =>
         {
             oldValue.Add(new Data()
             {
                 WBAN = sInfo[0],
                 Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                 time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
             });
    
             return oldValue;
         }
         );
     }
    
  • 您的程序是 IO-bound,而不是 CPU-bound,因此并行处理没有任何优势。
    • 这是 IO-bound 因为如果不首先从文件中读取该行,您的程序将无法处理该行数据,并且 一般来说 计算机总是从存储速度比他们处理速度慢得多。
    • 由于您的程序只对读取的每一行执行简单的字符串操作,所以我可以 99.9% 有把握地说,将 Data 元素添加到 Dictionary<String,List<Data>> 所花费的时间是您的计算机从 text-file.
    • 中读取一行所需时间的一小部分
  • 此外,避免对此类程序使用 File.ReadLines,因为这会首先将整个文件读入内存。
    • 如果您查看我的解决方案,您会发现它使用 StreamReader 来读取每一行 one-by-one 这意味着它不需要等到它首先将所有内容读入内存。

因此,要以最佳性能解析该文件,您不需要任何 并发集合。

就是这个:

private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.

public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
    const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.

    Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );


    using( FileStream fs = new FileStream( path, FileAccess.Read, FileMode.Open, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
    using( StreamReader rdr = new StreamReader( fs ) )
    {
        String line;
        while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
        {
            String[] values = line.Split( sep );
            if( values.Length < 3 ) continue;

            Data d = new Data()
            {
                WBAN = values[0],
                Date = values[1],
                time = values[2]
            };

            if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
            {
                dict[ d.WBAN ] = list = new List<Data>();
            }

            list.Add( d );
        }
    }
}

更新:假设地说...

假设地说,因为文件 IO(特别是异步 FileStream IO)使用大缓冲区(在这种情况下,ONE_MEGABYTE 大小的缓冲区)程序可以传递每个缓冲区(当它被读取时,按顺序)进入并行处理器。

然而,问题是缓冲区内的数据对于单个线程来说不能是微不足道的apportioned-out:在这种情况下因为行的长度不固定,所以单个线程仍然需要读取整个缓冲区以找出 line-breaks 的位置(从技术上讲 that 可以在某种程度上并行化,这会增加大量的复杂性(因为您还需要处理跨越缓冲区边界的行,或只包含一行的缓冲区等)。

并且在这种小规模下,使用 thread-pool 和并发 collection-types 的开销将擦除 parallel-procesing 的 speed-up,因为程序仍然主要是 IO-bound.

现在,如果您有一个大小为 GB 的文件,其中 Data 条记录的大小约为 1KB,那么我将详细说明如何做到这一点,因为在这种规模下您可能会看到适度的性能提升。

你的想法基本上是正确的,但是在实现上有一个瑕疵。使用 foreach 语句枚举 ParallelQuery 不会导致循环内的代码并行 运行。在此阶段,并行化阶段已经完成。在您的代码中实际上没有并行化的工作,因为 .AsParallel().WithDegreeOfParallelism(5) 之后没有附加运算符。要并行循环,您必须将 foreach 替换为 ForAll 运算符,如下所示:

File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .ForAll(line => { /* Process each line in parallel */ });

了解此处并行化的内容很重要。每行的处理是并行的,而从文件系统加载每行不是。加载是序列化的。并行 LINQ 引擎使用的工作线程(其中一个是当前线程)在访问源 IEnumerable(本例中为 File.ReadLines(path))时同步。

使用嵌套的ConcurrentDictionary<String, ConcurrentBag<Data>>结构来存储处理过的行不是很有效。您可以相信 PLINQ 在对数据进行分组方面做得比您手动处理并发集合等做得更好。 通过使用 ToLookup 运算符,您可以获得一个 ILookup<string, Data>,它本质上是一个只读字典,每个键都有多个值。

var separators = new char[] { ',' };

var lookup = File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .Select(line => line.Split(separators))
    .ToLookup(sInfo => sInfo[0], sInfo => new Data()
    {
        WBAN =  sInfo[0],
        Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
        time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
    });

就性能和内存效率而言,这应该是一个更好的选择,除非您特别希望生成的结构可变并且 thread-safe 出于某种原因。

另外两个注意事项:

  1. 硬编码并行度(在本例中为 5)是可以的,前提是您知道您的程序所在的硬件 运行。否则可能会造成 over-subscription 的摩擦(线程数比机器实际内核数多)。提示:虚拟机通常配置为单线程。

  2. ConcurrentBag is a very specialized collection. In the majority of cases you'll get better performance with a ConcurrentQueue. Both classes offer a similar API. People probably prefer the ConcurrentBag because its Add method is more familiar than the Enqueue.