Parallel.ForEachAsync 的实际最大并发任务

Actual maximum concurrent tasks of Parallel.ForEachAsync

我预计此代码需要 1 秒才能执行:

public async void Test()
{
    DateTime start = DateTime.Now;
    await Parallel.ForEachAsync(new int[1000], new ParallelOptions { MaxDegreeOfParallelism = 1000 }, async (i, token) =>
    {
        Thread.Sleep(1000);
    });
    Console.WriteLine("End program: " + (DateTime.Now - start).Seconds + " seconds elapsed.");
}

相反,我的电脑(i7-9700 8 核 8 线程)需要 37 秒:

End program: 37 seconds elapsed.

我正在使用 MaxDegreeOfParallelism = 1000 生成 1000 个任务...为什么它们不全部同时 运行?

我不知道 ForEachAsync 的确切实现,但我假设他们使用 Task,而不是 Thread

当您使用 1000 Tasks 到 运行 1000 CPU 绑定操作时,您实际上并不是在创建 1000 Threads,您只是要求少数几个ThreadPool Thread 到 运行 那些操作。那些 ThreadSleep 调用阻塞,因此大多数 Task 在它们开始执行之前排队。

这正是为什么在 Task 或一般的异步上下文中调用 Thread.Sleep 是一个可怕的想法。如果您将代码编辑为异步等待而不是同步等待,则经过的时间可能会更接近一秒。

await Parallel.ForEachAsync(new int[1000], new ParallelOptions { MaxDegreeOfParallelism = 1000 }, async (i, token) =>
{
    await Task.Delay(1000);
});

Parallel.ForEachAsync method invokes the asynchronous body delegate on ThreadPool 个线程。通常这个委托 returns 一个 ValueTask 很快,但在你的情况下不会发生这种情况,因为你的委托并不是真正的异步:

async (i, token) => Thread.Sleep(1000);

您可能会收到编译器警告,关于缺少 await 运算符的 async 方法。尽管如此,为 Parallel.ForEachAsync 方法提供混合 sync/async 工作负载是可以的。此方法旨在处理任何类型的工作负载。但如果工作负载大部分是同步的,结果可能会饱和 ThreadPool.

ThreadPool已经创建了SetMinThreads method, which by default is equal to Environment.ProcessorCount指定的线程数时,就说饱和了,还有更多的工作需要完成。在这种情况下,ThreadPool 切换到一种每秒创建一个新线程的保守算法(从 .NET 6 开始)。此行为没有准确记录,并且可能会在未来的 .NET 版本中更改。

为了获得您想要的行为,即 运行 并行处理所有 1000 个输入的委托,您必须增加 ThreadPool 创建的线程数即时点播:

ThreadPool.SetMinThreads(1000, 1000); // At the start of the program

有人会说,这样做之后您将不再有线程池,因为线程池本来就是一个小型的可重用线程池。但是,如果您不关心语义,只想完成工作,那么无论结果如何 和操作系统级别的开销,这都是解决问题的最简单方法。