C#中的队列异步任务

Queuing asynchronous task in C#

我有几种方法可以向数据库报告一些数据。我们希望异步调用对数据服务的所有调用。这些对数据服务的调用都结束了,因此我们要确保这些 DS 调用在任何给定时间都按顺序执行。最初,我对这些方法中的每一个都使用了 async await 并且每个调用都是异步执行的,但我们发现如果它们顺序不正确,那么就有错误的余地。

所以,我认为我们应该将所有这些异步任务排队并在单独的线程中发送它们,但我想知道我们有哪些选择?我遇到了 'SemaphoreSlim' 。这适合我的用例吗? 或者还有哪些其他选项适合我的用例?请指导我。

所以,我目前的代码中有什么

public static SemaphoreSlim mutex = new SemaphoreSlim(1);

//first DS call 

 public async Task SendModuleDataToDSAsync(Module parameters)
    {
        var tasks1 = new List<Task>();
        var tasks2 = new List<Task>();

        //await mutex.WaitAsync(); **//is this correct way to use SemaphoreSlim ?**
        foreach (var setting in Module.param)
        {
           Task job1 = SaveModule(setting);
           tasks1.Add(job1);
           Task job2= SaveModule(GetAdvancedData(setting));
           tasks2.Add(job2);
        }

        await Task.WhenAll(tasks1);
        await Task.WhenAll(tasks2);

        //mutex.Release(); // **is this correct?**
    }

 private async Task SaveModule(Module setting)
    {
        await Task.Run(() =>
            {
             // Invokes Calls to DS
             ... 
            });
    }

//在主线程的某个地方,调用对 DS

的第二次调用
  //Second DS Call
 private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
 {
    //await mutex.WaitAsync();// **is this correct?**
    await Task.Run(() =>
            {
                 //TrackInstrumentInfoToDS
                 //mutex.Release();// **is this correct?**
            });
    if(param2)
    {
        await Task.Run(() =>
               {
                  //TrackParam2InstrumentInfoToDS
               });
    }
 }

请记住,您将所有任务排入列表的第一个解决方案并不能确保任务一个接一个地执行。它们都是 运行 并行的,因为在下一个任务开始之前不会等待它们。

所以是的,您必须使用 SemapohoreSlim 来使用异步锁定和等待。一个简单的实现可能是:

private readonly SemaphoreSlim _syncRoot = new SemaphoreSlim(1);

public async Task SendModuleDataToDSAsync(Module parameters)
{
    await this._syncRoot.WaitAsync();
    try
    {
        foreach (var setting in Module.param)
        {
           await SaveModule(setting);
           await SaveModule(GetAdvancedData(setting));
        }
    }
    finally
    {
        this._syncRoot.Release();
    }
}

如果可以使用Nito.AsyncEx代码可以简化为:

public async Task SendModuleDataToDSAsync(Module parameters)
{
    using var lockHandle = await this._syncRoot.LockAsync();

    foreach (var setting in Module.param)
    {
       await SaveModule(setting);
       await SaveModule(GetAdvancedData(setting));
    }
}

一个选项是对将创建任务的操作进行排队,而不是像问题中的代码那样对已经 运行 的任务进行排队。

没有锁定的伪代码:

 Queue<Func<Task>> tasksQueue = new Queue<Func<Task>>();

 async Task RunAllTasks()
 {
      while (tasksQueue.Count > 0)
      { 
           var taskCreator = tasksQueue.Dequeu(); // get creator 
           var task = taskCreator(); // staring one task at a time here
           await task; // wait till task completes
      }
  }

  // note that declaring createSaveModuleTask does not  
  // start SaveModule task - it will only happen after this func is invoked
  // inside RunAllTasks
  Func<Task> createSaveModuleTask = () => SaveModule(setting);

  tasksQueue.Add(createSaveModuleTask);
  tasksQueue.Add(() => SaveModule(GetAdvancedData(setting)));
  // no DB operations started at this point

  // this will start tasks from the queue one by one.
  await RunAllTasks();

在实际代码中使用 ConcurrentQueue 可能是正确的做法。您还需要知道当所有操作都开始并一个接一个地等待时要停止的预期操作总数。

根据您在 Alexeis 回答下的评论,您对 SemaphoreSlim 的处理是正确的。

假设方法 SendInstrumentSettingsToDSSendModuleDataToDSAsync 是同一个 class 的成员。您 simplay 需要 SemaphoreSlim 的实例变量,然后在每个需要同步的方法开始时调用 await lock.WaitAsync() 并在 finally 块中调用 lock.Release()

public async Task SendModuleDataToDSAsync(Module parameters)
{
    await lock.WaitAsync();
    try
    {
        ...
    }
    finally
    {
        lock.Release();
    }
}

private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
{
    await lock.WaitAsync();
    try
    {
        ...
    }
    finally
    {
        lock.Release();
    }
}

重要的是 lock.Release() 的调用在 finally 块中,因此如果在 try 块的代码中某处抛出异常,信号量将被释放。

Initially, i was using async await on each of these methods and each of the calls were executed asynchronously but we found out if they are out of sequence then there are room for errors.

So, i thought we should queue all these asynchronous tasks and send them in a separate thread but i want to know what options we have? I came across 'SemaphoreSlim' .

SemaphoreSlim 确实将异步代码限制为 运行 一次一个 ,并且是 互斥 [=] 的有效形式35=]。但是,由于“乱序”调用会导致错误,因此 SemaphoreSlim 不是 合适的解决方案,因为它不保证 FIFO。

从更一般的意义上讲,没有任何同步原语可以保证 FIFO,因为这可能会由于诸如锁定车队之类的副作用而导致问题。另一方面,数据结构严格的 FIFO 是很自然的。

因此,您需要使用自己的 FIFO 队列,而不是使用隐式执行队列。 Channels 是一个很好的、高性能的、异步兼容的队列,但由于您使用的是旧版本的 C#/.NET,BlockingCollection<T> 可以工作:

public sealed class ExecutionQueue
{
  private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();

  public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

此设置唯一棘手的部分是如何对工作进行排队。从代码排队工作的角度来看,他们想知道 lambda 何时执行,而不是 lambda 何时排队。从队列方法(我称之为Run)的角度来看,该方法只需要在lambda 执行后完成其返回的任务。所以,你可以这样写队列方法:

public Task Run(Func<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(async () =>
  {
    // Execute the lambda and propagate the results to the Task returned from Run
    try
    {
      await lambda();
      tcs.TrySetResult(null);
    }
    catch (OperationCanceledException ex)
    {
      tcs.TrySetCanceled(ex.CancellationToken);
    }
    catch (Exception ex)
    {
      tcs.TrySetException(ex);
    }
  });
  return tcs.Task;
}

这种排队方法并不完美。如果一个任务完成时出现多个异常(这对于并行代码来说是正常的),则只保留第一个异常(这对于异步代码来说是正常的)。 OperationCanceledException 处理也存在边缘情况。但是这段代码对于大多数情况来说已经足够好了。

现在你可以这样使用了:

public static ExecutionQueue _queue = new ExecutionQueue();

public async Task SendModuleDataToDSAsync(Module parameters)
{
  var tasks1 = new List<Task>();
  var tasks2 = new List<Task>();

  foreach (var setting in Module.param)
  {
    Task job1 = _queue.Run(() => SaveModule(setting));
    tasks1.Add(job1);
    Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
    tasks2.Add(job2);
  }

  await Task.WhenAll(tasks1);
  await Task.WhenAll(tasks2);
}