异步 Task.Run 方法与等待 Synchronus 方法和超时的死锁

Deadlock with async Task.Run method with Wait from Synchronus method and timeout

我有一个方法定义为:

public Task<ReturnsMessage> Function() {
var task = Task.Run(() =>
{
     var result = SyncMethod();
     return new ReturnMessage(result);
});

 if (task.Wait(delay)) {
           return task;
}

  var tcs = new TaskCompletionSource<ReturnMessage>();
            tcs.SetCanceled();
            return tcs.Task;
}

现在根据 maxAttempts 值在循环中调用它:

(方法名称 RetryableInvoke)

 for (var i = 0; i < maxAttempts; i++)
 {
    try
    {
        return Function().Result;
    }
    catch (Exception e)
    {
    }
  }

它工作得很好,但是当有大量负载时,我发现线程正在急剧增加并且转储向我显示以下警告:

任何人都可以建议我处理这种情况的最佳方法,这样我就不会看到任何类型的死锁吗?

您正在使用 Task.Run 启动任务,然后如果超时则返回取消,但您永远不会停止任务。他们只是在后台继续运行。

您的代码应该是 async/await 并使用 CancellationSource 并在 SyncMethod() 中处理取消标记。但是,如果您不能,并且据我了解,您想要异步 运行 一个方法并在一段时间后强行终止它,您可能应该使用线程并中止它们。

警告:除非您知道自己在做什么,否则中止线程是不安全的,甚至可能在未来版本中将其从 .NET 中删除。

其实我前段时间研究过这个:https://siderite.dev/blog/how-to-timeout-task-and-make-sure-it.html

您正在使应用程序陷入僵局,因为您没有使用 async/awaitConfigureAwait(false) 但您选择使用 Task.WaitTask.Result相反。

你应该首先知道 Task.Run 捕获它执行的线程的 SynchronizationContext。然后 Task.Run 在新的 ThreadPool 线程上运行。完成后它会return交给父线程继续执行剩下的代码。当 returning 时,它将 return 到捕获的 SyncronizationContext。您使用 Task.WaitTask.Result 打破了这个概念。两个 Task 成员将同步调用 Task,这意味着父线程将阻塞自身,直到子线程完成。子线程完成但是Task不能return到捕获的SynchronizationContext去执行剩下的代码(Task.Wait之后的代码),因为父线程还在阻塞自己等待任务 运行 完成。

因为您在一处使用了 Task.Wait 而在另一处使用了 Task.Result,因此您造成了两种潜在的死锁情况:

让我们单步执行 Function() 代码:

public Task<ReturnsMessage> Function() {

1) 创建任务并启动它:

var task = Task.Run(
  () => 
  { 
    var result = SyncMethod();
    return new ReturnMessage(result);
   });

重要的事情发生在这里:
Task.Run 捕获当前的 SynchronizationContext 并在后台开始执行,而父线程继续执行。 (如果这里使用 await,那么 await 之后的剩余代码将被排入一个后续队列以供稍后执行。目的是当前线程可以 return (离开当前上下文)这样它就不需要等待和阻塞。一旦子线程完成运行,剩余的代码将由Task执行,因为它之前已经在继续队列中排队).

2) task.Wait() 等待后台线程完成。等待意味着阻止线程继续执行。调用堆栈已停放。这等于后台线程的同步,因为父线程不再继续执行而是阻塞,所以不再并行执行:

 // Dangerous implementation of a timeout for the executing `task` object
 if (task.Wait(delay)) {
     return task;
 }

重要的事情发生在这里:
task.Wait() 阻塞当前线程(SynchronizationContext)等待子线程完成。子任务完成并且 Task 尝试执行捕获的 SynchronizationContext 中继续队列中先前排队的剩余代码。但是这个上下文被等待子任务完成的线程阻塞了。潜在的僵局情况一。

以下剩余代码将无法访问:

var tcs = new TaskCompletionSource<ReturnMessage>();
tcs.SetCanceled();
return tcs.Task;

asyncawait 的引入是为了摆脱阻塞等待。 await 允许父线程 return 并继续。 await 之后的剩余代码将作为捕获的 SynchronizationContext.

中的继续执行

这是对第一个死锁的修复,也使用适当的任务超时解决方案,该解决方案使用 Task.WhenAny(不是首选):

public async Task<ReturnsMessage> FunctionAsync()
{
  using (var cancellationTokenSource = new CancellationTokenSource())
  {
    try
    {
      var task = Task.Run(
        () =>
        {
          // Check if the task needs to be cancelled
          // because the timeout task ran to completion first
          cancellationToken.ThrowIfCancellationRequested();

          var result = SyncMethod();
          return result;
        }, cancellationTokenSource.Token);

      int delay = 500;
      Task timoutTask = Task.Delay(delay, cancellationTokenSource.Token);
      Task firstCompletedTask = await Task.WhenAny(task, timoutTask);

      if (firstCompletedTask == task)
      {
        // The 'task' has won the race, therefore
        // cancel the 'timeoutTask'
        cancellationTokenSource.Cancel();
        return await task;
      }
    }
    catch (OperationCanceledException)
    {}

    // The 'timeoutTask' has won the race, therefore
    // cancel the 'task' instance
    cancellationTokenSource.Cancel();

    var tcs = new TaskCompletionSource<string>();
    tcs.SetCanceled();
    return await tcs.Task;
  }
}

或者使用 CancellationTokenSouce 超时构造函数重载(首选)的替代和更好的超时方法修复第一个死锁:

public async Task<ReturnsMessage> FunctionAsync()
{
  var timeout = 50;
  using (var timeoutCancellationTokenSource = new CancellationTokenSource(timeout))
  {
    try
    {
      return await Task.Run(
        () =>
        {
          // Check if the timeout elapsed
          timeoutCancellationTokenSource.Token.ThrowIfCancellationRequested();

          var result = SyncMethod();
          return result;
        }, timeoutCancellationTokenSource.Token);
    }
    catch (OperationCanceledException)
    {
      var tcs = new TaskCompletionSource<string>();
      tcs.SetCanceled();
      return await tcs.Task;
    }
  }
}

第二个潜在的死锁代码是Function()的消耗:

for (var i = 0; i < maxAttempts; i++)
{
  return Function().Result;
}

来自Microsoft Docs:

Accessing the property's [Task.Result] get accessor blocks the calling thread until the asynchronous operation is complete; it is equivalent to calling the Wait method.

死锁的原因与之前解释的相同:阻塞 SynchronizationContext 阻止了计划继续的执行。
要修复第二个死锁,我们可以使用 async/await(首选)或 ConfigreAwait(false):

for (var i = 0; i < maxAttempts; i++)
{
  return await FunctionAsync();
}

ConfigreAwait(false)。这种方法可用于强制同步执行异步方法:

for (var i = 0; i < maxAttempts; i++)
{
  return FunctionAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}

ConfigreAwait(false) 指示 Task 忽略捕获的 SynchronizationContext 并在另一个永远不会成为父线程的 ThreadPool 线程上继续执行继续队列.