如何使用 Parallel.ForEach 更新共享状态
How do I update shared state with Parallel.ForEach
我有一个 WPF 应用程序,它读取 Outlook .pst 文件,提取每封邮件,并将它和任何附件保存为 .pdf 文件。完成后,它会对文件进行一些其他处理。
我目前在第一部分使用普通的旧 foreach
循环。这是代码的一个相当简化的版本...
// These two are used by the WPF UI to display progress
string BusyContent;
ObservableCollection<string> Msgs = new();
// See note lower down about the quick-and-dirty logging
string _logFile = @"C:\Path\To\LogFile.log";
// _allFiles is used to keep a record of all the files we generate. Used after the loop ends
List<string> _allFiles = new();
// nCurr is used to update BusyContent, which is bound to the UI to show progress
int nCurr = 0;
// The messages would really be extracted from the .pst file. Empty list used for simplicity
List<Message> messages = new();
async Task ProcessMessages() {
using StreamWriter logFile = new(_logFile, true);
foreach (Message msg in messages) {
nCurr++;
string fileName = GenerateFileName(msg);
// We log a lot more, but only one shown for simplicity
Log(logFile, $"File: {fileName}");
_allFiles.Add(fileName);
// Let the user know where we are up to
BusyContent = $"Processing message {nCurr}";
// Msgs is bound to a WPF grid, so we need to use Dispatcher to update
Application.Current.Dispatcher.Invoke(() => Msgs.Add(fileName));
// Finally we write out the .pdf files
await ProcessMessage(msg);
}
}
async Task ProcessMessage(Message msg) {
// The methods called here are omitted as they aren't relevant to my questions
await GenerateMessagePdf(msg);
foreach(Attachment a in msg.Attachments) {
string fileName = GenerateFileName(a);
// Note that we update _allFiles here as well as in the main loop
_allFiles.Add(fileName);
await GenerateAttachmentPdf(a);
}
}
static void Log(StreamWriter logFile, string msg) =>
logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);
一切正常,但处理大型 .pst 文件可能需要相当长的时间。我想知道将其转换为使用 Parallel.ForEach
是否会加快速度。我可以看到这个方法的基本用法,但是有几个问题,主要是关于循环中使用的class级变量...
logFile
变量被传递。这会引起问题吗?这不是一个主要问题,因为这个日志记录是作为一个快速和肮脏的调试设备添加的,并且真的应该用一个合适的日志框架来代替,但我仍然想知道我正在做的是什么并行版本中的一个问题
nCurr
在循环内更新。这样安全吗,或者有更好的方法吗?
_allFiles
也在主循环内更新。我只是添加条目,而不是读取或删除条目,但这安全吗?
类似地,_allFiles
在ProcessMessage
方法内部更新。我想这个问题的答案取决于上一个。
在循环内更新 BusyContent 和调用 Application.Current.Dispatcher.Invoke 是否有问题?
感谢您提供的任何帮助。
首先需要使用线程安全的集合:
ObservableConcurrentCollection<string> Msgs = new();
ConcurrentQueue<string> _allFiles = new();
ObservableConcurrentCollection
can be installed 通过 NuGet。 ConcurrentQueue
位于 using System.Collections.Concurrent;
。
特别感谢 Theodor Zoulias for the pointing out that there is better option for ConcurentBag
.
然后可以使用Parallel.ForEach
或Task
。
Parallel.ForEach
使用 Partitioner
which allows to avoid creation more tasks than necessary. So it tries to run each method in parallel. So it is better to exclude async
和 await
关键字参与 Parallel.ForEach
.
async Task ProcessMessages()
{
using StreamWriter logFile = new(_logFile, true);
await Task.Run(() => {
Parallel.ForEach(messages, msg =>
{
var currentCount = Interlocked.Increment(ref nCurr);
string fileName = GenerateFileName(msg);
Log(logFile, $"File: {fileName}");
_allFiles.Enqueue(fileName);
BusyContent = $"Processing message {currentCount}";
ProcessMessage(msg);
});
});
}
int ProcessMessage(Message msg)
{
// The methods called here are omitted as they aren't relevant to my questions
var message = GenerateMessagePdf(msg);
foreach (Attachment a in msg.Attachments)
{
string fileName = GenerateFileName(a);
_allFiles.Enqueue(fileName);
GenerateAttachmentPdf(a);
}
return msg.Id;
}
private string GenerateAttachmentPdf(Attachment a) => string.Empty;
private string GenerateMessagePdf(Message message) => string.Empty;
string GenerateFileName(Attachment attachment) => string.Empty;
string GenerateFileName(Message message) => string.Empty;
void Log(StreamWriter logFile, string msg) =>
logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);
另一种方式是等待所有任务。在这种情况下,不需要排除 async
和 await
关键字。
async Task ProcessMessages()
{
using StreamWriter logFile = new(_logFile, true);
var messageTasks = messages.Select(msg =>
{
var currentCount = Interlocked.Increment(ref nCurr);
string fileName = GenerateFileName(msg);
Log(logFile, $"File: {fileName}");
_allFiles.Enqueue(fileName);
BusyContent = $"Processing message {currentCount}";
return ProcessMessage(msg);
});
var msgs = await Task.WhenAll(messageTasks);
}
我有一个 WPF 应用程序,它读取 Outlook .pst 文件,提取每封邮件,并将它和任何附件保存为 .pdf 文件。完成后,它会对文件进行一些其他处理。
我目前在第一部分使用普通的旧 foreach
循环。这是代码的一个相当简化的版本...
// These two are used by the WPF UI to display progress
string BusyContent;
ObservableCollection<string> Msgs = new();
// See note lower down about the quick-and-dirty logging
string _logFile = @"C:\Path\To\LogFile.log";
// _allFiles is used to keep a record of all the files we generate. Used after the loop ends
List<string> _allFiles = new();
// nCurr is used to update BusyContent, which is bound to the UI to show progress
int nCurr = 0;
// The messages would really be extracted from the .pst file. Empty list used for simplicity
List<Message> messages = new();
async Task ProcessMessages() {
using StreamWriter logFile = new(_logFile, true);
foreach (Message msg in messages) {
nCurr++;
string fileName = GenerateFileName(msg);
// We log a lot more, but only one shown for simplicity
Log(logFile, $"File: {fileName}");
_allFiles.Add(fileName);
// Let the user know where we are up to
BusyContent = $"Processing message {nCurr}";
// Msgs is bound to a WPF grid, so we need to use Dispatcher to update
Application.Current.Dispatcher.Invoke(() => Msgs.Add(fileName));
// Finally we write out the .pdf files
await ProcessMessage(msg);
}
}
async Task ProcessMessage(Message msg) {
// The methods called here are omitted as they aren't relevant to my questions
await GenerateMessagePdf(msg);
foreach(Attachment a in msg.Attachments) {
string fileName = GenerateFileName(a);
// Note that we update _allFiles here as well as in the main loop
_allFiles.Add(fileName);
await GenerateAttachmentPdf(a);
}
}
static void Log(StreamWriter logFile, string msg) =>
logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);
一切正常,但处理大型 .pst 文件可能需要相当长的时间。我想知道将其转换为使用 Parallel.ForEach
是否会加快速度。我可以看到这个方法的基本用法,但是有几个问题,主要是关于循环中使用的class级变量...
logFile
变量被传递。这会引起问题吗?这不是一个主要问题,因为这个日志记录是作为一个快速和肮脏的调试设备添加的,并且真的应该用一个合适的日志框架来代替,但我仍然想知道我正在做的是什么并行版本中的一个问题nCurr
在循环内更新。这样安全吗,或者有更好的方法吗?_allFiles
也在主循环内更新。我只是添加条目,而不是读取或删除条目,但这安全吗?类似地,
_allFiles
在ProcessMessage
方法内部更新。我想这个问题的答案取决于上一个。在循环内更新 BusyContent 和调用 Application.Current.Dispatcher.Invoke 是否有问题?
感谢您提供的任何帮助。
首先需要使用线程安全的集合:
ObservableConcurrentCollection<string> Msgs = new();
ConcurrentQueue<string> _allFiles = new();
ObservableConcurrentCollection
can be installed 通过 NuGet。 ConcurrentQueue
位于 using System.Collections.Concurrent;
。
特别感谢 Theodor Zoulias for the pointing out that there is better option for ConcurentBag
.
然后可以使用Parallel.ForEach
或Task
。
Parallel.ForEach
使用 Partitioner
which allows to avoid creation more tasks than necessary. So it tries to run each method in parallel. So it is better to exclude async
和 await
关键字参与 Parallel.ForEach
.
async Task ProcessMessages()
{
using StreamWriter logFile = new(_logFile, true);
await Task.Run(() => {
Parallel.ForEach(messages, msg =>
{
var currentCount = Interlocked.Increment(ref nCurr);
string fileName = GenerateFileName(msg);
Log(logFile, $"File: {fileName}");
_allFiles.Enqueue(fileName);
BusyContent = $"Processing message {currentCount}";
ProcessMessage(msg);
});
});
}
int ProcessMessage(Message msg)
{
// The methods called here are omitted as they aren't relevant to my questions
var message = GenerateMessagePdf(msg);
foreach (Attachment a in msg.Attachments)
{
string fileName = GenerateFileName(a);
_allFiles.Enqueue(fileName);
GenerateAttachmentPdf(a);
}
return msg.Id;
}
private string GenerateAttachmentPdf(Attachment a) => string.Empty;
private string GenerateMessagePdf(Message message) => string.Empty;
string GenerateFileName(Attachment attachment) => string.Empty;
string GenerateFileName(Message message) => string.Empty;
void Log(StreamWriter logFile, string msg) =>
logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);
另一种方式是等待所有任务。在这种情况下,不需要排除 async
和 await
关键字。
async Task ProcessMessages()
{
using StreamWriter logFile = new(_logFile, true);
var messageTasks = messages.Select(msg =>
{
var currentCount = Interlocked.Increment(ref nCurr);
string fileName = GenerateFileName(msg);
Log(logFile, $"File: {fileName}");
_allFiles.Enqueue(fileName);
BusyContent = $"Processing message {currentCount}";
return ProcessMessage(msg);
});
var msgs = await Task.WhenAll(messageTasks);
}