Parallel.ForEach missing/ignoring/jumping over/not 添加对象
Parallel.ForEach missing/ignoring/jumping over/not adding objects
我的 _baseBlockContainer.GetBaseBlocks();
returns 一个 ConcurrentQueue
有 15317
个对象。为了进一步处理,我想按 Type
对它们进行排序。但是,它总是“漏掉”一些对象。
我的 Parallel.ForEach
似乎不是线程安全的,因为 ConcurrentQueue
中的对象数量对于 Type
有时更少(对于 a 减少 1 到 250 个对象) Type
) 比按同步 foreach
排序时要好;但我没有看到 where/why.
var baseBlocks = _baseBlockContainer.GetBaseBlocks();
var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});
var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
if (!baseBlocksByType.ContainsKey(bb.GetType()))
{
baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByType[bb.GetType()].Enqueue(bb);
}
替换为:
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
有了这个:
baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
您现有代码的问题是,如果 .ContainsKey
在多个线程中同时对同一块类型计算为 false
,那么它们都会设置与该类型对应的值到一个新队列,擦除该类型的任何现有队列。也就是说:ContainsKey
和索引器本身是线程安全的,但如果按照您的方式单独使用则不是。
TryAdd
是线程安全的,只会添加该键一次,而不是像分配给索引器那样重写它。
您的代码受到所谓的 race condition. Using concurrent collections alone does not prevent race condition to occur. You also have to use them correctly, by utilizing their special atomic APIs. In your case the appropriate API to use is the GetOrAdd
方法的影响:
Adds a key/value pair to the ConcurrentDictionary<TKey,TValue>
if the key does not already exist. Returns the new value, or the existing value if the key already exists.
用法:
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(baseBlocks, options, bb =>
{
baseBlocksByTypeConcurrent
.GetOrAdd(bb.GetType(), _ => new ConcurrentQueue<BaseBlock>())
.Enqueue(bb);
});
附带说明一下,无论何时使用 Parallel.ForEach
方法,都建议明确指定 MaxDegreeOfParallelism
。默认的 MaxDegreeOfParallelism
是 -1,这意味着无限制的并行性,这实际上使 ThreadPool
.
饱和
我的 _baseBlockContainer.GetBaseBlocks();
returns 一个 ConcurrentQueue
有 15317
个对象。为了进一步处理,我想按 Type
对它们进行排序。但是,它总是“漏掉”一些对象。
我的 Parallel.ForEach
似乎不是线程安全的,因为 ConcurrentQueue
中的对象数量对于 Type
有时更少(对于 a 减少 1 到 250 个对象) Type
) 比按同步 foreach
排序时要好;但我没有看到 where/why.
var baseBlocks = _baseBlockContainer.GetBaseBlocks();
var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});
var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
if (!baseBlocksByType.ContainsKey(bb.GetType()))
{
baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByType[bb.GetType()].Enqueue(bb);
}
替换为:
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
有了这个:
baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
您现有代码的问题是,如果 .ContainsKey
在多个线程中同时对同一块类型计算为 false
,那么它们都会设置与该类型对应的值到一个新队列,擦除该类型的任何现有队列。也就是说:ContainsKey
和索引器本身是线程安全的,但如果按照您的方式单独使用则不是。
TryAdd
是线程安全的,只会添加该键一次,而不是像分配给索引器那样重写它。
您的代码受到所谓的 race condition. Using concurrent collections alone does not prevent race condition to occur. You also have to use them correctly, by utilizing their special atomic APIs. In your case the appropriate API to use is the GetOrAdd
方法的影响:
Adds a key/value pair to the
ConcurrentDictionary<TKey,TValue>
if the key does not already exist. Returns the new value, or the existing value if the key already exists.
用法:
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(baseBlocks, options, bb =>
{
baseBlocksByTypeConcurrent
.GetOrAdd(bb.GetType(), _ => new ConcurrentQueue<BaseBlock>())
.Enqueue(bb);
});
附带说明一下,无论何时使用 Parallel.ForEach
方法,都建议明确指定 MaxDegreeOfParallelism
。默认的 MaxDegreeOfParallelism
是 -1,这意味着无限制的并行性,这实际上使 ThreadPool
.