并发用于 AddOrUpdate 的 ConcurrentDictionary 和 ConcurrentBag
ConcurrentDictionary and ConcurrentBag for AddOrUpdate on parallel
使用 ConcurrentDictionary 和 ConcurrentBag 来 AddOrUpdate 值的方法是否正确。
基本上试过如下,
拥有包含数百万条记录的文件并尝试处理和提取到对象。
条目就像键值对,键=WBAN 和值作为对象。
var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>();
int count = 0;
foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
{
var sInfo = line.Split(new char[] { ',' });
cd.AddOrUpdate(sInfo[0], new ConcurrentBag<Data>(){ new Data()
{
WBAN = sInfo[0],
Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
}
}
,
(oldKey, oldValue) =>
{
oldValue.Add(new Data()
{
WBAN = sInfo[0],
Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
});
return oldValue;
}
);
}
- 您的程序是 IO-bound,而不是 CPU-bound,因此并行处理没有任何优势。
- 这是 IO-bound 因为如果不首先从文件中读取该行,您的程序将无法处理该行数据,并且 一般来说 计算机总是从存储速度比他们处理速度慢得多。
- 由于您的程序只对读取的每一行执行简单的字符串操作,所以我可以 99.9% 有把握地说,将
Data
元素添加到 Dictionary<String,List<Data>>
所花费的时间是您的计算机从 text-file. 中读取一行所需时间的一小部分
- 此外,避免对此类程序使用
File.ReadLines
,因为这会首先将整个文件读入内存。
- 如果您查看我的解决方案,您会发现它使用
StreamReader
来读取每一行 one-by-one 这意味着它不需要等到它首先将所有内容读入内存。
因此,要以最佳性能解析该文件,您不需要任何 并发集合。
就是这个:
private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.
public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.
Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );
using( FileStream fs = new FileStream( path, FileAccess.Read, FileMode.Open, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
using( StreamReader rdr = new StreamReader( fs ) )
{
String line;
while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
{
String[] values = line.Split( sep );
if( values.Length < 3 ) continue;
Data d = new Data()
{
WBAN = values[0],
Date = values[1],
time = values[2]
};
if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
{
dict[ d.WBAN ] = list = new List<Data>();
}
list.Add( d );
}
}
}
更新:假设地说...
假设地说,因为文件 IO(特别是异步 FileStream
IO)使用大缓冲区(在这种情况下,ONE_MEGABYTE
大小的缓冲区)程序可以传递每个缓冲区(当它被读取时,按顺序)进入并行处理器。
然而,问题是缓冲区内的数据对于单个线程来说不能是微不足道的apportioned-out:在这种情况下因为行的长度不固定,所以单个线程仍然需要读取整个缓冲区以找出 line-breaks 的位置(从技术上讲 that 可以在某种程度上并行化,这会增加大量的复杂性(因为您还需要处理跨越缓冲区边界的行,或只包含一行的缓冲区等)。
并且在这种小规模下,使用 thread-pool 和并发 collection-types 的开销将擦除 parallel-procesing 的 speed-up,因为程序仍然主要是 IO-bound.
现在,如果您有一个大小为 GB 的文件,其中 Data
条记录的大小约为 1KB,那么我将详细说明如何做到这一点,因为在这种规模下您可能会看到适度的性能提升。
你的想法基本上是正确的,但是在实现上有一个瑕疵。使用 foreach
语句枚举 ParallelQuery
不会导致循环内的代码并行 运行。在此阶段,并行化阶段已经完成。在您的代码中实际上没有并行化的工作,因为 .AsParallel().WithDegreeOfParallelism(5)
之后没有附加运算符。要并行循环,您必须将 foreach
替换为 ForAll
运算符,如下所示:
File.ReadLines(path)
.AsParallel()
.WithDegreeOfParallelism(5)
.ForAll(line => { /* Process each line in parallel */ });
了解此处并行化的内容很重要。每行的处理是并行的,而从文件系统加载每行不是。加载是序列化的。并行 LINQ 引擎使用的工作线程(其中一个是当前线程)在访问源 IEnumerable
(本例中为 File.ReadLines(path)
)时同步。
使用嵌套的ConcurrentDictionary<String, ConcurrentBag<Data>>
结构来存储处理过的行不是很有效。您可以相信 PLINQ 在对数据进行分组方面做得比您手动处理并发集合等做得更好。
通过使用 ToLookup
运算符,您可以获得一个 ILookup<string, Data>
,它本质上是一个只读字典,每个键都有多个值。
var separators = new char[] { ',' };
var lookup = File.ReadLines(path)
.AsParallel()
.WithDegreeOfParallelism(5)
.Select(line => line.Split(separators))
.ToLookup(sInfo => sInfo[0], sInfo => new Data()
{
WBAN = sInfo[0],
Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
});
就性能和内存效率而言,这应该是一个更好的选择,除非您特别希望生成的结构可变并且 thread-safe 出于某种原因。
另外两个注意事项:
硬编码并行度(在本例中为 5)是可以的,前提是您知道您的程序所在的硬件 运行。否则可能会造成 over-subscription 的摩擦(线程数比机器实际内核数多)。提示:虚拟机通常配置为单线程。
ConcurrentBag
is a very specialized collection. In the majority of cases you'll get better performance with a ConcurrentQueue
. Both classes offer a similar API. People probably prefer the ConcurrentBag
because its Add
method is more familiar than the Enqueue
.
使用 ConcurrentDictionary 和 ConcurrentBag 来 AddOrUpdate 值的方法是否正确。
基本上试过如下,
拥有包含数百万条记录的文件并尝试处理和提取到对象。
条目就像键值对,键=WBAN 和值作为对象。
var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>(); int count = 0; foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5)) { var sInfo = line.Split(new char[] { ',' }); cd.AddOrUpdate(sInfo[0], new ConcurrentBag<Data>(){ new Data() { WBAN = sInfo[0], Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1], time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2] } } , (oldKey, oldValue) => { oldValue.Add(new Data() { WBAN = sInfo[0], Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1], time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2] }); return oldValue; } ); }
- 您的程序是 IO-bound,而不是 CPU-bound,因此并行处理没有任何优势。
- 这是 IO-bound 因为如果不首先从文件中读取该行,您的程序将无法处理该行数据,并且 一般来说 计算机总是从存储速度比他们处理速度慢得多。
- 由于您的程序只对读取的每一行执行简单的字符串操作,所以我可以 99.9% 有把握地说,将
Data
元素添加到Dictionary<String,List<Data>>
所花费的时间是您的计算机从 text-file. 中读取一行所需时间的一小部分
- 此外,避免对此类程序使用
File.ReadLines
,因为这会首先将整个文件读入内存。- 如果您查看我的解决方案,您会发现它使用
StreamReader
来读取每一行 one-by-one 这意味着它不需要等到它首先将所有内容读入内存。
- 如果您查看我的解决方案,您会发现它使用
因此,要以最佳性能解析该文件,您不需要任何 并发集合。
就是这个:
private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.
public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.
Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );
using( FileStream fs = new FileStream( path, FileAccess.Read, FileMode.Open, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
using( StreamReader rdr = new StreamReader( fs ) )
{
String line;
while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
{
String[] values = line.Split( sep );
if( values.Length < 3 ) continue;
Data d = new Data()
{
WBAN = values[0],
Date = values[1],
time = values[2]
};
if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
{
dict[ d.WBAN ] = list = new List<Data>();
}
list.Add( d );
}
}
}
更新:假设地说...
假设地说,因为文件 IO(特别是异步 FileStream
IO)使用大缓冲区(在这种情况下,ONE_MEGABYTE
大小的缓冲区)程序可以传递每个缓冲区(当它被读取时,按顺序)进入并行处理器。
然而,问题是缓冲区内的数据对于单个线程来说不能是微不足道的apportioned-out:在这种情况下因为行的长度不固定,所以单个线程仍然需要读取整个缓冲区以找出 line-breaks 的位置(从技术上讲 that 可以在某种程度上并行化,这会增加大量的复杂性(因为您还需要处理跨越缓冲区边界的行,或只包含一行的缓冲区等)。
并且在这种小规模下,使用 thread-pool 和并发 collection-types 的开销将擦除 parallel-procesing 的 speed-up,因为程序仍然主要是 IO-bound.
现在,如果您有一个大小为 GB 的文件,其中 Data
条记录的大小约为 1KB,那么我将详细说明如何做到这一点,因为在这种规模下您可能会看到适度的性能提升。
你的想法基本上是正确的,但是在实现上有一个瑕疵。使用 foreach
语句枚举 ParallelQuery
不会导致循环内的代码并行 运行。在此阶段,并行化阶段已经完成。在您的代码中实际上没有并行化的工作,因为 .AsParallel().WithDegreeOfParallelism(5)
之后没有附加运算符。要并行循环,您必须将 foreach
替换为 ForAll
运算符,如下所示:
File.ReadLines(path)
.AsParallel()
.WithDegreeOfParallelism(5)
.ForAll(line => { /* Process each line in parallel */ });
了解此处并行化的内容很重要。每行的处理是并行的,而从文件系统加载每行不是。加载是序列化的。并行 LINQ 引擎使用的工作线程(其中一个是当前线程)在访问源 IEnumerable
(本例中为 File.ReadLines(path)
)时同步。
使用嵌套的ConcurrentDictionary<String, ConcurrentBag<Data>>
结构来存储处理过的行不是很有效。您可以相信 PLINQ 在对数据进行分组方面做得比您手动处理并发集合等做得更好。
通过使用 ToLookup
运算符,您可以获得一个 ILookup<string, Data>
,它本质上是一个只读字典,每个键都有多个值。
var separators = new char[] { ',' };
var lookup = File.ReadLines(path)
.AsParallel()
.WithDegreeOfParallelism(5)
.Select(line => line.Split(separators))
.ToLookup(sInfo => sInfo[0], sInfo => new Data()
{
WBAN = sInfo[0],
Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
});
就性能和内存效率而言,这应该是一个更好的选择,除非您特别希望生成的结构可变并且 thread-safe 出于某种原因。
另外两个注意事项:
硬编码并行度(在本例中为 5)是可以的,前提是您知道您的程序所在的硬件 运行。否则可能会造成 over-subscription 的摩擦(线程数比机器实际内核数多)。提示:虚拟机通常配置为单线程。
ConcurrentBag
is a very specialized collection. In the majority of cases you'll get better performance with aConcurrentQueue
. Both classes offer a similar API. People probably prefer theConcurrentBag
because itsAdd
method is more familiar than theEnqueue
.