How to safely access a collection from multiple threads using a TPL pipeline

This is a very simplified code sample that uses TPL Dataflow:

// This is the collection where I register all items that need to be processed by the pipeline.
// MyData is a simple class with 2 properties: int Id, bool IsCompleted
private ConcurrentBag<MyData> completedItems = new ConcurrentBag<MyData>();
// This method may be called multiple times in a very short time frame, which means that
// several pipelines may be running simultaneously.
public void InitiateProcess(List<MyData> inputData)
{
    inputData.ForEach(ent => completedItems.Add(ent));
    StartPipeline(inputData);
}
public void StartPipeline(List<MyData> inputData)
{
    // Here goes TransformBlock downloadBlock = ...
    // Here goes TransformBlock processBlock = ...
    // In the resultBlock I would like to update the corresponding item in the completedItems bag.
    var resultBlock = new ActionBlock<MyData>(data =>
    {
        var completedItem = completedItems.FirstOrDefault(ent => ent.Id == data.Id);
        if (completedItem != null)
            completedItem.IsCompleted = true;
    });
}

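My main goal is to register the items that completed successfully. It is nothing very complicated, but the more I learn about parallel programming, the more I realize how complex it is and how carefully it should be used. I know that several different threads may try to access the completedItems collection at the same time. I did some research, and using a ConcurrentBag to track these items seems like a good approach. So my question is: are there any potential dangers in using this approach?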

According to the Microsoft documentation:

All public and protected members of ConcurrentBag are thread-safe and may be used concurrently from multiple threads.

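This means that ConcurrentBag<T> is designed to be accessed by multiple threads without you having to worry about them interfering with each other, as long as you use its own public members.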
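As a minimal sketch of what that guarantee covers, the snippet below adds to a bag from many threads using only the bag's own members (Add and Count). The BagDemo class and AddConcurrently method are hypothetical names introduced for illustration; they are not part of the question's code.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class BagDemo
{
    // Adds 'count' integers to a ConcurrentBag from parallel workers
    // and returns how many items the bag holds afterwards.
    public static int AddConcurrently(int count)
    {
        var bag = new ConcurrentBag<int>();

        // Add is a public member of ConcurrentBag<T>, so no lock is needed here.
        Parallel.For(0, count, i => bag.Add(i));

        // Count is also a public member; Parallel.For has completed by now.
        return bag.Count;
    }
}
```

Because every call goes through the bag's own API, no item should be lost, which is not something a plain List<T> would promise under the same load.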

Edit: I just noticed that you are using FirstOrDefault, and the same documentation also says:

However, members accessed through one of the interfaces the ConcurrentBag implements, including extension methods, are not guaranteed to be thread safe and may need to be synchronized by the caller.

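In other words, FirstOrDefault may not be thread-safe, and ConcurrentBag<T> also allows duplicate items. Switching to a ConcurrentDictionary, or using a List<T> guarded by a lock statement, would probably be better.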
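One way the ConcurrentDictionary suggestion might look is sketched below, keyed by Id so that each item can be found without a LINQ scan and duplicates cannot be registered. The CompletionTracker class and its Register, MarkCompleted, and CompletedCount members are hypothetical names, and MyData is reconstructed from the description in the question; treat this as a sketch under those assumptions rather than a drop-in replacement.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Reconstructed from the question's description: int Id, bool IsCompleted.
public class MyData
{
    public int Id { get; set; }
    public bool IsCompleted { get; set; }
}

// Hypothetical wrapper, for illustration only.
public class CompletionTracker
{
    // Keyed by Id: lookups go through TryGetValue instead of FirstOrDefault.
    private readonly ConcurrentDictionary<int, MyData> items =
        new ConcurrentDictionary<int, MyData>();

    // Would be called from InitiateProcess before the pipeline starts.
    public void Register(IEnumerable<MyData> inputData)
    {
        foreach (var item in inputData)
            items.TryAdd(item.Id, item); // does nothing if the Id already exists
    }

    // Would be called from the resultBlock delegate.
    // Returns false when the Id was never registered.
    public bool MarkCompleted(int id)
    {
        if (!items.TryGetValue(id, out var item))
            return false;
        item.IsCompleted = true;
        return true;
    }

    // Values returns a snapshot, so enumerating it is safe while other threads write.
    public int CompletedCount => items.Values.Count(i => i.IsCompleted);
}
```

In the ActionBlock from the question, the body would then reduce to a single call such as tracker.MarkCompleted(data.Id). Note that the dictionary only protects its own structure: if other threads read IsCompleted while the pipeline is still running, they may briefly see the old value.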