为什么结合 Task.Run 和 plinq 这么慢?

Why is combining Task.Run and plinq so slow?

我发现 Task.Run 与 plinq 结合起来非常慢,所以我做了一个简单的实验:

int scale = 32;

Enumerable.Range( 0, scale ).AsParallel().ForAll( i => {
    Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } );
} );

plinq inside plinq 运行良好,在 14 毫秒内完成

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
    Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } ) ).ToArray();
    await Task.WhenAll( _tasks );
} ) ).ToArray();

await Task.WhenAll( tasks );

Task 里面的 Task 也会在 14 毫秒后结束,但是如果我把里面的 Task.Run 换成 plinq 这样的:

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
    Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } );
} ) ).ToArray();

await Task.WhenAll( tasks );

执行需要 29 秒。如果 scale 变量更大,情况会变得更糟。

谁能解释一下这个案例中发生了什么?


编辑:

我又做了一个实验:

static async Task Main( string[] args )
{
    Stopwatch stopwatch = Stopwatch.StartNew();

    int scale = 8;

    Task[] tasks = Enumerable.Range( 0, scale ).Select( id => Run( scale, id ) ).ToArray();

    await Task.WhenAll( tasks );

    Console.WriteLine( $"ElapsedTime={stopwatch.ElapsedMilliseconds}ms" );
}

static Task Run( int scale, int id )
{
    return Task.Run( () =>
    {
        Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
        {
            for ( int k = 0; k < scale; k++ )
            {

            }

            Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end" );
        } );
    } );
}

这是结果的一部分:

[1557475215796]Task 0 for loop 6 end
[1557475215796]Task 0 for loop 7 end
[1557475216776]Task 4 for loop 0 end
[1557475216776]Task 4 for loop 1 end
[1557475216777]Task 4 for loop 2 end
[1557475216777]Task 4 for loop 3 end
[1557475216778]Task 4 for loop 4 end
[1557475216778]Task 4 for loop 5 end
[1557475216779]Task 4 for loop 6 end
[1557475216780]Task 4 for loop 7 end
[1557475217774]Task 5 for loop 0 end
[1557475217774]Task 5 for loop 1 end
[1557475217775]Task 5 for loop 2 end

查看每个任务之间的时间戳,你会发现每当移动到下一个任务时,都会有一个神秘的 1000 毫秒延迟。我想 plinq 或任务中有一种机制会在某些情况下暂停一秒钟,这会显着减慢进程。


感谢@StephenCleary的解释,现在我明白了延迟来自线程的创建。我再次调整我的实验,发现 ForAll 方法将阻止任务,直到完成不同任务中的所有其他 ForAll 方法。

static Task Run( int scale, int id )
{
    return Task.Run( () =>
    {
        Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
        {
            for ( int k = 0; k < scale; k++ )
            {

            }

            Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end, thread count = {Process.GetCurrentProcess().Threads.Count}" );
        } );
        Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} finished" );
    } );
}

结果是:

[1557478553656]Task 6 for loop 6 end, thread count = 19
[1557478553657]Task 6 for loop 7 end, thread count = 19
[1557478554645]Task 7 for loop 0 end, thread count = 20
[1557478554647]Task 7 for loop 1 end, thread count = 20
[1557478554649]Task 7 for loop 2 end, thread count = 20
[1557478554651]Task 7 for loop 3 end, thread count = 20
[1557478554653]Task 7 for loop 4 end, thread count = 20
[1557478554655]Task 7 for loop 5 end, thread count = 20
[1557478554657]Task 7 for loop 6 end, thread count = 20
[1557478554659]Task 7 for loop 7 end, thread count = 20
[1557478555644]Task 1 finished
[1557478555644]Task 0 finished
[1557478555644]Task 3 finished
[1557478555644]Task 2 finished
[1557478555644]Task 4 finished
[1557478555644]Task 6 finished
[1557478555644]Task 5 finished
[1557478555644]Task 7 finished

我希望 ForAll 方法应该立即 return。为什么会阻塞任务和线程?

您的代码中显然存在问题,让我们回顾一下各种代码片段,尤其是使用 Task 的代码片段,因为 PLinq 中的 PLinq 非常简单,几乎使用了所有代码尽可能快地处理可能的线程/内核,因为处理在内存中并且速度很快,所以不会有太多的上下文转换。事实上 PLinq 本身将管理/控制 并行调用 的数量,不像 Task.Run 相对独立。

Snippet 1

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
    Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } ) ).ToArray();
    await Task.WhenAll( _tasks );
} ) ).ToArray();

await Task.WhenAll( tasks );
  • 这里你的完整处理是在内存中,每个外部任务,异步调度内部循环,而任务本身不会阻塞线程等待内部任务完成,所以外部Task.Run,会被通知当内部 Task.Run 完成时异步

现在在较慢的代码中发生了什么,让我们回顾一下

Snippet 2

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
    Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } );
} ) ).ToArray();

await Task.WhenAll( tasks );
  • 这里每个 Task.Run 不会异步地将请求移交给内部 PLinq 调用,会发生什么情况是 Task.Run 调用的线程将被阻止以完成内部 PLinq,这是主要的问题的根源在这里,从而导致激烈的争用。

如上所述,Task.Run 调用 PLinqPLinq 调用 PLinq 之间存在实质性差异,因此关键在于了解这些不同之处API 单独工作,将它们组合起来一起工作有什么影响,如您的代码所期望的那样。