异步队列实现 .Wait() 比等待更快
Async Queue implementation .Wait() faster than await
为了提供可以依次排队和执行 async 方法的生产者-消费者功能,我正在尝试实现一个异步队列。我注意到在大型应用程序中使用它时存在主要性能问题。
async Task Loop() {
while (Verify()) {
if (!_blockingCollection.TryTake(out var func, 1000, _token)) continue;
await func.Invoke();
}
}
实施AsyncQueue.Add:
public void Add(Func<Task> func) {
_blockingCollection.Add(func);
}
来自任意线程的示例用法:
controller.OnEvent += (o, a) => _queue.Add(async (t) => await handle(a));
执行路径取决于应用程序的状态,包括
内部使用 TaskCompletionSource 的异步网络请求 return 结果
IO操作
已添加到列表并等待使用 Task.WhenAll(...)
的任务
转换数组并等待网络请求的async void方法
症状:
应用程序逐渐变慢。
当我将 await func.Invoke()
替换为 func.Invoke().Wait()
而不是正确等待它时,性能会显着提高并且不会降低速度。
这是为什么?使用 BlockingCollection
的异步队列是个坏主意吗?
什么是更好的选择?
Why is that?
问题中的信息不足,无法提供答案。
正如其他人所指出的,目前的循环存在 CPU 消耗自旋的问题。
同时,我至少可以回答这部分:
Is an async Queue that uses BlockingCollection a bad idea?
是的。
What is a better alternative?
使用异步兼容队列。例如,来自 TPL 数据流的 Channels
或 BufferBlock
/ActionBlock
。
使用频道的示例:
async Task Loop() {
await foreach (var func in channelReader.ReadAllAsync()) {
await func.Invoke();
}
}
或者如果您还没有使用 .NET Core:
async Task Loop() {
while (await channelReader.WaitToReadAsync()) {
while (channelReader.TryRead(out var func)) {
await func.Invoke();
}
}
}
为了提供可以依次排队和执行 async 方法的生产者-消费者功能,我正在尝试实现一个异步队列。我注意到在大型应用程序中使用它时存在主要性能问题。
async Task Loop() {
while (Verify()) {
if (!_blockingCollection.TryTake(out var func, 1000, _token)) continue;
await func.Invoke();
}
}
实施AsyncQueue.Add:
public void Add(Func<Task> func) {
_blockingCollection.Add(func);
}
来自任意线程的示例用法:
controller.OnEvent += (o, a) => _queue.Add(async (t) => await handle(a));
执行路径取决于应用程序的状态,包括
内部使用 TaskCompletionSource 的异步网络请求 return 结果
IO操作
已添加到列表并等待使用 Task.WhenAll(...)
的任务
转换数组并等待网络请求的async void方法
症状: 应用程序逐渐变慢。
当我将 await func.Invoke()
替换为 func.Invoke().Wait()
而不是正确等待它时,性能会显着提高并且不会降低速度。
这是为什么?使用 BlockingCollection
的异步队列是个坏主意吗?
什么是更好的选择?
Why is that?
问题中的信息不足,无法提供答案。
正如其他人所指出的,目前的循环存在 CPU 消耗自旋的问题。
同时,我至少可以回答这部分:
Is an async Queue that uses BlockingCollection a bad idea?
是的。
What is a better alternative?
使用异步兼容队列。例如,来自 TPL 数据流的 Channels
或 BufferBlock
/ActionBlock
。
使用频道的示例:
async Task Loop() {
await foreach (var func in channelReader.ReadAllAsync()) {
await func.Invoke();
}
}
或者如果您还没有使用 .NET Core:
async Task Loop() {
while (await channelReader.WaitToReadAsync()) {
while (channelReader.TryRead(out var func)) {
await func.Invoke();
}
}
}