异步 Task.Run 方法与等待 Synchronus 方法和超时的死锁
Deadlock with async Task.Run method with Wait from Synchronus method and timeout
我有一个方法定义为:
public Task<ReturnsMessage> Function() {
var task = Task.Run(() =>
{
var result = SyncMethod();
return new ReturnMessage(result);
});
if (task.Wait(delay)) {
return task;
}
var tcs = new TaskCompletionSource<ReturnMessage>();
tcs.SetCanceled();
return tcs.Task;
}
现在根据 maxAttempts 值在循环中调用它:
(方法名称 RetryableInvoke)
for (var i = 0; i < maxAttempts; i++)
{
try
{
return Function().Result;
}
catch (Exception e)
{
}
}
它工作得很好,但是当有大量负载时,我发现线程正在急剧增加并且转储向我显示以下警告:
任何人都可以建议我处理这种情况的最佳方法,这样我就不会看到任何类型的死锁吗?
您正在使用 Task.Run 启动任务,然后如果超时则返回取消,但您永远不会停止任务。他们只是在后台继续运行。
您的代码应该是 async/await 并使用 CancellationSource 并在 SyncMethod() 中处理取消标记。但是,如果您不能,并且据我了解,您想要异步 运行 一个方法并在一段时间后强行终止它,您可能应该使用线程并中止它们。
警告:除非您知道自己在做什么,否则中止线程是不安全的,甚至可能在未来版本中将其从 .NET 中删除。
其实我前段时间研究过这个:https://siderite.dev/blog/how-to-timeout-task-and-make-sure-it.html
您正在使应用程序陷入僵局,因为您没有使用 async
/await
或 ConfigureAwait(false
) 但您选择使用 Task.Wait
和 Task.Result
相反。
你应该首先知道 Task.Run
捕获它执行的线程的 SynchronizationContext
。然后 Task.Run
在新的 ThreadPool
线程上运行。完成后它会return交给父线程继续执行剩下的代码。当 returning 时,它将 return 到捕获的 SyncronizationContext
。您使用 Task.Wait
和 Task.Result
打破了这个概念。两个 Task
成员将同步调用 Task
,这意味着父线程将阻塞自身,直到子线程完成。子线程完成但是Task
不能return到捕获的SynchronizationContext
去执行剩下的代码(Task.Wait
之后的代码),因为父线程还在阻塞自己等待任务 运行 完成。
因为您在一处使用了 Task.Wait
而在另一处使用了 Task.Result
,因此您造成了两种潜在的死锁情况:
让我们单步执行 Function()
代码:
public Task<ReturnsMessage> Function() {
1) 创建任务并启动它:
var task = Task.Run(
() =>
{
var result = SyncMethod();
return new ReturnMessage(result);
});
重要的事情发生在这里:
Task.Run
捕获当前的 SynchronizationContext
并在后台开始执行,而父线程继续执行。 (如果这里使用 await
,那么 await
之后的剩余代码将被排入一个后续队列以供稍后执行。目的是当前线程可以 return (离开当前上下文)这样它就不需要等待和阻塞。一旦子线程完成运行,剩余的代码将由Task
执行,因为它之前已经在继续队列中排队).
2) task.Wait()
等待后台线程完成。等待意味着阻止线程继续执行。调用堆栈已停放。这等于后台线程的同步,因为父线程不再继续执行而是阻塞,所以不再并行执行:
// Dangerous implementation of a timeout for the executing `task` object
if (task.Wait(delay)) {
return task;
}
重要的事情发生在这里:
task.Wait()
阻塞当前线程(SynchronizationContext
)等待子线程完成。子任务完成并且 Task
尝试执行捕获的 SynchronizationContext
中继续队列中先前排队的剩余代码。但是这个上下文被等待子任务完成的线程阻塞了。潜在的僵局情况一。
以下剩余代码将无法访问:
var tcs = new TaskCompletionSource<ReturnMessage>();
tcs.SetCanceled();
return tcs.Task;
async
和 await
的引入是为了摆脱阻塞等待。 await
允许父线程 return 并继续。 await
之后的剩余代码将作为捕获的 SynchronizationContext
.
中的继续执行
这是对第一个死锁的修复,也使用适当的任务超时解决方案,该解决方案使用 Task.WhenAny
(不是首选):
public async Task<ReturnsMessage> FunctionAsync()
{
using (var cancellationTokenSource = new CancellationTokenSource())
{
try
{
var task = Task.Run(
() =>
{
// Check if the task needs to be cancelled
// because the timeout task ran to completion first
cancellationToken.ThrowIfCancellationRequested();
var result = SyncMethod();
return result;
}, cancellationTokenSource.Token);
int delay = 500;
Task timoutTask = Task.Delay(delay, cancellationTokenSource.Token);
Task firstCompletedTask = await Task.WhenAny(task, timoutTask);
if (firstCompletedTask == task)
{
// The 'task' has won the race, therefore
// cancel the 'timeoutTask'
cancellationTokenSource.Cancel();
return await task;
}
}
catch (OperationCanceledException)
{}
// The 'timeoutTask' has won the race, therefore
// cancel the 'task' instance
cancellationTokenSource.Cancel();
var tcs = new TaskCompletionSource<string>();
tcs.SetCanceled();
return await tcs.Task;
}
}
或者使用 CancellationTokenSouce
超时构造函数重载(首选)的替代和更好的超时方法修复第一个死锁:
public async Task<ReturnsMessage> FunctionAsync()
{
var timeout = 50;
using (var timeoutCancellationTokenSource = new CancellationTokenSource(timeout))
{
try
{
return await Task.Run(
() =>
{
// Check if the timeout elapsed
timeoutCancellationTokenSource.Token.ThrowIfCancellationRequested();
var result = SyncMethod();
return result;
}, timeoutCancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
var tcs = new TaskCompletionSource<string>();
tcs.SetCanceled();
return await tcs.Task;
}
}
}
第二个潜在的死锁代码是Function()
的消耗:
for (var i = 0; i < maxAttempts; i++)
{
return Function().Result;
}
Accessing the property's [Task.Result
] get accessor blocks the calling thread until the asynchronous operation is complete; it is equivalent to calling the Wait method.
死锁的原因与之前解释的相同:阻塞 SynchronizationContext
阻止了计划继续的执行。
要修复第二个死锁,我们可以使用 async/await(首选)或 ConfigreAwait(false)
:
for (var i = 0; i < maxAttempts; i++)
{
return await FunctionAsync();
}
或ConfigreAwait(false)
。这种方法可用于强制同步执行异步方法:
for (var i = 0; i < maxAttempts; i++)
{
return FunctionAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
ConfigreAwait(false)
指示 Task
忽略捕获的 SynchronizationContext
并在另一个永远不会成为父线程的 ThreadPool
线程上继续执行继续队列.
我有一个方法定义为:
public Task<ReturnsMessage> Function() {
var task = Task.Run(() =>
{
var result = SyncMethod();
return new ReturnMessage(result);
});
if (task.Wait(delay)) {
return task;
}
var tcs = new TaskCompletionSource<ReturnMessage>();
tcs.SetCanceled();
return tcs.Task;
}
现在根据 maxAttempts 值在循环中调用它:
(方法名称 RetryableInvoke)
for (var i = 0; i < maxAttempts; i++)
{
try
{
return Function().Result;
}
catch (Exception e)
{
}
}
它工作得很好,但是当有大量负载时,我发现线程正在急剧增加并且转储向我显示以下警告:
您正在使用 Task.Run 启动任务,然后如果超时则返回取消,但您永远不会停止任务。他们只是在后台继续运行。
您的代码应该是 async/await 并使用 CancellationSource 并在 SyncMethod() 中处理取消标记。但是,如果您不能,并且据我了解,您想要异步 运行 一个方法并在一段时间后强行终止它,您可能应该使用线程并中止它们。
警告:除非您知道自己在做什么,否则中止线程是不安全的,甚至可能在未来版本中将其从 .NET 中删除。
其实我前段时间研究过这个:https://siderite.dev/blog/how-to-timeout-task-and-make-sure-it.html
您正在使应用程序陷入僵局,因为您没有使用 async
/await
或 ConfigureAwait(false
) 但您选择使用 Task.Wait
和 Task.Result
相反。
你应该首先知道 Task.Run
捕获它执行的线程的 SynchronizationContext
。然后 Task.Run
在新的 ThreadPool
线程上运行。完成后它会return交给父线程继续执行剩下的代码。当 returning 时,它将 return 到捕获的 SyncronizationContext
。您使用 Task.Wait
和 Task.Result
打破了这个概念。两个 Task
成员将同步调用 Task
,这意味着父线程将阻塞自身,直到子线程完成。子线程完成但是Task
不能return到捕获的SynchronizationContext
去执行剩下的代码(Task.Wait
之后的代码),因为父线程还在阻塞自己等待任务 运行 完成。
因为您在一处使用了 Task.Wait
而在另一处使用了 Task.Result
,因此您造成了两种潜在的死锁情况:
让我们单步执行 Function()
代码:
public Task<ReturnsMessage> Function() {
1) 创建任务并启动它:
var task = Task.Run(
() =>
{
var result = SyncMethod();
return new ReturnMessage(result);
});
重要的事情发生在这里:
Task.Run
捕获当前的 SynchronizationContext
并在后台开始执行,而父线程继续执行。 (如果这里使用 await
,那么 await
之后的剩余代码将被排入一个后续队列以供稍后执行。目的是当前线程可以 return (离开当前上下文)这样它就不需要等待和阻塞。一旦子线程完成运行,剩余的代码将由Task
执行,因为它之前已经在继续队列中排队).
2) task.Wait()
等待后台线程完成。等待意味着阻止线程继续执行。调用堆栈已停放。这等于后台线程的同步,因为父线程不再继续执行而是阻塞,所以不再并行执行:
// Dangerous implementation of a timeout for the executing `task` object
if (task.Wait(delay)) {
return task;
}
重要的事情发生在这里:
task.Wait()
阻塞当前线程(SynchronizationContext
)等待子线程完成。子任务完成并且 Task
尝试执行捕获的 SynchronizationContext
中继续队列中先前排队的剩余代码。但是这个上下文被等待子任务完成的线程阻塞了。潜在的僵局情况一。
以下剩余代码将无法访问:
var tcs = new TaskCompletionSource<ReturnMessage>();
tcs.SetCanceled();
return tcs.Task;
async
和 await
的引入是为了摆脱阻塞等待。 await
允许父线程 return 并继续。 await
之后的剩余代码将作为捕获的 SynchronizationContext
.
这是对第一个死锁的修复,也使用适当的任务超时解决方案,该解决方案使用 Task.WhenAny
(不是首选):
public async Task<ReturnsMessage> FunctionAsync()
{
using (var cancellationTokenSource = new CancellationTokenSource())
{
try
{
var task = Task.Run(
() =>
{
// Check if the task needs to be cancelled
// because the timeout task ran to completion first
cancellationToken.ThrowIfCancellationRequested();
var result = SyncMethod();
return result;
}, cancellationTokenSource.Token);
int delay = 500;
Task timoutTask = Task.Delay(delay, cancellationTokenSource.Token);
Task firstCompletedTask = await Task.WhenAny(task, timoutTask);
if (firstCompletedTask == task)
{
// The 'task' has won the race, therefore
// cancel the 'timeoutTask'
cancellationTokenSource.Cancel();
return await task;
}
}
catch (OperationCanceledException)
{}
// The 'timeoutTask' has won the race, therefore
// cancel the 'task' instance
cancellationTokenSource.Cancel();
var tcs = new TaskCompletionSource<string>();
tcs.SetCanceled();
return await tcs.Task;
}
}
或者使用 CancellationTokenSouce
超时构造函数重载(首选)的替代和更好的超时方法修复第一个死锁:
public async Task<ReturnsMessage> FunctionAsync()
{
var timeout = 50;
using (var timeoutCancellationTokenSource = new CancellationTokenSource(timeout))
{
try
{
return await Task.Run(
() =>
{
// Check if the timeout elapsed
timeoutCancellationTokenSource.Token.ThrowIfCancellationRequested();
var result = SyncMethod();
return result;
}, timeoutCancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
var tcs = new TaskCompletionSource<string>();
tcs.SetCanceled();
return await tcs.Task;
}
}
}
第二个潜在的死锁代码是Function()
的消耗:
for (var i = 0; i < maxAttempts; i++)
{
return Function().Result;
}
Accessing the property's [
Task.Result
] get accessor blocks the calling thread until the asynchronous operation is complete; it is equivalent to calling the Wait method.
死锁的原因与之前解释的相同:阻塞 SynchronizationContext
阻止了计划继续的执行。
要修复第二个死锁,我们可以使用 async/await(首选)或 ConfigreAwait(false)
:
for (var i = 0; i < maxAttempts; i++)
{
return await FunctionAsync();
}
或ConfigreAwait(false)
。这种方法可用于强制同步执行异步方法:
for (var i = 0; i < maxAttempts; i++)
{
return FunctionAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
ConfigreAwait(false)
指示 Task
忽略捕获的 SynchronizationContext
并在另一个永远不会成为父线程的 ThreadPool
线程上继续执行继续队列.