一个接一个地执行非阻塞异步任务

Executing non-blocking asynchronous tasks one after the other

我正在使用 C#、TPL。我有一个 class,其中包含一些执行一些子任务的异步方法,为简单起见,我将只考虑一种方法和一个子任务:

class Test1
{
    private Task SubTask() => Task.Delay(1000);

    public async Task FullTask()
    {
        Console.WriteLine("Task Start");
        await SubTask();
        Console.WriteLine("Task Middle");
        await SubTask();
        Console.WriteLine("Task End");
    }

    static async Task Main()
    {
        Test1 Test = new Test1();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

执行后,控制台将打印以下(预期)结果:

Task Start
Task Start
Task Middle
Task Middle
Task End
Task End

问题是每次调用FullTask必须运行在前一个调用完成之后,如果同时发生多次调用FullTask,则必须一一处理。我的第一个想法是使用 ContinueWith 方法:

class Test2
{
    private Task LastTask = Task.CompletedTask;

    private Task SubTask() => Task.Delay(1000);

    public Task FullTask()
    {
        lock(LastTask)
        {
            return LastTask = LastTask.ContinueWith(_ =>
            {
                Console.WriteLine("Task Start");
                SubTask().Wait();
                Console.WriteLine("Task Middle");
                SubTask().Wait();
                Console.WriteLine("Task End");
            });
        }
    }

    static async Task Main()
    {
        Test2 Test = new Test2();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

同样,执行后控制台中打印出以下(预期)结果:

Task Start
Task Middle
Task End
Task Start
Task Middle
Task End

问题是 FullTask 中的 lambda 阻塞了线程,因为它使用 SubTask().Wait(); 而不是 await SubTask();。如果存在多个Test2 class 实例,每个实例都执行FullTask 方法,就会发生线程池饥饿。将 ether FullTask 或 lambda(或两者)更改为异步并不能解决问题:

class Test3
{
    private Task LastTask = Task.CompletedTask;

    private Task SubTask() => Task.Delay(1000);

    public Task FullTask()
    {
        lock(LastTask)
        {
            return LastTask = LastTask.ContinueWith(async _ =>
            {
                Console.WriteLine("Task Start");
                await SubTask();
                Console.WriteLine("Task Middle");
                await SubTask();
                Console.WriteLine("Task End");
            });
        }
    }

    static async Task Main()
    {
        Test3 Test = new Test3();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

ContinueWithreturn是一个Task<Task>,外层的TaskLastTask之后要执行的定时任务。该任务将在第一个 await 处结束,并且它将 return 将在 lambda 结束时结束的内部(编译器生成的)任务。在这里,我与内部任务相交,该内部任务在外部任务到达第一个等待之前不会被创建。这样的方法是行不通的。

我想要的是一种产生与 Test 相同结果的非阻塞方法。有什么想法吗?

如果您想要 ContinueWith 方法的迭代(使用更现代的 await),像这样的方法应该可行:

private readonly object _lastTaskMutex = new object();
public Task FullTask()
{
  lock (_lastTaskMutex)
    return LastTask = RunAfterAsync(LastTask);

  async Task RunAfterAsync(Task lastTask)
  {
    try
    {
      await lastTask;
    }
    catch { }
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
  }
}

如果您只关心互斥而不关心确切的顺序,那么 SemaphoreSlim 就可以了:

private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task FullTask()
{
  await _mutex.WaitAsync();
  try
  {
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
  }
  finally
  {
    _mutex.Release();
  }
}

或者,如果您真正想要的是一个严格的 FIFO 操作队列,那么 Patagonias 建议的 Channel 或 ActionBlock 是合适的;在这种情况下,您通常希望传递一个 TaskCompletionSource<T> 来指示各个请求何时完成:

private readonly ActionBlock<TaskCompletionSource<object>> _block = new ActionBlock<TaskCompletionSource<object>>(async tcs =>
{
  try
  {
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
    tcs.TrySetResult(null);
  }
  catch (Exception ex)
  {
    tcs.TrySetException(ex);
  }
});

public Task FullTask()
{
  var tcs = new TaskCompletionSource<object>();
  _block.Post(tcs);
  return tcs.Task;
}