Parallel.ForEach blocking calling method
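I'm having a problem with Parallel.ForEach. I wrote a simple application that adds the names of the files to download to a queue, then uses a while loop to go through the queue and download one file at a time. After each file is downloaded, it calls another async method that creates objects from the downloaded memoryStream. That method's returned result is not awaited; it is discarded, so the next download starts immediately. If I use a simple foreach for the object creation, everything works fine: objects are created while the downloads continue. But if I try to speed up object creation with Parallel.ForEach, it stops the download process until the objects have been created. The UI is fully responsive, but it won't download the next object. I don't understand why this happens: Parallel.ForEach is inside await Task.Run(), and with my limited understanding of async programming, that should take care of it. Can anyone help me understand why it blocks the first method and how to avoid it?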
Here is a small sample:
public async Task DownloadFromCloud(List<string> constructNames)
{
    _downloadDataQueue = new Queue<string>();
    var _gcsClient = StorageClient.Create();
    foreach (var item in constructNames)
    {
        _downloadDataQueue.Enqueue(item);
    }
    while (_downloadDataQueue.Count > 0)
    {
        var memoryStream = new MemoryStream();
        await _gcsClient.DownloadObjectAsync("companyprojects",
            _downloadDataQueue.Peek(), memoryStream);
        memoryStream.Position = 0;
        _ = ReadFileXml(memoryStream);
        _downloadDataQueue.Dequeue();
    }
}

private async Task ReadFileXml(MemoryStream memoryStream)
{
    var reader = new XmlReader();
    var properties = reader.ReadXmlTest(memoryStream);
    await Task.Run(() =>
    {
        var entityList = new List<Entity>();
        foreach (var item in properties)
        {
            entityList.Add(CreateObjectsFromDownloadedProperties(item));
        }
        //Parallel.ForEach(properties, item =>
        //{
        //    entityList.Add(CreateObjectsFromDownloadedProperties(item));
        //});
    });
}
Edit
Here is the simplified object-creation method:
public Entity CreateObjectsFromDownloadedProperties(RebarProperties properties)
{
    var path = new LinearPath(properties.Path);
    var section = new Region(properties.Region);
    var sweep = section.SweepAsMesh(path, 1);
    return sweep;
}
Returned result of this method is not awaited, it is discarded, so the next download starts immediately.
This is also dangerous. "Fire and forget" means "I don't care when this operation completes, or if it completes. Just discard all exceptions because I don't care." So fire-and-forget should be extremely rare in practice. It is not appropriate here.
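To make the "discard all exceptions" point concrete, here is a minimal sketch in which a hypothetical ProcessAsync method (a stand-in for ReadFileXml) always throws. When its task is discarded the way _ = ReadFileXml(memoryStream) discards it, the caller never learns about the failure; awaiting the same task rethrows the exception in the caller.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetDemo
{
    // Hypothetical stand-in for ReadFileXml that fails after a short delay.
    public static async Task ProcessAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("object creation failed");
    }

    // Mirrors "_ = ReadFileXml(...)": the task is discarded, nobody observes
    // its exception, and the caller carries on as if everything succeeded.
    public static bool RunDiscarded()
    {
        _ = ProcessAsync();
        return true;
    }

    // Awaiting the task rethrows its exception here, so the caller can react.
    public static async Task<bool> RunAwaitedAsync()
    {
        try
        {
            await ProcessAsync();
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }
}
```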
UI is fully responsive, but it just won't download the next object.
I'm not sure why it would block the download, but switching to Parallel.ForEach definitely introduces a problem: List<T>.Add is not thread-safe.
private async Task ReadFileXml(MemoryStream memoryStream)
{
    var reader = new XmlReader();
    var properties = reader.ReadXmlTest(memoryStream);
    await Task.Run(() =>
    {
        var entityList = new List<Entity>();
        Parallel.ForEach(properties, item =>
        {
            var itemToAdd = CreateObjectsFromDownloadedProperties(item);
            // List<T>.Add is not thread-safe, so serialize access with a lock.
            lock (entityList) { entityList.Add(itemToAdd); }
        });
    });
}
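The thread does not establish why Parallel.ForEach stalls the downloads. Assuming, without confirmation, that it happens because Parallel.ForEach keeps many thread-pool threads busy while the download's own continuations also need the pool, one thing worth trying is capping its parallelism with ParallelOptions.MaxDegreeOfParallelism. This sketch does that and replaces the lock with ConcurrentBag<T>, whose Add is safe to call concurrently. CreateEntity and the maxDegree argument are hypothetical, and int/string stand in for RebarProperties/Entity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class BoundedParallelDemo
{
    // Hypothetical stand-in for CreateObjectsFromDownloadedProperties.
    public static string CreateEntity(int property) => $"entity-{property}";

    public static Task<string[]> CreateEntitiesAsync(int[] properties, int maxDegree) =>
        Task.Run(() =>
        {
            // ConcurrentBag<T>.Add may be called from several threads at once.
            var bag = new ConcurrentBag<string>();

            // At most maxDegree items are processed concurrently, which leaves
            // the remaining thread-pool threads free for other work.
            var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };
            Parallel.ForEach(properties, options, item => bag.Add(CreateEntity(item)));

            return bag.ToArray();
        });
}
```

ConcurrentBag<T> makes no ordering guarantee, so the entities come back in no particular order. The value passed for maxDegree is a tuning choice, not something the thread specifies.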
One tip: if you have result values, PLINQ is usually cleaner than Parallel:
private async Task ReadFileXml(MemoryStream memoryStream)
{
    var reader = new XmlReader();
    var properties = reader.ReadXmlTest(memoryStream);
    await Task.Run(() =>
    {
        var entityList = properties
            .AsParallel()
            .Select(CreateObjectsFromDownloadedProperties)
            .ToList();
    });
}
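A variant of the PLINQ version, sketched with stand-in types (int for RebarProperties, string for Entity, and a hypothetical CreateEntity method): it returns the list as Task<List<T>> instead of dropping it, and adds AsOrdered so the results keep the input order. Whether order matters for the real entities is an assumption; without AsOrdered, PLINQ may return them in any order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PlinqDemo
{
    // Hypothetical stand-in for CreateObjectsFromDownloadedProperties.
    public static string CreateEntity(int property) => $"entity-{property}";

    // Returning Task<List<T>> rather than Task lets the caller await the result.
    public static Task<List<string>> CreateEntitiesAsync(IEnumerable<int> properties) =>
        Task.Run(() => properties
            .AsParallel()
            .AsOrdered()          // keep results in input order; drop if order is irrelevant
            .Select(CreateEntity) // runs on several threads
            .ToList());
}
```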
However, the code still has the "fire and forget" problem.
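Short of restructuring, one way to keep downloads and object creation overlapping without fire-and-forget is to keep each processing task and await them all at the end with Task.WhenAll, so failures reach the caller. In this minimal sketch DownloadAsync and ProcessAsync are hypothetical stand-ins for DownloadObjectAsync and ReadFileXml, and int replaces the real data types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TrackedProcessingDemo
{
    // Hypothetical stand-in for DownloadObjectAsync: simulates I/O latency.
    static async Task<int> DownloadAsync(int name)
    {
        await Task.Delay(5);
        return name;
    }

    // Hypothetical stand-in for ReadFileXml: CPU-bound work on the thread pool.
    static Task<int> ProcessAsync(int data) => Task.Run(() => data * 2);

    public static async Task<int[]> DownloadAndProcessAsync(IEnumerable<int> names)
    {
        var processing = new List<Task<int>>();
        foreach (var name in names)
        {
            var data = await DownloadAsync(name);
            // Start processing without awaiting it, so the next download begins
            // right away, but keep the task instead of discarding it.
            processing.Add(ProcessAsync(data));
        }

        // Wait for every processing task; if any failed, awaiting rethrows
        // the first exception here.
        return await Task.WhenAll(processing);
    }
}
```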
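To solve the problem properly, I recommend taking a step back and using an approach better suited to "pipeline"-style processing, for example TPL Dataflow: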
// Uses System.IO, System.Linq, System.Threading.Tasks.Dataflow and Google.Cloud.Storage.V1.
public async Task DownloadFromCloud(List<string> constructNames)
{
    // Set up the pipeline.
    var gcsClient = StorageClient.Create();
    var downloadBlock = new TransformBlock<string, MemoryStream>(async constructName =>
    {
        var memoryStream = new MemoryStream();
        await gcsClient.DownloadObjectAsync("companyprojects", constructName, memoryStream);
        memoryStream.Position = 0;
        return memoryStream;
    });
    var processBlock = new TransformBlock<MemoryStream, List<Entity>>(memoryStream =>
    {
        var reader = new XmlReader();
        var properties = reader.ReadXmlTest(memoryStream);
        return properties
            .AsParallel()
            .Select(CreateObjectsFromDownloadedProperties)
            .ToList();
    });
    var resultsBlock = new ActionBlock<List<Entity>>(entities => { /* TODO */ });
    downloadBlock.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });
    processBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // Push data into the pipeline.
    foreach (var constructName in constructNames)
        await downloadBlock.SendAsync(constructName);
    downloadBlock.Complete();

    // Wait for pipeline to complete.
    await resultsBlock.Completion;
}
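A self-contained version of the same pipeline shape, with simulated stages in place of Google Cloud Storage and the XML parsing (the delay and the arithmetic are placeholders, and int stands in for the real data types). It also sets two ExecutionDataflowBlockOptions that the code above leaves at their defaults: BoundedCapacity, so downloads cannot run arbitrarily far ahead of processing, and MaxDegreeOfParallelism on the processing block, one possible alternative to AsParallel inside the block. The values chosen are arbitrary. It assumes the System.Threading.Tasks.Dataflow library is available (on .NET Framework it comes from the NuGet package of that name).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    public static async Task<List<int>> RunAsync(IEnumerable<int> names)
    {
        // Download stage: simulated I/O, at most 4 items buffered in the block.
        var downloadBlock = new TransformBlock<int, int>(async name =>
        {
            await Task.Delay(5); // hypothetical stand-in for DownloadObjectAsync
            return name;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 4 });

        // Processing stage: CPU-bound placeholder, up to 2 items at a time.
        // TransformBlock still emits results in input order by default.
        var processBlock = new TransformBlock<int, int>(data => data * 10,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 4 });

        // ActionBlock runs one item at a time by default, so List<T>.Add
        // is never called concurrently here.
        var results = new List<int>();
        var resultsBlock = new ActionBlock<int>(value => results.Add(value));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        downloadBlock.LinkTo(processBlock, link);
        processBlock.LinkTo(resultsBlock, link);

        // SendAsync waits asynchronously while the bounded download block is full.
        foreach (var name in names)
            await downloadBlock.SendAsync(name);
        downloadBlock.Complete();

        await resultsBlock.Completion;
        return results;
    }
}
```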