ParallelEnumerable.WithDegreeOfParallelism() not restricting tasks?

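I'm trying to use AsParallel() together with async-await so that an application processes a series of tasks in parallel, but with a restricted degree of concurrency: each task starts an external process that uses a significant amount of memory, so I want to wait for that process to finish before moving on to the next item in the series. Most of the literature I've seen on ParallelEnumerable.WithDegreeOfParallelism suggests that it sets a maximum limit on the number of concurrent tasks at any one time, but my own tests seem to show that the limit is skipped altogether.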

Here is a crude example (WithDegreeOfParallelism() is deliberately set to 1 to demonstrate the issue):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Example
{
    private async Task HeavyTask(int i)
    {
        await Task.Delay(10 * 1000);
    }

    public async Task Run()
    {
        int n = 0;

        await Task.WhenAll(Enumerable.Range(0, 100)
                                     .AsParallel()
                                     .WithDegreeOfParallelism(1)
                                     .Select(async i =>
                                     {
                                         Interlocked.Increment(ref n);
                                         Console.WriteLine("[+] " + n);

                                         await HeavyTask(i);

                                         Interlocked.Decrement(ref n);
                                         Console.WriteLine("[-] " + n);
                                     }));
    }
}

class Program
{
    public static void Main(string[] args)
    {
        Task.Run(async () =>
        {
            await new Example().Run();
        }).Wait();
    }
}

As I understand it, the code above should produce output along these lines:

[+] 1
[-] 0
[+] 1
[-] 0
...

Instead, it prints:

[+] 1
[+] 2
[+] 3
[+] 4
...

This suggests that it starts every task in the list and only then waits for the tasks to return.

Is there anything particularly obvious (or non-obvious) that I'm doing wrong that makes it look as though WithDegreeOfParallelism() is being ignored?

Update

Sorry, after testing your code I now understand what you are seeing.

async i =>

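The async lambda passed to Select returns a Task, and as far as PLINQ is concerned the lambda has finished as soon as it reaches its first await and hands that unfinished task back. Everything after the await is work that PLINQ never waits on, so it runs regardless of the degree of parallelism. Adding Console.WriteLine(Thread.CurrentThread.ManagedThreadId); inside the lambda will clearly show you that it is consuming as many threads as it likes.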
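If you want to keep plain async-await and still cap how many items are in flight, one common option is to gate each item on a SemaphoreSlim. This is a different technique from PLINQ, shown only as a minimal sketch; ThrottledRunner and RunAsync are hypothetical names made up for this example, and the method returns the peak concurrency it observed so the limit can be checked.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the question's code).
public static class ThrottledRunner
{
    // Runs `work` once per item, with at most `maxConcurrency` invocations in flight.
    // Returns the highest number of invocations observed running at the same time.
    public static async Task<int> RunAsync(int itemCount, int maxConcurrency, Func<int, Task> work)
    {
        var sync = new object();
        int running = 0, peak = 0;
        using var gate = new SemaphoreSlim(maxConcurrency);

        var tasks = Enumerable.Range(0, itemCount).Select(async i =>
        {
            // Asynchronously wait for a free slot; no thread is blocked while waiting.
            await gate.WaitAsync();
            lock (sync) { running++; peak = Math.Max(peak, running); }
            try
            {
                await work(i);
            }
            finally
            {
                lock (sync) { running--; }
                gate.Release(); // hand the slot to the next waiting item
            }
        });

        // Enumerating the query starts every lambda, but each one parks at the gate.
        await Task.WhenAll(tasks);
        return peak;
    }
}
```

Called as await ThrottledRunner.RunAsync(100, 1, i => Task.Delay(100)), only one item gets past the gate at a time, because the slot is released only after the awaited work has actually completed.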

Also note that if your heavy task is IO bound, you should skip PLINQ/Parallel and use async/await with an ActionBlock from TPL Dataflow, as it gives you the best of both worlds.

For example:

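// Requires the System.Threading.Tasks.Dataflow package.
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public async Task DoWorkLoads(List<int> list)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        // At most two MyMethodAsync calls are in flight at any time.
                        MaxDegreeOfParallelism = 2
                     };

   var block = new ActionBlock<int>(MyMethodAsync, options);

   foreach (var item in list)
      block.Post(item);

   // Signal that no more items will arrive, then wait for the queued ones to finish.
   block.Complete();
   await block.Completion;
}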

...

public async Task MyMethodAsync(int i)
{       
    await Task.Delay(10 * 1000);
}

Original

This is a very subtle and very common misunderstanding; however, I think the documentation seems to be wrong:

Sets the degree of parallelism to use in a query. Degree of parallelism is the maximum number of concurrently executing tasks that will be used to process the query.

If we dig a little deeper, though, we get a better understanding, and there is also a GitHub conversation about this.

ParallelOptions.MaxDegreeOfParallelism vs PLINQ’s WithDegreeOfParallelism

PLINQ is different. Some important Standard Query Operators in PLINQ require communication between the threads involved in the processing of the query, including some that rely on a Barrier to enable threads to operate in lock-step. The PLINQ design requires that a specific number of threads be actively involved for the query to make any progress. Thus when you specify a DegreeOfParallelism for PLINQ, you’re specifying the actual number of threads that will be involved, rather than just a maximum.
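To see what that thread count does and does not cover, here is a rough sketch with a blocking (non-async) body. PlinqBlockingDemo and MeasurePeak are hypothetical names for this illustration, and Thread.Sleep stands in for the heavy work. Because each worker thread stays busy until its item finishes, the peak number of items running at once should not exceed the configured degree of parallelism, which is the behaviour the question expected.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical demo class (not part of the question's code).
public static class PlinqBlockingDemo
{
    // Pushes `itemCount` blocking work items through PLINQ and returns the
    // highest number of items observed running at the same time.
    public static int MeasurePeak(int itemCount, int degreeOfParallelism, int sleepMs)
    {
        var sync = new object();
        int running = 0, peak = 0;

        Enumerable.Range(0, itemCount)
                  .AsParallel()
                  .WithDegreeOfParallelism(degreeOfParallelism)
                  .ForAll(i =>
                  {
                      lock (sync) { running++; peak = Math.Max(peak, running); }
                      Thread.Sleep(sleepMs); // blocking stand-in for the heavy work
                      lock (sync) { running--; }
                  });

        return peak;
    }
}
```

Swapping the blocking body for an async lambda brings back the original symptom: each worker thread is released at the first await, so the degree of parallelism only bounds the synchronous part of each item.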