Parallel.ForEach 带有 Where 条件的源列表
Parallel.ForEach Source List with Where Condition
我有一个代码块,它处理 StoreProducts,然后在 for each 循环中在数据库中添加或更新它们。但这很慢。当我转换代码 Parallel.ForEach 块时,相同的产品会同时添加和更新。我不知道如何安全地使用以下功能,我们将不胜感激。
var validProducts = storeProducts.Where(p => p.Price2 > 0
&& !string.IsNullOrEmpty(p.ProductAtt08Desc.Trim())
&& !string.IsNullOrEmpty(p.Barcode.Trim())
).ToList();
var processedProductCodes = new List<string>();
var po = new ParallelOptions()
{
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), po,
(product) =>
{
lock (_lockThis)
{
processedProductCodes.Add(product.ProductCode);
}
// Check if Product Exists in Db
// if product is not in Db Add to Db
// if product is in Db Update product in Db
}
这里的问题是,列表 validProducts 可能有多个相同的 ProductCode,所以它们是变体,我必须管理,即使其中一个正在处理,也不应该再次处理。
因此,在并行 foreach 'validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)' 中找到的 where condition 并没有像正常情况下那样正常工作。
Parallel.ForEach
为每个线程在内部缓冲项目,您可以做的一个选择是切换到不使用缓冲的分区程序
var pat = Partitioner.Create(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode))
,EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(pat, po, (product) => ...
这会让你更接近,但你仍然会有竞争条件,其中两个相同的对象可以被处理,因为如果你找到重复的,你不会跳出循环。
更好的选择是将 processedProductCodes
切换为 HashSet<string>
并将代码更改为
var processedProductCodes = new HashSet<string>();
var po = new ParallelOptions()
{
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(validProducts, po,
(product) =>
{
//You can safely lock on processedProductCodes
lock (processedProductCodes)
{
if(!processedProductCodes.Add(product.ProductCode))
{
//Add returns false if the code is already in the collection.
return;
}
}
// Check if Product Exists in Db
// if product is not in Db Add to Db
// if product is in Db Update product in Db
}
HashSet 的查找速度更快,并且内置于 Add
函数中。
我的大部分回答都是对你问题的回答,更多的是一些指导 - 如果你提供更多的技术细节,我可能会提供更准确的帮助。
Parallel.ForEach
可能不是这里的最佳解决方案 -- 特别是当您有共享列表或繁忙的服务器时。
您正在锁定写入而不是读取该共享列表。所以我很惊讶它没有在 Where 期间抛出。将 List<string>
转换为 ConcurrentDictionary<string, bool>
(只是为了创建一个简单的并发哈希 table),然后您将获得更好的写入吞吐量并且不会在读取期间抛出。
但是您将遇到数据库争用问题(如果使用多个连接),因为您的插入可能仍需要锁定。即使您只是简单地将工作量拆分,您也会 运行 到此。此数据库锁定可能会导致 blocks/deadlocks,因此最终可能会比原来慢。如果使用一个连接,通常无法并行化命令。
我会尝试将大部分插入包装在一个 事务 中,其中包含 批处理 ,例如 1000 个插入,或者将整个工作负载放入一个批量中插入。然后数据库会将数据保存在内存中,并在完成后将整个数据提交到磁盘(而不是一次一条记录)。
根据您的典型工作负载,您可能想要尝试不同的存储解决方案。数据库 通常 不适合插入大量记录...您可能会发现使用替代解决方案(例如键值存储)的性能要好得多。或者把数据放到Redis之类的东西里面,在后台慢慢持久化到数据库。
我有一个代码块,它处理 StoreProducts,然后在 for each 循环中在数据库中添加或更新它们。但这很慢。当我转换代码 Parallel.ForEach 块时,相同的产品会同时添加和更新。我不知道如何安全地使用以下功能,我们将不胜感激。
var validProducts = storeProducts.Where(p => p.Price2 > 0
&& !string.IsNullOrEmpty(p.ProductAtt08Desc.Trim())
&& !string.IsNullOrEmpty(p.Barcode.Trim())
).ToList();
var processedProductCodes = new List<string>();
var po = new ParallelOptions()
{
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), po,
(product) =>
{
lock (_lockThis)
{
processedProductCodes.Add(product.ProductCode);
}
// Check if Product Exists in Db
// if product is not in Db Add to Db
// if product is in Db Update product in Db
}
这里的问题是,列表 validProducts 可能有多个相同的 ProductCode,所以它们是变体,我必须管理,即使其中一个正在处理,也不应该再次处理。
因此,在并行 foreach 'validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)' 中找到的 where condition 并没有像正常情况下那样正常工作。
Parallel.ForEach
为每个线程在内部缓冲项目,您可以做的一个选择是切换到不使用缓冲的分区程序
var pat = Partitioner.Create(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode))
,EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(pat, po, (product) => ...
这会让你更接近,但你仍然会有竞争条件,其中两个相同的对象可以被处理,因为如果你找到重复的,你不会跳出循环。
更好的选择是将 processedProductCodes
切换为 HashSet<string>
并将代码更改为
var processedProductCodes = new HashSet<string>();
var po = new ParallelOptions()
{
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(validProducts, po,
(product) =>
{
//You can safely lock on processedProductCodes
lock (processedProductCodes)
{
if(!processedProductCodes.Add(product.ProductCode))
{
//Add returns false if the code is already in the collection.
return;
}
}
// Check if Product Exists in Db
// if product is not in Db Add to Db
// if product is in Db Update product in Db
}
HashSet 的查找速度更快,并且内置于 Add
函数中。
我的大部分回答都是对你问题的回答,更多的是一些指导 - 如果你提供更多的技术细节,我可能会提供更准确的帮助。
Parallel.ForEach
可能不是这里的最佳解决方案 -- 特别是当您有共享列表或繁忙的服务器时。
您正在锁定写入而不是读取该共享列表。所以我很惊讶它没有在 Where 期间抛出。将 List<string>
转换为 ConcurrentDictionary<string, bool>
(只是为了创建一个简单的并发哈希 table),然后您将获得更好的写入吞吐量并且不会在读取期间抛出。
但是您将遇到数据库争用问题(如果使用多个连接),因为您的插入可能仍需要锁定。即使您只是简单地将工作量拆分,您也会 运行 到此。此数据库锁定可能会导致 blocks/deadlocks,因此最终可能会比原来慢。如果使用一个连接,通常无法并行化命令。
我会尝试将大部分插入包装在一个 事务 中,其中包含 批处理 ,例如 1000 个插入,或者将整个工作负载放入一个批量中插入。然后数据库会将数据保存在内存中,并在完成后将整个数据提交到磁盘(而不是一次一条记录)。
根据您的典型工作负载,您可能想要尝试不同的存储解决方案。数据库 通常 不适合插入大量记录...您可能会发现使用替代解决方案(例如键值存储)的性能要好得多。或者把数据放到Redis之类的东西里面,在后台慢慢持久化到数据库。