Parallel.ForEach:当集合的记录数变高时保存集合的最佳方法?

Parallel.ForEach: Best way to save off a collection when its record count gets high?

所以我运行宁 Parallel.ForEach 基本上生成一堆数据,这些数据最终将保存到数据库中。但是,由于数据收集可能会变得非常大,我需要偶尔 save/clear 收集,以免 运行 变成 OutOfMemoryException.

我刚开始使用 Parallel.ForEach、并发集合和锁,所以我不太清楚到底需要做什么才能确保一切正常(即我们不知道在保存和清除操作之间添加到集合中的任何记录)。

目前我是说,如果记录数超过某个阈值,则将数据保存在当前集合中的 lock 块内。

ConcurrentStack<OutRecord> OutRecs = new ConcurrentStack<OutRecord>();
object StackLock = new object();

Parallel.ForEach(inputrecords, input =>
{
  lock(StackLock)
  {  
    if (OutRecs.Count >= 50000)
    {
       Save(OutRecs);
       OutRecs.Clear();
     }
   }

  OutRecs.Push(CreateOutputRecord(input);
});

if (OutRecs.Count > 0) Save(OutRecs);

我不能 100% 确定这是否按照我认为的方式工作。锁是否会阻止循环的其他实例写入输出集合?如果没有,有更好的方法吗?

您的锁将正常工作,但效率不高,因为所有您的工作线程将在每次保存操作的整个持续时间内被迫暂停。此外,锁往往(相对)昂贵,因此在每个线程的每次迭代中执行锁有点浪费。

您的一条评论提到为每个工作线程提供自己的数据存储:是的,您可以这样做。这是一个您可以根据需要定制的示例:

Parallel.ForEach(
    // collection of objects to iterate over
    inputrecords,

    // delegate to initialize thread-local data
    () => new List<OutRecord>(),

    // body of loop
    (inputrecord, loopstate, localstorage) =>
    {
        localstorage.Add(CreateOutputRecord(inputrecord));
        if (localstorage.Count > 1000)
        {
            // Save() must be thread-safe, or you'll need to wrap it in a lock
            Save(localstorage);
            localstorage.Clear();
        }
        return localstorage;
    },

    // finally block gets executed after each thread exits
    localstorage =>
    {
        if (localstorage.Count > 0)
        {
            // Save() must be thread-safe, or you'll need to wrap it in a lock
            Save(localstorage);
            localstorage.Clear();
        }
    });

一种方法是定义代表数据目的地的抽象。可能是这样的:

public interface IRecordWriter<T> // perhaps come up with a better name.
{
    void WriteRecord(T record);
    void Flush();
}

并行处理记录的 class 无需担心这些记录的处理方式或记录过多时会发生什么。 IRecordWriter 的实现处理所有这些细节,使您的其他 class 更容易测试。

IRecordWriter 的实现可能如下所示:

public abstract class BufferedRecordWriter<T> : IRecordWriter<T>
{
    private readonly ConcurrentQueue<T> _buffer = new ConcurrentQueue<T>();
    private readonly int _maxCapacity;
    private bool _flushing;

    public ConcurrentQueueRecordOutput(int maxCapacity = 100)
    {
        _maxCapacity = maxCapacity;
    }

    public void WriteRecord(T record)
    {
        _buffer.Enqueue(record);
        if (_buffer.Count >= _maxCapacity && !_flushing)
            Flush();
    }

    public void Flush()
    {
        _flushing = true;
        try
        {
            var recordsToWrite = new List<T>();
            while (_buffer.TryDequeue(out T dequeued))
            {
                recordsToWrite.Add(dequeued);
            }
            if(recordsToWrite.Any())
                WriteRecords(recordsToWrite);
        }
        finally
        {
            _flushing = false;
        }
    }

    protected abstract void WriteRecords(IEnumerable<T> records);
}

当缓冲区达到最大大小时,其中的所有记录都将发送到WriteRecords。因为 _buffer 是一个 ConcurrentQueue 它可以在添加记录时保持阅读记录。

Flush 方法可以是任何特定于您如何编写记录的方法。这不是一个抽象的 class 到数据库或文件的实际输出可能是注入到这个的另一个依赖项。您可以做出这样的决定、重构并改变主意,因为第一个 class 不受这些更改的影响。它所知道的只是 IRecordWriter 不会改变的接口。

您可能会注意到我还没有完全确定 Flush 不会在不同线程上并发执行。我可以对此进行更多锁定,但这并不重要。这将避免大多数并发执行,但如果并发执行都从 ConcurrentQueue.

读取也没关系

这只是一个粗略的轮廓,但它显示了如果我们将它们分开,所有步骤将如何变得更简单和更容易测试。一个 class 将输入转换为输出。另一个 class 缓冲输出并写入它们。第二个 class 甚至可以分成两部分 - 一个作为缓冲区,另一个作为 "final" 写入器,将它们发送到数据库或文件或其他目的地。