Parallel.ForEachAsync 的实际最大并发任务
Actual maximum concurrent tasks of Parallel.ForEachAsync
我预计此代码需要 1 秒才能执行:
public async void Test()
{
DateTime start = DateTime.Now;
await Parallel.ForEachAsync(new int[1000], new ParallelOptions { MaxDegreeOfParallelism = 1000 }, async (i, token) =>
{
Thread.Sleep(1000);
});
Console.WriteLine("End program: " + (DateTime.Now - start).Seconds + " seconds elapsed.");
}
相反,我的电脑(i7-9700 8 核 8 线程)需要 37 秒:
End program: 37 seconds elapsed.
我正在使用 MaxDegreeOfParallelism = 1000
生成 1000 个任务...为什么它们不全部同时 运行?
我不知道 ForEachAsync
的确切实现,但我假设他们使用 Task
,而不是 Thread
。
当您使用 1000 Task
s 到 运行 1000 CPU 绑定操作时,您实际上并不是在创建 1000 Thread
s,您只是要求少数几个ThreadPool
Thread
到 运行 那些操作。那些 Thread
被 Sleep
调用阻塞,因此大多数 Task
在它们开始执行之前排队。
这正是为什么在 Task
或一般的异步上下文中调用 Thread.Sleep
是一个可怕的想法。如果您将代码编辑为异步等待而不是同步等待,则经过的时间可能会更接近一秒。
await Parallel.ForEachAsync(new int[1000], new ParallelOptions { MaxDegreeOfParallelism = 1000 }, async (i, token) =>
{
await Task.Delay(1000);
});
Parallel.ForEachAsync
method invokes the asynchronous body
delegate on ThreadPool
个线程。通常这个委托 returns 一个 ValueTask
很快,但在你的情况下不会发生这种情况,因为你的委托并不是真正的异步:
async (i, token) => Thread.Sleep(1000);
您可能会收到编译器警告,关于缺少 await
运算符的 async
方法。尽管如此,为 Parallel.ForEachAsync
方法提供混合 sync/async 工作负载是可以的。此方法旨在处理任何类型的工作负载。但如果工作负载大部分是同步的,结果可能会饱和 ThreadPool
.
当ThreadPool
已经创建了SetMinThreads
method, which by default is equal to Environment.ProcessorCount
指定的线程数时,就说饱和了,还有更多的工作需要完成。在这种情况下,ThreadPool
切换到一种每秒创建一个新线程的保守算法(从 .NET 6 开始)。此行为没有准确记录,并且可能会在未来的 .NET 版本中更改。
为了获得您想要的行为,即 运行 并行处理所有 1000 个输入的委托,您必须增加 ThreadPool
创建的线程数即时点播:
ThreadPool.SetMinThreads(1000, 1000); // At the start of the program
有人会说,这样做之后您将不再有线程池,因为线程池本来就是一个小型的可重用线程池。但是,如果您不关心语义,只想完成工作,那么无论结果如何 和操作系统级别的开销,这都是解决问题的最简单方法。
我预计此代码需要 1 秒才能执行:
public async void Test()
{
DateTime start = DateTime.Now;
await Parallel.ForEachAsync(new int[1000], new ParallelOptions { MaxDegreeOfParallelism = 1000 }, async (i, token) =>
{
Thread.Sleep(1000);
});
Console.WriteLine("End program: " + (DateTime.Now - start).Seconds + " seconds elapsed.");
}
相反,我的电脑(i7-9700 8 核 8 线程)需要 37 秒:
End program: 37 seconds elapsed.
我正在使用 MaxDegreeOfParallelism = 1000
生成 1000 个任务...为什么它们不全部同时 运行?
我不知道 ForEachAsync
的确切实现,但我假设他们使用 Task
,而不是 Thread
。
当您使用 1000 Task
s 到 运行 1000 CPU 绑定操作时,您实际上并不是在创建 1000 Thread
s,您只是要求少数几个ThreadPool
Thread
到 运行 那些操作。那些 Thread
被 Sleep
调用阻塞,因此大多数 Task
在它们开始执行之前排队。
这正是为什么在 Task
或一般的异步上下文中调用 Thread.Sleep
是一个可怕的想法。如果您将代码编辑为异步等待而不是同步等待,则经过的时间可能会更接近一秒。
await Parallel.ForEachAsync(new int[1000], new ParallelOptions { MaxDegreeOfParallelism = 1000 }, async (i, token) =>
{
await Task.Delay(1000);
});
Parallel.ForEachAsync
method invokes the asynchronous body
delegate on ThreadPool
个线程。通常这个委托 returns 一个 ValueTask
很快,但在你的情况下不会发生这种情况,因为你的委托并不是真正的异步:
async (i, token) => Thread.Sleep(1000);
您可能会收到编译器警告,关于缺少 await
运算符的 async
方法。尽管如此,为 Parallel.ForEachAsync
方法提供混合 sync/async 工作负载是可以的。此方法旨在处理任何类型的工作负载。但如果工作负载大部分是同步的,结果可能会饱和 ThreadPool
.
当ThreadPool
已经创建了SetMinThreads
method, which by default is equal to Environment.ProcessorCount
指定的线程数时,就说饱和了,还有更多的工作需要完成。在这种情况下,ThreadPool
切换到一种每秒创建一个新线程的保守算法(从 .NET 6 开始)。此行为没有准确记录,并且可能会在未来的 .NET 版本中更改。
为了获得您想要的行为,即 运行 并行处理所有 1000 个输入的委托,您必须增加 ThreadPool
创建的线程数即时点播:
ThreadPool.SetMinThreads(1000, 1000); // At the start of the program
有人会说,这样做之后您将不再有线程池,因为线程池本来就是一个小型的可重用线程池。但是,如果您不关心语义,只想完成工作,那么无论结果如何