如何处理任务并行库中的目录文件?
How to process directory files in Task parallel library?
我有一个场景,我必须根据处理器内核并行处理多个文件(例如 30 个)。我必须根据处理器内核的数量将这些文件分配给单独的任务。我不知道如何为每个要处理的任务设置开始和结束限制。例如,每个任务都知道它必须处理多少个文件。
private void ProcessFiles(object e)
{
try
{
var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var FilePaths = Directory.EnumerateFiles(diectoryPath);
int numCores = System.Environment.ProcessorCount;
int NoOfTasks = FilePaths.Count() > numCores ? (FilePaths.Count()/ numCores) : FilePaths.Count();
for (int i = 0; i < NoOfTasks; i++)
{
Task.Factory.StartNew(
() =>
{
int startIndex = 0, endIndex = 0;
for (int Count = startIndex; Count < endIndex; Count++)
{
this.ProcessFile(FilePaths);
}
});
}
}
catch (Exception ex)
{
throw;
}
}
基于我对 TPL 的有限理解,我认为您的代码可以重写为:
private void ProcessFiles(object e)
{
try
{
var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var FilePaths = Directory.EnumerateFiles(diectoryPath);
Parallel.ForEach(FilePaths, path => this.ProcessFile(path));
}
catch (Exception ex)
{
throw;
}
}
问候
对于像您这样的问题,C# 中提供了并发数据结构。您想使用 BlockingCollection 并将所有文件名存储在其中。
您使用机器上可用的核心数来计算任务数的想法不是很好。为什么?因为 ProcessFile()
可能每个文件花费的时间不同。因此,最好将任务数设置为您拥有的内核数。然后,让每个任务从BlockingCollection中一个一个读取文件名,然后处理文件,直到BlockingCollection为空。
try
{
var directoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var filePaths = CreateBlockingCollection(directoryPath);
//Start the same #tasks as the #cores (Assuming that #files > #cores)
int taskCount = System.Environment.ProcessorCount;
for (int i = 0; i < taskCount; i++)
{
Task.Factory.StartNew(
() =>
{
string fileName;
while (!filePaths.IsCompleted)
{
if (!filePaths.TryTake(out fileName)) continue;
this.ProcessFile(fileName);
}
});
}
}
而 CreateBlockingCollection()
将如下所示:
private BlockingCollection<string> CreateBlockingCollection(string path)
{
var allFiles = Directory.EnumerateFiles(path);
var filePaths = new BlockingCollection<string>(allFiles.Count);
foreach(var fileName in allFiles)
{
filePaths.Add(fileName);
}
filePaths.CompleteAdding();
return filePaths;
}
您现在必须修改 ProcessFile()
以接收文件名,而不是获取所有文件路径并处理其块。
这种方法的优点是,现在您的 CPU 订阅不会过多或不足,负载也会均衡。
我自己没有 运行 代码,所以我的代码中可能存在一些语法错误。如果您遇到任何错误,请随时纠正错误。
我有一个场景,我必须根据处理器内核并行处理多个文件(例如 30 个)。我必须根据处理器内核的数量将这些文件分配给单独的任务。我不知道如何为每个要处理的任务设置开始和结束限制。例如,每个任务都知道它必须处理多少个文件。
private void ProcessFiles(object e)
{
try
{
var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var FilePaths = Directory.EnumerateFiles(diectoryPath);
int numCores = System.Environment.ProcessorCount;
int NoOfTasks = FilePaths.Count() > numCores ? (FilePaths.Count()/ numCores) : FilePaths.Count();
for (int i = 0; i < NoOfTasks; i++)
{
Task.Factory.StartNew(
() =>
{
int startIndex = 0, endIndex = 0;
for (int Count = startIndex; Count < endIndex; Count++)
{
this.ProcessFile(FilePaths);
}
});
}
}
catch (Exception ex)
{
throw;
}
}
基于我对 TPL 的有限理解,我认为您的代码可以重写为:
private void ProcessFiles(object e)
{
try
{
var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var FilePaths = Directory.EnumerateFiles(diectoryPath);
Parallel.ForEach(FilePaths, path => this.ProcessFile(path));
}
catch (Exception ex)
{
throw;
}
}
问候
对于像您这样的问题,C# 中提供了并发数据结构。您想使用 BlockingCollection 并将所有文件名存储在其中。
您使用机器上可用的核心数来计算任务数的想法不是很好。为什么?因为 ProcessFile()
可能每个文件花费的时间不同。因此,最好将任务数设置为您拥有的内核数。然后,让每个任务从BlockingCollection中一个一个读取文件名,然后处理文件,直到BlockingCollection为空。
try
{
var directoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var filePaths = CreateBlockingCollection(directoryPath);
//Start the same #tasks as the #cores (Assuming that #files > #cores)
int taskCount = System.Environment.ProcessorCount;
for (int i = 0; i < taskCount; i++)
{
Task.Factory.StartNew(
() =>
{
string fileName;
while (!filePaths.IsCompleted)
{
if (!filePaths.TryTake(out fileName)) continue;
this.ProcessFile(fileName);
}
});
}
}
而 CreateBlockingCollection()
将如下所示:
private BlockingCollection<string> CreateBlockingCollection(string path)
{
var allFiles = Directory.EnumerateFiles(path);
var filePaths = new BlockingCollection<string>(allFiles.Count);
foreach(var fileName in allFiles)
{
filePaths.Add(fileName);
}
filePaths.CompleteAdding();
return filePaths;
}
您现在必须修改 ProcessFile()
以接收文件名,而不是获取所有文件路径并处理其块。
这种方法的优点是,现在您的 CPU 订阅不会过多或不足,负载也会均衡。
我自己没有 运行 代码,所以我的代码中可能存在一些语法错误。如果您遇到任何错误,请随时纠正错误。