具有 IO 和计算密集型任务的 TPL Parallel.Foreach
TPL Parallel.Foreach with IO and compute intensive tasks
我在 Azure blob 存储上有数十亿个 xml 日志文件需要处理、查询和结果存储。我正在使用 Parallel.Foreach,因为文件处理是相互独立的。
Parallel.ForEach<String> (listOfFeatureFiles, file => {
//For each file that was created
string fileName = file;
string directoryPath = outputfolderPath + "/" + FeatureFolderName;
string finalFilePath = directoryPath + "/" + fileName;
DownloadContent();
XMLParseAndQueryData();
UploadResultToQueue();
DeleteLocalCopy();
});
如果它只是计算密集型,那么我可能有最大 CPU 使用率,但是在我的场景中,与其余 80% 的文件相比,20% 的文件要大得多(以 GB 为单位)。这通常会导致 4 核的使用率仅为 50% CPU。我如何优化它以实现最大 CPU 使用率,即 > 90%?
我的假设是,一旦任务正在下载大文件,就不会使用 cpu,但同时也不会创建新线程,这可能会利用处理能力。我对这个假设可能是错误的,并且会欣赏具体的 link 来否定它。
我为我的一位客户构建了一个类似的应用程序,该应用程序还处理大量 xml 大小不一的文件。下载会干扰 CPU 使用,你没办法。但是您可以通过将 BlockingCollection 与多个消费者一起使用来优化 CPU 使用,并在下载较大文件时始终继续处理较小的文件。
My assumption is that once a task is downloading big files, no cpu is used however no new thread is created in the meantime as well which could make use of processing power.
您确定您有足够的网络带宽并且下载文件实际上不是此过程的瓶颈吗?
如果你是,而缓慢添加线程实际上是让你慢下来的原因,那么快速而肮脏的解决方案是强制 ThreadPool
(Parallel.ForEach()
在内部使用) 有更多的线程。您可以通过调用 ThreadPool.SetMinThreads
.
来做到这一点
正确的解决方案是使 IO 绑定方法异步并独立于 CPU 绑定方法调度它们。为了帮助安排时间,您可以使用 TPL 数据流(EnsureOrdered
需要预发布版本):
var cpuBoundOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
EnsureOrdered = false
};
var ioBoundOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10, // TODO: tweak this value as necessary
EnsureOrdered = false
};
var downloadBlock = new TransformBlock<string, string>(async file =>
{
await DownloadContentAsync(file);
return file;
}, ioBoundOptions);
var parseBlock = new TransformBlock<string, string>(file =>
{
XMLParseAndQueryData(file);
return file;
}, cpuBoundOptions);
var uploadBlock = new TransformBlock<string, string>(async file =>
{
await UploadResultToQueue(file);
return file;
}, ioBoundOptions);
var deleteBlock = new ActionBlock<string>(file => DeleteLocalCopy(file));
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadBlock.LinkTo(parseBlock, linkOptions);
parseBlock.LinkTo(uploadBlock, linkOptions);
uploadBlock.LinkTo(deleteBlock, linkOptions);
foreach (var file in listOfFeatureFiles)
{
downloadBlock.Post(file);
}
downloadBlock.Complete();
await deleteBlock.Completion;
我在 Azure blob 存储上有数十亿个 xml 日志文件需要处理、查询和结果存储。我正在使用 Parallel.Foreach,因为文件处理是相互独立的。
Parallel.ForEach<String> (listOfFeatureFiles, file => {
//For each file that was created
string fileName = file;
string directoryPath = outputfolderPath + "/" + FeatureFolderName;
string finalFilePath = directoryPath + "/" + fileName;
DownloadContent();
XMLParseAndQueryData();
UploadResultToQueue();
DeleteLocalCopy();
});
如果它只是计算密集型,那么我可能有最大 CPU 使用率,但是在我的场景中,与其余 80% 的文件相比,20% 的文件要大得多(以 GB 为单位)。这通常会导致 4 核的使用率仅为 50% CPU。我如何优化它以实现最大 CPU 使用率,即 > 90%?
我的假设是,一旦任务正在下载大文件,就不会使用 cpu,但同时也不会创建新线程,这可能会利用处理能力。我对这个假设可能是错误的,并且会欣赏具体的 link 来否定它。
我为我的一位客户构建了一个类似的应用程序,该应用程序还处理大量 xml 大小不一的文件。下载会干扰 CPU 使用,你没办法。但是您可以通过将 BlockingCollection 与多个消费者一起使用来优化 CPU 使用,并在下载较大文件时始终继续处理较小的文件。
My assumption is that once a task is downloading big files, no cpu is used however no new thread is created in the meantime as well which could make use of processing power.
您确定您有足够的网络带宽并且下载文件实际上不是此过程的瓶颈吗?
如果你是,而缓慢添加线程实际上是让你慢下来的原因,那么快速而肮脏的解决方案是强制 ThreadPool
(Parallel.ForEach()
在内部使用) 有更多的线程。您可以通过调用 ThreadPool.SetMinThreads
.
正确的解决方案是使 IO 绑定方法异步并独立于 CPU 绑定方法调度它们。为了帮助安排时间,您可以使用 TPL 数据流(EnsureOrdered
需要预发布版本):
var cpuBoundOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
EnsureOrdered = false
};
var ioBoundOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10, // TODO: tweak this value as necessary
EnsureOrdered = false
};
var downloadBlock = new TransformBlock<string, string>(async file =>
{
await DownloadContentAsync(file);
return file;
}, ioBoundOptions);
var parseBlock = new TransformBlock<string, string>(file =>
{
XMLParseAndQueryData(file);
return file;
}, cpuBoundOptions);
var uploadBlock = new TransformBlock<string, string>(async file =>
{
await UploadResultToQueue(file);
return file;
}, ioBoundOptions);
var deleteBlock = new ActionBlock<string>(file => DeleteLocalCopy(file));
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadBlock.LinkTo(parseBlock, linkOptions);
parseBlock.LinkTo(uploadBlock, linkOptions);
uploadBlock.LinkTo(deleteBlock, linkOptions);
foreach (var file in listOfFeatureFiles)
{
downloadBlock.Post(file);
}
downloadBlock.Complete();
await deleteBlock.Completion;