为什么结合 Task.Run 和 plinq 这么慢?
Why is combining Task.Run and plinq so slow?
我发现 Task.Run 与 plinq 结合起来非常慢,所以我做了一个简单的实验:
int scale = 32;
Enumerable.Range( 0, scale ).AsParallel().ForAll( i => {
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ ) { }
} );
} );
plinq inside plinq 运行良好,在 14 毫秒内完成
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
{
for ( int k = 0; k < scale; k++ ) { }
} ) ).ToArray();
await Task.WhenAll( _tasks );
} ) ).ToArray();
await Task.WhenAll( tasks );
Task 里面的 Task 也会在 14 毫秒后结束,但是如果我把里面的 Task.Run 换成 plinq 这样的:
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ ) { }
} );
} ) ).ToArray();
await Task.WhenAll( tasks );
执行需要 29 秒。如果 scale
变量更大,情况会变得更糟。
谁能解释一下这个案例中发生了什么?
编辑:
我又做了一个实验:
static async Task Main( string[] args )
{
Stopwatch stopwatch = Stopwatch.StartNew();
int scale = 8;
Task[] tasks = Enumerable.Range( 0, scale ).Select( id => Run( scale, id ) ).ToArray();
await Task.WhenAll( tasks );
Console.WriteLine( $"ElapsedTime={stopwatch.ElapsedMilliseconds}ms" );
}
static Task Run( int scale, int id )
{
return Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ )
{
}
Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end" );
} );
} );
}
这是结果的一部分:
[1557475215796]Task 0 for loop 6 end
[1557475215796]Task 0 for loop 7 end
[1557475216776]Task 4 for loop 0 end
[1557475216776]Task 4 for loop 1 end
[1557475216777]Task 4 for loop 2 end
[1557475216777]Task 4 for loop 3 end
[1557475216778]Task 4 for loop 4 end
[1557475216778]Task 4 for loop 5 end
[1557475216779]Task 4 for loop 6 end
[1557475216780]Task 4 for loop 7 end
[1557475217774]Task 5 for loop 0 end
[1557475217774]Task 5 for loop 1 end
[1557475217775]Task 5 for loop 2 end
查看每个任务之间的时间戳,你会发现每当移动到下一个任务时,都会有一个神秘的 1000 毫秒延迟。我想 plinq 或任务中有一种机制会在某些情况下暂停一秒钟,这会显着减慢进程。
感谢@StephenCleary的解释,现在我明白了延迟来自线程的创建。我再次调整我的实验,发现 ForAll
方法将阻止任务,直到完成不同任务中的所有其他 ForAll
方法。
static Task Run( int scale, int id )
{
return Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ )
{
}
Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end, thread count = {Process.GetCurrentProcess().Threads.Count}" );
} );
Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} finished" );
} );
}
结果是:
[1557478553656]Task 6 for loop 6 end, thread count = 19
[1557478553657]Task 6 for loop 7 end, thread count = 19
[1557478554645]Task 7 for loop 0 end, thread count = 20
[1557478554647]Task 7 for loop 1 end, thread count = 20
[1557478554649]Task 7 for loop 2 end, thread count = 20
[1557478554651]Task 7 for loop 3 end, thread count = 20
[1557478554653]Task 7 for loop 4 end, thread count = 20
[1557478554655]Task 7 for loop 5 end, thread count = 20
[1557478554657]Task 7 for loop 6 end, thread count = 20
[1557478554659]Task 7 for loop 7 end, thread count = 20
[1557478555644]Task 1 finished
[1557478555644]Task 0 finished
[1557478555644]Task 3 finished
[1557478555644]Task 2 finished
[1557478555644]Task 4 finished
[1557478555644]Task 6 finished
[1557478555644]Task 5 finished
[1557478555644]Task 7 finished
我希望 ForAll
方法应该立即 return。为什么会阻塞任务和线程?
您的代码中显然存在问题,让我们回顾一下各种代码片段,尤其是使用 Task
的代码片段,因为 PLinq
中的 PLinq
非常简单,几乎使用了所有代码尽可能快地处理可能的线程/内核,因为处理在内存中并且速度很快,所以不会有太多的上下文转换。事实上 PLinq
本身将管理/控制 并行调用 的数量,不像 Task.Run
相对独立。
Snippet 1
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
{
for ( int k = 0; k < scale; k++ ) { }
} ) ).ToArray();
await Task.WhenAll( _tasks );
} ) ).ToArray();
await Task.WhenAll( tasks );
- 这里你的完整处理是在内存中,每个外部任务,异步调度内部循环,而任务本身不会阻塞线程等待内部任务完成,所以外部
Task.Run
,会被通知当内部 Task.Run
完成时异步
现在在较慢的代码中发生了什么,让我们回顾一下
Snippet 2
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ ) { }
} );
} ) ).ToArray();
await Task.WhenAll( tasks );
- 这里每个
Task.Run
不会异步地将请求移交给内部 PLinq 调用,会发生什么情况是 Task.Run
调用的线程将被阻止以完成内部 PLinq,这是主要的问题的根源在这里,从而导致激烈的争用。
如上所述,Task.Run
调用 PLinq
与 PLinq
调用 PLinq
之间存在实质性差异,因此关键在于了解这些不同之处API 单独工作,将它们组合起来一起工作有什么影响,如您的代码所期望的那样。
我发现 Task.Run 与 plinq 结合起来非常慢,所以我做了一个简单的实验:
int scale = 32;
Enumerable.Range( 0, scale ).AsParallel().ForAll( i => {
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ ) { }
} );
} );
plinq inside plinq 运行良好,在 14 毫秒内完成
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
{
for ( int k = 0; k < scale; k++ ) { }
} ) ).ToArray();
await Task.WhenAll( _tasks );
} ) ).ToArray();
await Task.WhenAll( tasks );
Task 里面的 Task 也会在 14 毫秒后结束,但是如果我把里面的 Task.Run 换成 plinq 这样的:
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ ) { }
} );
} ) ).ToArray();
await Task.WhenAll( tasks );
执行需要 29 秒。如果 scale
变量更大,情况会变得更糟。
谁能解释一下这个案例中发生了什么?
编辑:
我又做了一个实验:
static async Task Main( string[] args )
{
Stopwatch stopwatch = Stopwatch.StartNew();
int scale = 8;
Task[] tasks = Enumerable.Range( 0, scale ).Select( id => Run( scale, id ) ).ToArray();
await Task.WhenAll( tasks );
Console.WriteLine( $"ElapsedTime={stopwatch.ElapsedMilliseconds}ms" );
}
static Task Run( int scale, int id )
{
return Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ )
{
}
Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end" );
} );
} );
}
这是结果的一部分:
[1557475215796]Task 0 for loop 6 end
[1557475215796]Task 0 for loop 7 end
[1557475216776]Task 4 for loop 0 end
[1557475216776]Task 4 for loop 1 end
[1557475216777]Task 4 for loop 2 end
[1557475216777]Task 4 for loop 3 end
[1557475216778]Task 4 for loop 4 end
[1557475216778]Task 4 for loop 5 end
[1557475216779]Task 4 for loop 6 end
[1557475216780]Task 4 for loop 7 end
[1557475217774]Task 5 for loop 0 end
[1557475217774]Task 5 for loop 1 end
[1557475217775]Task 5 for loop 2 end
查看每个任务之间的时间戳,你会发现每当移动到下一个任务时,都会有一个神秘的 1000 毫秒延迟。我想 plinq 或任务中有一种机制会在某些情况下暂停一秒钟,这会显着减慢进程。
感谢@StephenCleary的解释,现在我明白了延迟来自线程的创建。我再次调整我的实验,发现 ForAll
方法将阻止任务,直到完成不同任务中的所有其他 ForAll
方法。
static Task Run( int scale, int id )
{
return Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ )
{
}
Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end, thread count = {Process.GetCurrentProcess().Threads.Count}" );
} );
Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} finished" );
} );
}
结果是:
[1557478553656]Task 6 for loop 6 end, thread count = 19
[1557478553657]Task 6 for loop 7 end, thread count = 19
[1557478554645]Task 7 for loop 0 end, thread count = 20
[1557478554647]Task 7 for loop 1 end, thread count = 20
[1557478554649]Task 7 for loop 2 end, thread count = 20
[1557478554651]Task 7 for loop 3 end, thread count = 20
[1557478554653]Task 7 for loop 4 end, thread count = 20
[1557478554655]Task 7 for loop 5 end, thread count = 20
[1557478554657]Task 7 for loop 6 end, thread count = 20
[1557478554659]Task 7 for loop 7 end, thread count = 20
[1557478555644]Task 1 finished
[1557478555644]Task 0 finished
[1557478555644]Task 3 finished
[1557478555644]Task 2 finished
[1557478555644]Task 4 finished
[1557478555644]Task 6 finished
[1557478555644]Task 5 finished
[1557478555644]Task 7 finished
我希望 ForAll
方法应该立即 return。为什么会阻塞任务和线程?
您的代码中显然存在问题,让我们回顾一下各种代码片段,尤其是使用 Task
的代码片段,因为 PLinq
中的 PLinq
非常简单,几乎使用了所有代码尽可能快地处理可能的线程/内核,因为处理在内存中并且速度很快,所以不会有太多的上下文转换。事实上 PLinq
本身将管理/控制 并行调用 的数量,不像 Task.Run
相对独立。
Snippet 1
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
{
for ( int k = 0; k < scale; k++ ) { }
} ) ).ToArray();
await Task.WhenAll( _tasks );
} ) ).ToArray();
await Task.WhenAll( tasks );
- 这里你的完整处理是在内存中,每个外部任务,异步调度内部循环,而任务本身不会阻塞线程等待内部任务完成,所以外部
Task.Run
,会被通知当内部Task.Run
完成时异步
现在在较慢的代码中发生了什么,让我们回顾一下
Snippet 2
int scale = 32;
Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
{
for ( int k = 0; k < scale; k++ ) { }
} );
} ) ).ToArray();
await Task.WhenAll( tasks );
- 这里每个
Task.Run
不会异步地将请求移交给内部 PLinq 调用,会发生什么情况是Task.Run
调用的线程将被阻止以完成内部 PLinq,这是主要的问题的根源在这里,从而导致激烈的争用。
如上所述,Task.Run
调用 PLinq
与 PLinq
调用 PLinq
之间存在实质性差异,因此关键在于了解这些不同之处API 单独工作,将它们组合起来一起工作有什么影响,如您的代码所期望的那样。