Parallel.ForEach 带有 Where 条件的源列表

Parallel.ForEach Source List with Where Condition

我有一个代码块,它处理 StoreProducts,然后在 for each 循环中在数据库中添加或更新它们。但这很慢。当我转换代码 Parallel.ForEach 块时,相同的产品会同时添加和更新。我不知道如何安全地使用以下功能,我们将不胜感激。

var validProducts = storeProducts.Where(p => p.Price2 > 0
                                                     && !string.IsNullOrEmpty(p.ProductAtt08Desc.Trim())
                                                     && !string.IsNullOrEmpty(p.Barcode.Trim()) 
            ).ToList();

var processedProductCodes = new List<string>();

var po = new ParallelOptions()
        {
            MaxDegreeOfParallelism = 4
        };

Parallel.ForEach(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), po,
            (product) =>
{
            lock (_lockThis)
            {
                processedProductCodes.Add(product.ProductCode);
            }

    // Check if Product Exists in Db

    // if product is not in Db Add to Db

    // if product is in Db Update product in Db

}

这里的问题是,列表 validProducts 可能有多个相同的 ProductCode,所以它们是变体,我必须管理,即使其中一个正在处理,也不应该再次处理。

因此,在并行 foreach 'validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)' 中找到的 where condition 并没有像正常情况下那样正常工作。

Parallel.ForEach 为每个线程在内部缓冲项目,您可以做的一个选择是切换到不使用缓冲的分区程序

var pat = Partitioner.Create(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode))
                            ,EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(pat, po, (product) => ...

这会让你更接近,但你仍然会有竞争条件,其中两个相同的对象可以被处理,因为如果你找到重复的,你不会跳出循环。

更好的选择是将 processedProductCodes 切换为 HashSet<string> 并将代码更改为

var processedProductCodes = new HashSet<string>();

var po = new ParallelOptions()
        {
            MaxDegreeOfParallelism = 4
        };

Parallel.ForEach(validProducts, po,
            (product) =>
{
            //You can safely lock on processedProductCodes
            lock (processedProductCodes)
            {
                if(!processedProductCodes.Add(product.ProductCode))
                {
                    //Add returns false if the code is already in the collection.
                    return;
                }
            }

    // Check if Product Exists in Db

    // if product is not in Db Add to Db

    // if product is in Db Update product in Db

}

HashSet 的查找速度更快,并且内置于 Add 函数中。

我的大部分回答都是对你问题的回答,更多的是一些指导 - 如果你提供更多的技术细节,我可能会提供更准确的帮助。

Parallel.ForEach 可能不是这里的最佳解决方案 -- 特别是当您有共享列表或繁忙的服务器时。

您正在锁定写入而不是读取该共享列表。所以我很惊讶它没有在 Where 期间抛出。将 List<string> 转换为 ConcurrentDictionary<string, bool>(只是为了创建一个简单的并发哈希 table),然后您将获得更好的写入吞吐量并且不会在读取期间抛出。

但是您将遇到数据库争用问题(如果使用多个连接),因为您的插入可能仍需要锁定。即使您只是简单地将工作量拆分,您也会 运行 到此。此数据库锁定可能会导致 blocks/deadlocks,因此最终可能会比原来慢。如果使用一个连接,通常无法并行化命令。

我会尝试将大部分插入包装在一个 事务 中,其中包含 批处理 ,例如 1000 个插入,或者将整个工作负载放入一个批量中插入。然后数据库会将数据保存在内存中,并在完成后将整个数据提交到磁盘(而不是一次一条记录)。

根据您的典型工作负载,您可能想要尝试不同的存储解决方案。数据库 通常 不适合插入大量记录...您可能会发现使用替代解决方案(例如键值存储)的性能要好得多。或者把数据放到Redis之类的东西里面,在后台慢慢持久化到数据库。