C# 使用 BlockingCollection 排队异步任务,仅在队列中的前一个 getter 任务返回值后才处理队列

C# Queuing async Task using BlockingCollection and process queue only after value returned for previous getter task in queue

最近,我需要对异步任务进行排队,在此 link 中向我介绍了 BlockingCollection 它奏效了,我的要求略有变化,需要您的指导。我在@Stephen Cleary 的回答中使用 BlockingCollection

这是来自 link

的 BlockingCollection
public sealed class ExecutionQueue
{
  //private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();//commented this
  private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();

  public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

//public Task Run(Func<Task> lambda)
public Task Run(<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(lamda);
  return tcs.Task;
}
  1. 我需要在常规 void 方法中对某些数据库任务进行排队。我可能无法更改此方法的签名。我该怎么做?
 public static ExecutionQueue taskQueue = new ExecutionQueue();

 private void SaveValesToDB(...)
 {
    var item = GetID(...);
    ...
    taskQueue.Run(Task.Run(() =>
    {
       DBInstance.DBSaveValue1(...); // is it correct to wrap with Task.Run and add to queue? it should be queued and run asynchronously
     });
    ...
 }
  1. 我们断断续续地从数据库中保存和检索数据。因此,当我们对 returning 类似 getter 的数据库调用进行排队时,我们希望确保在收到 return 值之前,我们不会处理其他排队的项目.
private void SaveValesToDB(...)
{
 ...
 taskQueue.Run(Task.Run(() =>
 {
    DBInstance.DBSaveValue1(...); // is this correct? it should be queued and run asynchronously
  });
 ...
 taskQueue.Run(Task.Run(() =>
 {
    var result1 = DBInstance.DBGetValue2(...); // should be queued and run asynchronously; 
    LogData(result1);// not a DB call but believe it should be wrapped in here for the result1, correct?
 });

 /*so in above Task.Run,  i want to ensure that until i receive result1 
 i don't process other items in the queue even 
 if they are added. how can i do that ? 
 The main thread should continue. */
 ...
 var result 2 = DBInstance.DBGetValue3(...); // should be queued and run asynchronously

 UpdateAdvancedLod(result1 +" "+result2);// here, should i block main thread until i get result1 ?
}

  1. 如何处理错误?

请指导我。

已编辑:

if using Func<Task> in public Task Run(Func<Task> lambda) then is the below correct?

taskQueue.Run(async () =>
                {
                    await Task.Run(() =>
                    {
                        DBInstance.DBSaveValue1(...);//is this correct
                    });
                }
                );

您可以将此方法添加到 Stephen Cleary 的 class:

public Task Run(Action action)
{
    return Run(() => Task.Run(action));
}

这是现有 public Task Run(Func<Task> lambda) 方法的重载。这个将提供的 action 的执行委托给 ThreadPool 线程。

用法示例:

var id = GetID();
var task = taskQueue.Run(() => DBInstance.DBSaveValue1(id));
await task; // Optional

更新: 要将错误通知传播到主线程,您可以使用 Error 事件增强 ExecutionQueue class,这将在捕获的上下文中调用(在创建实例时捕获)。

private readonly SynchronizationContext _capturedContext;

public event EventHandler<Exception> Error;

public ExecutionQueue() // Constructor
{
    _capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
    Completion = Task.Run(() => ProcessQueueAsync());
}

private void OnError(Exception ex)
{
    var handler = Error; if (handler == null) return;
    _capturedContext.Post(_ => handler.Invoke(this, ex), null);
}

OnError 应该从 catch (Exception ex) 块内部调用。这将适用于 Windows Forms 应用程序和 WPF 应用程序,因为它们的 UI 线程配备了 SynchronizationContext。它不适用于控制台应用程序,因为那里没有 SynchronizationContextError 事件 将在随机 ThreadPool 线程中提出。