How do I update shared state with Parallel.ForEach

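I have a WPF app that reads an Outlook .pst file, extracts each message, and saves it and any attachments as .pdf files. Once that is done, it does some further processing on the files.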

I'm currently using a plain old foreach loop for the first part. Here is a rather simplified version of the code...

// These two are used by the WPF UI to display progress
string BusyContent;
ObservableCollection<string> Msgs = new();
// See note lower down about the quick-and-dirty logging
string _logFile = @"C:\Path\To\LogFile.log";
// _allFiles is used to keep a record of all the files we generate. Used after the loop ends
List<string> _allFiles = new();
// nCurr is used to update BusyContent, which is bound to the UI to show progress
int nCurr = 0;
// The messages would really be extracted from the .pst file. Empty list used for simplicity
List<Message> messages = new();

async Task ProcessMessages() {
  using StreamWriter logFile = new(_logFile, true);
  foreach (Message msg in messages) {
    nCurr++;
    string fileName = GenerateFileName(msg);
    // We log a lot more, but only one shown for simplicity
    Log(logFile, $"File: {fileName}");
    _allFiles.Add(fileName);
    // Let the user know where we are up to
    BusyContent = $"Processing message {nCurr}";
    // Msgs is bound to a WPF grid, so we need to use Dispatcher to update
    Application.Current.Dispatcher.Invoke(() => Msgs.Add(fileName));
    // Finally we write out the .pdf files
    await ProcessMessage(msg);
  }
}

async Task ProcessMessage(Message msg) {
  // The methods called here are omitted as they aren't relevant to my questions
  await GenerateMessagePdf(msg);
  foreach(Attachment a in msg.Attachments) {
    string fileName = GenerateFileName(a);
    // Note that we update _allFiles here as well as in the main loop
    _allFiles.Add(fileName);
    await GenerateAttachmentPdf(a);
  }
}

static void Log(StreamWriter logFile, string msg) =>
  logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);

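This all works fine, but it can take quite a long time on a large .pst file. I'm wondering whether converting it to use Parallel.ForEach would speed things up. I can see the basic usage of this method, but I have a few questions, mainly about the class-level variables used inside the loop...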

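  1. The logFile variable is passed around. Will this cause problems? It isn't a major concern, as the logging was added as a quick-and-dirty debugging aid and really ought to be replaced with a proper logging framework, but I'd still like to know whether what I'm doing would be a problem in the parallel version.

  2. nCurr is updated inside the loop. Is this safe, or is there a better way?

  3. _allFiles is also updated inside the main loop. I only add entries, I never read or remove them, but is this safe?

  4. Similarly, _allFiles is updated inside the ProcessMessage method. I guess the answer to this one depends on the previous one.

  5. Is there a problem with updating BusyContent and calling Application.Current.Dispatcher.Invoke inside the loop?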

Thanks for any help you can give.

First, you need to use thread-safe collections:

ObservableConcurrentCollection<string> Msgs = new();
ConcurrentQueue<string> _allFiles = new();

ObservableConcurrentCollection can be installed through NuGet. ConcurrentQueue lives in the System.Collections.Concurrent namespace. Special thanks to Theodor Zoulias for pointing out that there is a better option than ConcurrentBag.
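To illustrate the point about _allFiles, here is a minimal sketch of adding to a ConcurrentQueue from a parallel loop. The class name, the CollectFileNames helper, and the generated file names are made up for the example; only ConcurrentQueue and Parallel.ForEach come from the framework.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SharedListDemo
{
    // Hypothetical helper: builds one file name per id in parallel and
    // collects the names in a thread-safe queue.
    public static List<string> CollectFileNames(IEnumerable<int> ids)
    {
        ConcurrentQueue<string> allFiles = new();

        Parallel.ForEach(ids, id =>
        {
            // Enqueue may be called from many threads at once.
            // List<string>.Add gives no such guarantee.
            allFiles.Enqueue($"message-{id}.pdf");
        });

        // After the loop has finished, take a snapshot for the
        // single-threaded post-processing. The order is not guaranteed.
        return allFiles.ToList();
    }
}
```

Calling SharedListDemo.CollectFileNames(Enumerable.Range(1, 1000)) should return all 1000 names, in whatever order the worker threads happened to add them.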

Then you can use either Parallel.ForEach or Task.

Parallel.ForEach uses a Partitioner, which lets it avoid creating more tasks than necessary while still running the loop body in parallel. The loop body is a synchronous delegate, so it is better to remove the async and await keywords from the methods that take part in Parallel.ForEach.
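A small sketch of why async lambdas do not mix with Parallel.ForEach: the method only accepts Action delegates, so an async lambda is compiled as async void, and the loop returns as soon as each body reaches its first incomplete await. The class name and the gate are made up for the demonstration.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns how many loop bodies had finished by the time
    // Parallel.ForEach itself returned.
    public static int CompletedWhenLoopReturns()
    {
        // Hypothetical gate standing in for slow asynchronous work.
        // It is only released after the loop has returned.
        var gate = new TaskCompletionSource<bool>();
        int completed = 0;

        Parallel.ForEach(Enumerable.Range(0, 10), async i =>
        {
            await gate.Task;                      // body returns to the loop here
            Interlocked.Increment(ref completed); // runs only after the gate opens
        });

        // The loop is "done", yet no body has got past its await.
        int afterLoop = Volatile.Read(ref completed);

        gate.SetResult(true); // let the pending continuations run
        return afterLoop;
    }
}
```

In this sketch CompletedWhenLoopReturns() yields 0, because none of the bodies can pass the await before the loop returns. With real work the effect is that code after the loop runs while the messages are still being processed.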

    async Task  ProcessMessages()
    {
        using StreamWriter logFile = new(_logFile, true);

        await Task.Run(() => {
            Parallel.ForEach(messages, msg =>
            {
                var currentCount = Interlocked.Increment(ref nCurr);
                string fileName = GenerateFileName(msg);
                Log(logFile, $"File: {fileName}");
                _allFiles.Enqueue(fileName);
                BusyContent = $"Processing message {currentCount}";
                ProcessMessage(msg);
            });
        });
    }
    
    
    int ProcessMessage(Message msg)
    {
        // The methods called here are omitted as they aren't relevant to my questions
        var message = GenerateMessagePdf(msg);
        foreach (Attachment a in msg.Attachments)
        {
            string fileName = GenerateFileName(a);                
            _allFiles.Enqueue(fileName);
            GenerateAttachmentPdf(a);
        }
        return msg.Id;
    }


    private string GenerateAttachmentPdf(Attachment a) => string.Empty;


    private string GenerateMessagePdf(Message message) => string.Empty;


    string GenerateFileName(Attachment attachment) => string.Empty;


    string GenerateFileName(Message message) => string.Empty;


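    // StreamWriter is not thread-safe, so serialize writes coming from the parallel loop
    void Log(StreamWriter logFile, string msg)
    {
        lock (logFile)
            logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);
    }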
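To make the answers to questions 1 and 2 concrete, here is a minimal, self-contained sketch of the two techniques: Interlocked.Increment for the shared counter and a lock around the writer. The class, the Run helper, and the LogLock field are made up for the example, and a TextWriter parameter stands in for the StreamWriter so the sketch can run without touching the disk.

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class CounterAndLogDemo
{
    // Hypothetical lock object shared by every thread that writes to the log.
    private static readonly object LogLock = new();

    // Runs `count` iterations in parallel, writes one line per iteration,
    // and returns the final value of the shared counter.
    public static int Run(int count, TextWriter log)
    {
        int nCurr = 0;

        Parallel.For(0, count, i =>
        {
            // Atomic increment. The return value is this iteration's own
            // number, so it is safe to use in a progress message.
            int current = Interlocked.Increment(ref nCurr);

            // TextWriter instance members are not guaranteed to be
            // thread-safe, so only one thread writes at a time.
            lock (LogLock)
            {
                log.WriteLine($"Processing message {current}");
            }
        });

        return nCurr;
    }
}
```

With a plain nCurr++ the final count can come up short under contention. With Interlocked.Increment, Run(500, writer) should return 500 and produce 500 complete lines.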
    

Another way is to await all of the tasks. In this case there is no need to remove the async and await keywords.

    async Task ProcessMessages()
    {
        using StreamWriter logFile = new(_logFile, true);            
        var messageTasks = messages.Select(msg =>
        {                
            var currentCount = Interlocked.Increment(ref nCurr);
            string fileName = GenerateFileName(msg);                
            Log(logFile, $"File: {fileName}");
            _allFiles.Enqueue(fileName);                
            BusyContent = $"Processing message {currentCount}";
            // ProcessMessage here is the async, Task-returning version from the question
            return ProcessMessage(msg);
        });

        await Task.WhenAll(messageTasks);
    }
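
Task.WhenAll starts every message at once. If that is too much for a large .pst file, one possible alternative is Parallel.ForEachAsync, which accepts an async body and caps the number of items in flight. This is only a sketch and assumes .NET 6 or later, which the question does not confirm. ProcessMessageAsync, the int ids, and the limit of 4 are placeholders for the real work.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Hypothetical stand-in for the real, I/O-bound ProcessMessage.
    private static async Task<string> ProcessMessageAsync(int id, CancellationToken ct)
    {
        await Task.Delay(1, ct);
        return $"message-{id}.pdf";
    }

    // Processes every id with at most four running at a time and
    // returns how many file names were recorded.
    public static async Task<int> ProcessAllAsync(IEnumerable<int> ids)
    {
        ConcurrentQueue<string> allFiles = new();
        ParallelOptions options = new() { MaxDegreeOfParallelism = 4 };

        // Unlike Parallel.ForEach, the body is awaited, so the returned
        // task completes only after every item has finished.
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            allFiles.Enqueue(await ProcessMessageAsync(id, ct));
        });

        return allFiles.Count;
    }
}
```

Awaiting ForEachAsyncDemo.ProcessAllAsync(Enumerable.Range(1, 50)) should give 50 once all items have completed.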