C# 使用 BlockingCollection 排队异步任务,仅在队列中的前一个 getter 任务返回值后才处理队列
C# Queuing async Task using BlockingCollection and process queue only after value returned for previous getter task in queue
最近,我需要对异步任务进行排队,在此 link 中向我介绍了 BlockingCollection
它奏效了,我的要求略有变化,需要您的指导。我在@Stephen Cleary 的回答中使用 BlockingCollection
这是来自 link
的 BlockingCollection
public sealed class ExecutionQueue
{
//private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();//commented this
private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());
public Task Completion { get; }
public void Complete() => _queue.CompleteAdding();
private async Task ProcessQueueAsync()
{
foreach (var value in _queue.GetConsumingEnumerable())
await value();
}
}
//public Task Run(Func<Task> lambda)
public Task Run(<Task> lambda)
{
var tcs = new TaskCompletionSource<object>();
_queue.Add(lamda);
return tcs.Task;
}
- 我需要在常规 void 方法中对某些数据库任务进行排队。我可能无法更改此方法的签名。我该怎么做?
public static ExecutionQueue taskQueue = new ExecutionQueue();
private void SaveValesToDB(...)
{
var item = GetID(...);
...
taskQueue.Run(Task.Run(() =>
{
DBInstance.DBSaveValue1(...); // is it correct to wrap with Task.Run and add to queue? it should be queued and run asynchronously
});
...
}
- 我们断断续续地从数据库中保存和检索数据。因此,当我们对 returning 类似 getter 的数据库调用进行排队时,我们希望确保在收到 return 值之前,我们不会处理其他排队的项目.
private void SaveValesToDB(...)
{
...
taskQueue.Run(Task.Run(() =>
{
DBInstance.DBSaveValue1(...); // is this correct? it should be queued and run asynchronously
});
...
taskQueue.Run(Task.Run(() =>
{
var result1 = DBInstance.DBGetValue2(...); // should be queued and run asynchronously;
LogData(result1);// not a DB call but believe it should be wrapped in here for the result1, correct?
});
/*so in above Task.Run, i want to ensure that until i receive result1
i don't process other items in the queue even
if they are added. how can i do that ?
The main thread should continue. */
...
var result 2 = DBInstance.DBGetValue3(...); // should be queued and run asynchronously
UpdateAdvancedLod(result1 +" "+result2);// here, should i block main thread until i get result1 ?
}
- 如何处理错误?
请指导我。
已编辑:
if using Func<Task> in public Task Run(Func<Task> lambda) then is the below correct?
taskQueue.Run(async () =>
{
await Task.Run(() =>
{
DBInstance.DBSaveValue1(...);//is this correct
});
}
);
您可以将此方法添加到 Stephen Cleary 的 class:
public Task Run(Action action)
{
return Run(() => Task.Run(action));
}
这是现有 public Task Run(Func<Task> lambda)
方法的重载。这个将提供的 action
的执行委托给 ThreadPool
线程。
用法示例:
var id = GetID();
var task = taskQueue.Run(() => DBInstance.DBSaveValue1(id));
await task; // Optional
更新: 要将错误通知传播到主线程,您可以使用 Error
事件增强 ExecutionQueue
class,这将在捕获的上下文中调用(在创建实例时捕获)。
private readonly SynchronizationContext _capturedContext;
public event EventHandler<Exception> Error;
public ExecutionQueue() // Constructor
{
_capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
Completion = Task.Run(() => ProcessQueueAsync());
}
private void OnError(Exception ex)
{
var handler = Error; if (handler == null) return;
_capturedContext.Post(_ => handler.Invoke(this, ex), null);
}
OnError
应该从 catch (Exception ex)
块内部调用。这将适用于 Windows Forms 应用程序和 WPF 应用程序,因为它们的 UI 线程配备了 SynchronizationContext
。它不适用于控制台应用程序,因为那里没有 SynchronizationContext
(Error
事件
将在随机 ThreadPool
线程中提出。
最近,我需要对异步任务进行排队,在此 link 中向我介绍了 BlockingCollection
这是来自 link
的 BlockingCollectionpublic sealed class ExecutionQueue
{
//private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();//commented this
private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());
public Task Completion { get; }
public void Complete() => _queue.CompleteAdding();
private async Task ProcessQueueAsync()
{
foreach (var value in _queue.GetConsumingEnumerable())
await value();
}
}
//public Task Run(Func<Task> lambda)
public Task Run(<Task> lambda)
{
var tcs = new TaskCompletionSource<object>();
_queue.Add(lamda);
return tcs.Task;
}
- 我需要在常规 void 方法中对某些数据库任务进行排队。我可能无法更改此方法的签名。我该怎么做?
public static ExecutionQueue taskQueue = new ExecutionQueue();
private void SaveValesToDB(...)
{
var item = GetID(...);
...
taskQueue.Run(Task.Run(() =>
{
DBInstance.DBSaveValue1(...); // is it correct to wrap with Task.Run and add to queue? it should be queued and run asynchronously
});
...
}
- 我们断断续续地从数据库中保存和检索数据。因此,当我们对 returning 类似 getter 的数据库调用进行排队时,我们希望确保在收到 return 值之前,我们不会处理其他排队的项目.
private void SaveValesToDB(...)
{
...
taskQueue.Run(Task.Run(() =>
{
DBInstance.DBSaveValue1(...); // is this correct? it should be queued and run asynchronously
});
...
taskQueue.Run(Task.Run(() =>
{
var result1 = DBInstance.DBGetValue2(...); // should be queued and run asynchronously;
LogData(result1);// not a DB call but believe it should be wrapped in here for the result1, correct?
});
/*so in above Task.Run, i want to ensure that until i receive result1
i don't process other items in the queue even
if they are added. how can i do that ?
The main thread should continue. */
...
var result 2 = DBInstance.DBGetValue3(...); // should be queued and run asynchronously
UpdateAdvancedLod(result1 +" "+result2);// here, should i block main thread until i get result1 ?
}
- 如何处理错误?
请指导我。
已编辑:
if using Func<Task> in public Task Run(Func<Task> lambda) then is the below correct?
taskQueue.Run(async () =>
{
await Task.Run(() =>
{
DBInstance.DBSaveValue1(...);//is this correct
});
}
);
您可以将此方法添加到 Stephen Cleary 的
public Task Run(Action action)
{
return Run(() => Task.Run(action));
}
这是现有 public Task Run(Func<Task> lambda)
方法的重载。这个将提供的 action
的执行委托给 ThreadPool
线程。
用法示例:
var id = GetID();
var task = taskQueue.Run(() => DBInstance.DBSaveValue1(id));
await task; // Optional
更新: 要将错误通知传播到主线程,您可以使用 Error
事件增强 ExecutionQueue
class,这将在捕获的上下文中调用(在创建实例时捕获)。
private readonly SynchronizationContext _capturedContext;
public event EventHandler<Exception> Error;
public ExecutionQueue() // Constructor
{
_capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
Completion = Task.Run(() => ProcessQueueAsync());
}
private void OnError(Exception ex)
{
var handler = Error; if (handler == null) return;
_capturedContext.Post(_ => handler.Invoke(this, ex), null);
}
OnError
应该从 catch (Exception ex)
块内部调用。这将适用于 Windows Forms 应用程序和 WPF 应用程序,因为它们的 UI 线程配备了 SynchronizationContext
。它不适用于控制台应用程序,因为那里没有 SynchronizationContext
(Error
事件
将在随机 ThreadPool
线程中提出。