使用 ConcurrentBag 的并行 ForEach 未按预期工作
Parallel ForEach using a ConcurrentBag not working as expected
我有这段代码可以处理列表中的项目:
static readonly object _Lock = new object();
public class Item
{
public string Name;
public string ID;
}
static void Main(string[] args)
{
var items = new List<Item>
{
new Item { Name = "One", ID = "123" },
new Item { Name = "Two", ID = "234" },
new Item { Name = "Three", ID = "123" }
};
var itemsProcess = new ConcurrentBag<Item>();
Parallel.ForEach(items, (item) =>
{
Item itemProcess = null;
// lock (_Lock)
{
itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
}
if (itemProcess != null)
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
itemsProcess.Add(item);
Console.WriteLine($"Processing item [{item.Name}]");
Thread.Sleep(1000); // do some work...
}
});
Console.ReadKey();
}
我基本上是使用 ConcurrentBag
根据几个条件检查对象是否存在。
我期望总是得到这样的输出(顺序可能不同):
Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]
但我有时得到一个输出,这表明我的代码不是线程安全的:
Processing item [Three]
Processing item [One]
Processing item [Two]
所以我认为 itemsProcess.FirstOrDefault()
会阻塞的假设是错误的。
使用 lock
不会改变任何东西。显然,这里有问题,我真的不明白为什么?
我知道我可以用其他方式“解决”这个问题(一种是在输入 Parallel.ForEach()
之前准备好列表),但我真的很想知道 为什么 这种行为?
之所以,是因为仍然存在数据竞争...2个线程仍然可以读取并添加到ConcurrentBag
中的 ]non-thread 安全 方式。使用任何 并发集合 只意味着你有一个结构是 self-consistent,但它不能保护你不写其他non-thread安全代码
你的想法是正确的 lock
var itemsProcess = new Dictionary<string, Item>();
Parallel.ForEach(items, (item) =>
{
lock (_Lock)
{
if (itemsProcess.TryGetValue(item.ID, out var val))
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{val.Name}]");
return;
}
itemsProcess.TryAdd(item.ID, item);
}
Console.WriteLine($"Processing item [{item.Name}]");
Thread.Sleep(1000); // do some work...
});
注意 :您还可以在并行处理列表之前过滤重复项,这样根本不需要锁或集合
不求助于锁,你可以“滥用”一个ConcurrentDictionary
,避免这里所有的锁来确保唯一性。
通过 ID 将项目添加到字典中,数据结构将保持一致,完成后您可以使用 dictionary.Values
字段来获取唯一项目。
P.S.: 我觉得你的例子要复杂得多,因为没有人使用 Parallel.ForEach()
做 Distinct()
,这就是你的代码的总和。
最后,要解决发生这种情况的原因,当涉及到并发时,这几乎总是 anti-pattern 并且不符合作者在这里的意思。
if(!collection.Contains(item))
collection.Add(item);
当 Contains()
执行并 returned false 时,另一个线程可能已经执行了相同的任务,抢在前面并添加了相同的项目。
这种竞争条件是为什么几乎所有的集合修改操作都有两种形式:你有一个 collection.TryAdd()
会尝试自动添加一个项目和 return true/false 告诉你的结果或者你有像 GetOrAdd()
和 AddOrUpdate()
这样的东西,它们再次自动插入一个项目,然后 get/update 它。
您的并行循环中有 2 个独立操作:FirstOrDefault
和 Add
。
ConcurrentBag
无法确保这 2 个操作之间 thread-safety。
另一种方法是 ConcurrentDictionary
,它有一个 GetOrAdd
方法,它只会在键不存在时添加一个项目:
var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
// Returns existing item with same ID or adds this item
var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
if (!object.ReferenceEquals(item, itemProcess))
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
Console.WriteLine($"Processing item [{item.Name}]");
// do some work...
}
});
如果您随后需要作为 ICollection
处理的项目,可以通过 itemsProcess.Values
.
访问它们
我有这段代码可以处理列表中的项目:
static readonly object _Lock = new object();
public class Item
{
public string Name;
public string ID;
}
static void Main(string[] args)
{
var items = new List<Item>
{
new Item { Name = "One", ID = "123" },
new Item { Name = "Two", ID = "234" },
new Item { Name = "Three", ID = "123" }
};
var itemsProcess = new ConcurrentBag<Item>();
Parallel.ForEach(items, (item) =>
{
Item itemProcess = null;
// lock (_Lock)
{
itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
}
if (itemProcess != null)
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
itemsProcess.Add(item);
Console.WriteLine($"Processing item [{item.Name}]");
Thread.Sleep(1000); // do some work...
}
});
Console.ReadKey();
}
我基本上是使用 ConcurrentBag
根据几个条件检查对象是否存在。
我期望总是得到这样的输出(顺序可能不同):
Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]
但我有时得到一个输出,这表明我的代码不是线程安全的:
Processing item [Three]
Processing item [One]
Processing item [Two]
所以我认为 itemsProcess.FirstOrDefault()
会阻塞的假设是错误的。
使用 lock
不会改变任何东西。显然,这里有问题,我真的不明白为什么?
我知道我可以用其他方式“解决”这个问题(一种是在输入 Parallel.ForEach()
之前准备好列表),但我真的很想知道 为什么 这种行为?
之所以,是因为仍然存在数据竞争...2个线程仍然可以读取并添加到ConcurrentBag
中的 ]non-thread 安全 方式。使用任何 并发集合 只意味着你有一个结构是 self-consistent,但它不能保护你不写其他non-thread安全代码
你的想法是正确的 lock
var itemsProcess = new Dictionary<string, Item>();
Parallel.ForEach(items, (item) =>
{
lock (_Lock)
{
if (itemsProcess.TryGetValue(item.ID, out var val))
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{val.Name}]");
return;
}
itemsProcess.TryAdd(item.ID, item);
}
Console.WriteLine($"Processing item [{item.Name}]");
Thread.Sleep(1000); // do some work...
});
注意 :您还可以在并行处理列表之前过滤重复项,这样根本不需要锁或集合
不求助于锁,你可以“滥用”一个ConcurrentDictionary
,避免这里所有的锁来确保唯一性。
通过 ID 将项目添加到字典中,数据结构将保持一致,完成后您可以使用 dictionary.Values
字段来获取唯一项目。
P.S.: 我觉得你的例子要复杂得多,因为没有人使用 Parallel.ForEach()
做 Distinct()
,这就是你的代码的总和。
最后,要解决发生这种情况的原因,当涉及到并发时,这几乎总是 anti-pattern 并且不符合作者在这里的意思。
if(!collection.Contains(item))
collection.Add(item);
当 Contains()
执行并 returned false 时,另一个线程可能已经执行了相同的任务,抢在前面并添加了相同的项目。
这种竞争条件是为什么几乎所有的集合修改操作都有两种形式:你有一个 collection.TryAdd()
会尝试自动添加一个项目和 return true/false 告诉你的结果或者你有像 GetOrAdd()
和 AddOrUpdate()
这样的东西,它们再次自动插入一个项目,然后 get/update 它。
您的并行循环中有 2 个独立操作:FirstOrDefault
和 Add
。
ConcurrentBag
无法确保这 2 个操作之间 thread-safety。
另一种方法是 ConcurrentDictionary
,它有一个 GetOrAdd
方法,它只会在键不存在时添加一个项目:
var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
// Returns existing item with same ID or adds this item
var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
if (!object.ReferenceEquals(item, itemProcess))
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
Console.WriteLine($"Processing item [{item.Name}]");
// do some work...
}
});
如果您随后需要作为 ICollection
处理的项目,可以通过 itemsProcess.Values
.