如何创建自定义分组/哈希 PLINQ 分区程序或并行查询

How to create a custom grouped / hashed PLINQ partitioner or parallel query

我正在尝试使用 PLINQ 并行处理文件路径列表。 我必须在同一个线程中处理所有具有相同名称的文件(不包括扩展名),因为该线程可能正在重命名文件扩展名,如果同时从不同的线程完成会导致问题。

从文档看来,可以使用例如创建基于哈希的分区GroupBy(),或者我需要创建一个自定义分区。我找不到任何一个可用的例子,至少不是我所理解和可以开始工作的。

参见:
https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.partitioner
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-dynamic-partitions
https://devblogs.microsoft.com/pfxteam/partitioning-in-plinq/
https://weblogs.asp.net/dixin/parallel-linq-2-partitioning

我想咨询如何使用例如GroupBy(),或者是否有一个预先存在的哈希分区方案,我可以在其中向函数提供哈希键?

示例代码:

// All files with the same path minus extension must be processed together
var fileList = new List<string>()
{
    "/path1/file1.ext",
    "/path1/file2.ext",
    "/path2/file1.avi",
    "/path1/file1.mkv",
    "/path1/file3.avi",
    "/path1/file1.avi",
    "/path2/file3.mkv",
    "/path1/file2.mkv"
};

// Group files by path ignoring extensions
var pathDictionary = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
fileList.ForEach(path => {
    string normalPath = Path.Combine(Path.GetDirectoryName(path), Path.GetFileNameWithoutExtension(path));
    if (pathDictionary.TryGetValue(normalPath, out var pathList))
    {
        pathList.Add(path);
    }
    else
    {
        pathDictionary.Add(normalPath, new List<string> { path });
    }
});

// HOWTO: Skip the grouping and use GroupBy() or a native hash iterator?

// Process groups in parallel
var partitioner = Partitioner.Create(pathDictionary, EnumerablePartitionerOptions.NoBuffering);
partitioner.AsParallel()
    .ForAll(keyPair =>
    {
        keyPair.Value.ForEach(fileName => {
            Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {fileName}");
            Thread.Sleep(100);
        });
    });

我认为您已接近解决问题。 GroupBy 运算符将发出具有相同键的路径组。所以你只需要在 ForAll lambda 中做一个 foreach 循环,并一一处理具有相同键的路径:

pathList
    .GroupBy(path => Path.ChangeExtension(path, ""), StringComparer.OrdinalIgnoreCase)
    .AsParallel()
    .ForAll(g =>
    {
        Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Key: {g.Key}");
        foreach (string path in g)
        {
            Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {path}");
            Thread.Sleep(100);
        }
    });

如您所见,GroupBy 可以放在 AsParallel 之前或之后。这没什么区别,因为检索每条路径的密钥并不需要 CPU 密集。所以按顺序做应该不会比并行做慢。