如何创建自定义分组/哈希 PLINQ 分区程序或并行查询
How to create a custom grouped / hashed PLINQ partitioner or parallel query
我正在尝试使用 PLINQ 并行处理文件路径列表。
我必须在同一个线程中处理所有具有相同名称的文件(不包括扩展名),因为该线程可能正在重命名文件扩展名,如果同时从不同的线程完成会导致问题。
从文档看来,可以使用例如创建基于哈希的分区GroupBy(),或者我需要创建一个自定义分区。我找不到任何一个可用的例子,至少不是我所理解和可以开始工作的。
参见:
https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.partitioner
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-dynamic-partitions
https://devblogs.microsoft.com/pfxteam/partitioning-in-plinq/
https://weblogs.asp.net/dixin/parallel-linq-2-partitioning
我想咨询如何使用例如GroupBy(),或者是否有一个预先存在的哈希分区方案,我可以在其中向函数提供哈希键?
示例代码:
// All files with the same path minus extension must be processed together
var fileList = new List<string>()
{
"/path1/file1.ext",
"/path1/file2.ext",
"/path2/file1.avi",
"/path1/file1.mkv",
"/path1/file3.avi",
"/path1/file1.avi",
"/path2/file3.mkv",
"/path1/file2.mkv"
};
// Group files by path ignoring extensions
var pathDictionary = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
fileList.ForEach(path => {
string normalPath = Path.Combine(Path.GetDirectoryName(path), Path.GetFileNameWithoutExtension(path));
if (pathDictionary.TryGetValue(normalPath, out var pathList))
{
pathList.Add(path);
}
else
{
pathDictionary.Add(normalPath, new List<string> { path });
}
});
// HOWTO: Skip the grouping and use GroupBy() or a native hash iterator?
// Process groups in parallel
var partitioner = Partitioner.Create(pathDictionary, EnumerablePartitionerOptions.NoBuffering);
partitioner.AsParallel()
.ForAll(keyPair =>
{
keyPair.Value.ForEach(fileName => {
Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {fileName}");
Thread.Sleep(100);
});
});
我认为您已接近解决问题。 GroupBy
运算符将发出具有相同键的路径组。所以你只需要在 ForAll
lambda 中做一个 foreach
循环,并一一处理具有相同键的路径:
pathList
.GroupBy(path => Path.ChangeExtension(path, ""), StringComparer.OrdinalIgnoreCase)
.AsParallel()
.ForAll(g =>
{
Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Key: {g.Key}");
foreach (string path in g)
{
Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {path}");
Thread.Sleep(100);
}
});
如您所见,GroupBy
可以放在 AsParallel
之前或之后。这没什么区别,因为检索每条路径的密钥并不需要 CPU 密集。所以按顺序做应该不会比并行做慢。
我正在尝试使用 PLINQ 并行处理文件路径列表。 我必须在同一个线程中处理所有具有相同名称的文件(不包括扩展名),因为该线程可能正在重命名文件扩展名,如果同时从不同的线程完成会导致问题。
从文档看来,可以使用例如创建基于哈希的分区GroupBy(),或者我需要创建一个自定义分区。我找不到任何一个可用的例子,至少不是我所理解和可以开始工作的。
参见:
https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.partitioner
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-dynamic-partitions
https://devblogs.microsoft.com/pfxteam/partitioning-in-plinq/
https://weblogs.asp.net/dixin/parallel-linq-2-partitioning
我想咨询如何使用例如GroupBy(),或者是否有一个预先存在的哈希分区方案,我可以在其中向函数提供哈希键?
示例代码:
// All files with the same path minus extension must be processed together
var fileList = new List<string>()
{
"/path1/file1.ext",
"/path1/file2.ext",
"/path2/file1.avi",
"/path1/file1.mkv",
"/path1/file3.avi",
"/path1/file1.avi",
"/path2/file3.mkv",
"/path1/file2.mkv"
};
// Group files by path ignoring extensions
var pathDictionary = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
fileList.ForEach(path => {
string normalPath = Path.Combine(Path.GetDirectoryName(path), Path.GetFileNameWithoutExtension(path));
if (pathDictionary.TryGetValue(normalPath, out var pathList))
{
pathList.Add(path);
}
else
{
pathDictionary.Add(normalPath, new List<string> { path });
}
});
// HOWTO: Skip the grouping and use GroupBy() or a native hash iterator?
// Process groups in parallel
var partitioner = Partitioner.Create(pathDictionary, EnumerablePartitionerOptions.NoBuffering);
partitioner.AsParallel()
.ForAll(keyPair =>
{
keyPair.Value.ForEach(fileName => {
Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {fileName}");
Thread.Sleep(100);
});
});
我认为您已接近解决问题。 GroupBy
运算符将发出具有相同键的路径组。所以你只需要在 ForAll
lambda 中做一个 foreach
循环,并一一处理具有相同键的路径:
pathList
.GroupBy(path => Path.ChangeExtension(path, ""), StringComparer.OrdinalIgnoreCase)
.AsParallel()
.ForAll(g =>
{
Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Key: {g.Key}");
foreach (string path in g)
{
Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {path}");
Thread.Sleep(100);
}
});
如您所见,GroupBy
可以放在 AsParallel
之前或之后。这没什么区别,因为检索每条路径的密钥并不需要 CPU 密集。所以按顺序做应该不会比并行做慢。