使用 ConcurrentBag 的并行 ForEach 未按预期工作

Parallel ForEach using a ConcurrentBag not working as expected

我有这段代码可以处理列表中的项目:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
      }

我基本上是使用 ConcurrentBag 根据几个条件检查对象是否存在。
期望总是得到这样的输出(顺序可能不同):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]

但我有时得到一个输出,这表明我的代码不是线程安全的:

Processing item [Three]
Processing item [One]
Processing item [Two]

所以我认为 itemsProcess.FirstOrDefault() 会阻塞的假设是错误的。
使用 lock 不会改变任何东西。显然,这里有问题,我真的不明白为什么?

我知道我可以用其他方式“解决”这个问题(一种是在输入 Parallel.ForEach() 之前准备好列表),但我真的很想知道 为什么 这种行为?

之所以,是因为仍然存在数据竞争...2个线程仍然可以读取并添加到ConcurrentBag中的 ]non-thread 安全 方式。使用任何 并发集合 只意味着你有一个结构是 self-consistent,但它不能保护你不写其他non-thread安全代码

你的想法是正确的 lock

var itemsProcess = new Dictionary<string, Item>();
Parallel.ForEach(items, (item) =>
{

   lock (_Lock)
   {
      if (itemsProcess.TryGetValue(item.ID, out var val))
      {
         Console.WriteLine($"Item [{item.Name}] was already processed as [{val.Name}]");
         return;
      }

      itemsProcess.TryAdd(item.ID, item);
   }

   Console.WriteLine($"Processing item [{item.Name}]");
   Thread.Sleep(1000); // do some work...

});

注意 :您还可以在并行处理列表之前过滤重复项,这样根本不需要锁或集合

不求助于锁,你可以“滥用”一个ConcurrentDictionary,避免这里所有的锁来确保唯一性。

通过 ID 将项目添加到字典中,数据结构将保持一致,完成后您可以使用 dictionary.Values 字段来获取唯一项目。

P.S.: 我觉得你的例子要复杂得多,因为没有人使用 Parallel.ForEach()Distinct(),这就是你的代码的总和。

最后,要解决发生这种情况的原因,当涉及到并发时,这几乎总是 anti-pattern 并且不符合作者在这里的意思。

if(!collection.Contains(item))
      collection.Add(item);

Contains() 执行并 returned false 时,另一个线程可能已经执行了相同的任务,抢在前面并添加了相同的项目。

这种竞争条件是为什么几乎所有的集合修改操作都有两种形式:你有一个 collection.TryAdd() 会尝试自动添加一个项目和 return true/false 告诉你的结果或者你有像 GetOrAdd()AddOrUpdate() 这样的东西,它们再次自动插入一个项目,然后 get/update 它。

您的并行循环中有 2 个独立操作:FirstOrDefaultAdd

ConcurrentBag 无法确保这 2 个操作之间 thread-safety。

另一种方法是 ConcurrentDictionary,它有一个 GetOrAdd 方法,它只会在键不存在时添加一个项目:

var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // Returns existing item with same ID or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
    if (!object.ReferenceEquals(item, itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});

如果您随后需要作为 ICollection 处理的项目,可以通过 itemsProcess.Values.

访问它们