遍历异步方法的最佳方法是什么?
What is the best way to loop over async method?
我想知道循环异步方法的最佳方法是什么。
假设我有一个方法:
public async Task<bool> DownloadThenWriteThenReturnResult(string id)
{
// async/await stuff....
}
假设我已经有一个名为“_myStrings”的参数的 10 000 个字符串列表,我想调用此方法 10 000 次。
我希望最多有 4 个线程来共享这项工作(在生产中我会使用 ProcessorCount - 1)。我希望能够取消一切。最后我想要每次调用的结果。
我想知道有什么区别,最好的方法是什么,为什么:
*1 -
var allTasks = _myStrings.Select(st =>DownloadThenWriteThenReturnResult(st));
bool[] syncSuccs = await Task.WhenAll(syncTasks);
*2 -
await Task.Run(() =>
{
var result = new ConcurrentQueue<V>();
var po = new ParallelOptions(){MaxDegreeOfParallelism = 4};
Parallel.ForEach(_myStrings, po, (st) =>
{
result.Enqueue(DownloadThenWriteThenReturnResult(st).Result);
po.CancellationToken.ThrowIfCancellationRequested();
});
});
*3 -
using (SemaphoreSlim throttler = new SemaphoreSlim(initialCount: 4))
{
var results = new List<bool>();
var allTasks = new List<Task>();
foreach (var st in _myStrings)
{
await throttler.WaitAsync();
allTasks.Add(Task.Run(async () =>
{
try
{
results.Add(await DownloadThenWriteThenReturnResult(st));
}
finally
{
throttler.Release();
}
}));
}
await Task.WhenAll(allTasks);
}
*4 -
var block = new TransformBlock<string, bool>(
async st =>
{
return await DownloadThenWriteThenReturnResult(st);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4});
foreach (var st in _myStrings)
{
await block.SendAsync(st);
}
var results = new List<bool>();
foreach (var st in _myStrings)
{
results.Add(await block.ReceiveAsync());
}
还有别的办法吗?这 4 个给了我类似的结果,而只有 *2、*3 和 *4 使用 4 个线程。
你能确认一下吗:
*1 在threadpool线程上创建10000个任务但只会在一个线程中执行
*2 将创建 4 个线程 T1 T2 T3 和 T4。它使用 .Result 因此它不是一直异步的(我应该在这里避免这种情况吗?)。由于 DownloadThenWriteThenReturnResult 是在 4 个线程 T1 T2 T3 或 T4 之一中执行的,
嵌套任务放在哪里(嵌套任务是指每个异步方法在等待时将 return 是什么)?在专用线程池线程中(比如说 T11 T21 T31 和 T41)?
*3 和 *4 的问题相同
*4 似乎是我最好的镜头。很容易理解发生了什么,如果需要,我将能够创建新块并 link 它们。它似乎也完全异步。但我想了解 DownLoadThenWriteThenReturnResult 中所有 Async/Await 代码中的嵌套任务在哪里执行,以及这是否是最好的方法。
感谢任何提示!
我会尽力回答你所有的问题。
我的提议
首先,这就是我要做的。我尽量减少任务数量并保持代码简单。
您的问题看起来像是某种 producer/consumer 案例。我会选择像这样简单的东西:
public async Task Work(ConcurrentQueue<string> input, ConcurrentQueue<bool> output)
{
string current;
while (input.TryDequeue(out current))
{
output.Enqueue(await DownloadThenWriteThenReturnResult(current));
}
}
var nbThread = 4;
var input = new ConcurrentQueue<string>(_myStrings);
var output = new ConcurrentQueue<bool>();
var workers = new List<Task>(nbThread);
for (int i = 0; i < nbThread; i++)
{
workers.Add(Task.Run(async () => await this.Work(input, output)));
}
await Task.WhenAll(workers);
我不确定线程数与处理器数是否相关。如果您正在处理 CPU-绑定操作,这将是正确的。在这种情况下,您应该 运行 尽可能同步,因为系统引入的从一个上下文切换到另一个上下文的过载很重。所以在那种情况下,一个线程操作就是一种方式。
但在您的情况下,由于您大部分时间都在等待 I/O(用于 http 调用的网络、用于写入的磁盘等),您可能可以并行启动更多任务。每次任务等待 I/O 时,系统可以暂停它并切换到另一个任务。这里的重载并没有被浪费,因为另一方面线程会等待什么也不做。
您应该对 4、5、6 等任务进行基准测试,找出哪一项效率更高。
我在这里看到的一个问题是您不知道哪个输入产生了哪个输出。您可以使用 ConcurrentDictionary
而不是两个 ConcurrentQueue
但 _myStrings
.
中不能重复
你的解决方案
这是我对您的解决方案的看法。
解决方案*1
如您所说,它将创建 10 000 个任务。据我所知(但我不是该领域的专家),系统将在任务之间共享 ThreadPool 线程,应用一些 Round Robin 算法。我认为同一个任务甚至可以在第一个线程上开始执行,被系统暂停,然后在第二个线程上完成执行。这将引入不必要的开销,并导致整体 运行 时间变慢。
我认为这是绝对要避免的!
解*2
我读到并行 API 不能很好地处理异步操作。我也读过很多次 you don't want to call .Result
除非绝对需要。
所以我也会避免这个解决方案。
解*3
老实说,我无法想象这到底能做什么 ^^。这可能是一个很好的解决方案,因为您不是一次创建所有任务。无论如何,你仍然要创建 10 000 个任务,所以我会避免它。
解*4
老实说,我什至不知道这个 API,所以我无法真正发表评论。但是因为涉及到第三方库,所以尽量避免。
我想知道循环异步方法的最佳方法是什么。 假设我有一个方法:
public async Task<bool> DownloadThenWriteThenReturnResult(string id)
{
// async/await stuff....
}
假设我已经有一个名为“_myStrings”的参数的 10 000 个字符串列表,我想调用此方法 10 000 次。 我希望最多有 4 个线程来共享这项工作(在生产中我会使用 ProcessorCount - 1)。我希望能够取消一切。最后我想要每次调用的结果。 我想知道有什么区别,最好的方法是什么,为什么:
*1 -
var allTasks = _myStrings.Select(st =>DownloadThenWriteThenReturnResult(st));
bool[] syncSuccs = await Task.WhenAll(syncTasks);
*2 -
await Task.Run(() =>
{
var result = new ConcurrentQueue<V>();
var po = new ParallelOptions(){MaxDegreeOfParallelism = 4};
Parallel.ForEach(_myStrings, po, (st) =>
{
result.Enqueue(DownloadThenWriteThenReturnResult(st).Result);
po.CancellationToken.ThrowIfCancellationRequested();
});
});
*3 -
using (SemaphoreSlim throttler = new SemaphoreSlim(initialCount: 4))
{
var results = new List<bool>();
var allTasks = new List<Task>();
foreach (var st in _myStrings)
{
await throttler.WaitAsync();
allTasks.Add(Task.Run(async () =>
{
try
{
results.Add(await DownloadThenWriteThenReturnResult(st));
}
finally
{
throttler.Release();
}
}));
}
await Task.WhenAll(allTasks);
}
*4 -
var block = new TransformBlock<string, bool>(
async st =>
{
return await DownloadThenWriteThenReturnResult(st);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4});
foreach (var st in _myStrings)
{
await block.SendAsync(st);
}
var results = new List<bool>();
foreach (var st in _myStrings)
{
results.Add(await block.ReceiveAsync());
}
还有别的办法吗?这 4 个给了我类似的结果,而只有 *2、*3 和 *4 使用 4 个线程。 你能确认一下吗:
*1 在threadpool线程上创建10000个任务但只会在一个线程中执行
*2 将创建 4 个线程 T1 T2 T3 和 T4。它使用 .Result 因此它不是一直异步的(我应该在这里避免这种情况吗?)。由于 DownloadThenWriteThenReturnResult 是在 4 个线程 T1 T2 T3 或 T4 之一中执行的, 嵌套任务放在哪里(嵌套任务是指每个异步方法在等待时将 return 是什么)?在专用线程池线程中(比如说 T11 T21 T31 和 T41)?
*3 和 *4 的问题相同
*4 似乎是我最好的镜头。很容易理解发生了什么,如果需要,我将能够创建新块并 link 它们。它似乎也完全异步。但我想了解 DownLoadThenWriteThenReturnResult 中所有 Async/Await 代码中的嵌套任务在哪里执行,以及这是否是最好的方法。
感谢任何提示!
我会尽力回答你所有的问题。
我的提议
首先,这就是我要做的。我尽量减少任务数量并保持代码简单。
您的问题看起来像是某种 producer/consumer 案例。我会选择像这样简单的东西:
public async Task Work(ConcurrentQueue<string> input, ConcurrentQueue<bool> output)
{
string current;
while (input.TryDequeue(out current))
{
output.Enqueue(await DownloadThenWriteThenReturnResult(current));
}
}
var nbThread = 4;
var input = new ConcurrentQueue<string>(_myStrings);
var output = new ConcurrentQueue<bool>();
var workers = new List<Task>(nbThread);
for (int i = 0; i < nbThread; i++)
{
workers.Add(Task.Run(async () => await this.Work(input, output)));
}
await Task.WhenAll(workers);
我不确定线程数与处理器数是否相关。如果您正在处理 CPU-绑定操作,这将是正确的。在这种情况下,您应该 运行 尽可能同步,因为系统引入的从一个上下文切换到另一个上下文的过载很重。所以在那种情况下,一个线程操作就是一种方式。
但在您的情况下,由于您大部分时间都在等待 I/O(用于 http 调用的网络、用于写入的磁盘等),您可能可以并行启动更多任务。每次任务等待 I/O 时,系统可以暂停它并切换到另一个任务。这里的重载并没有被浪费,因为另一方面线程会等待什么也不做。
您应该对 4、5、6 等任务进行基准测试,找出哪一项效率更高。
我在这里看到的一个问题是您不知道哪个输入产生了哪个输出。您可以使用 ConcurrentDictionary
而不是两个 ConcurrentQueue
但 _myStrings
.
你的解决方案
这是我对您的解决方案的看法。
解决方案*1
如您所说,它将创建 10 000 个任务。据我所知(但我不是该领域的专家),系统将在任务之间共享 ThreadPool 线程,应用一些 Round Robin 算法。我认为同一个任务甚至可以在第一个线程上开始执行,被系统暂停,然后在第二个线程上完成执行。这将引入不必要的开销,并导致整体 运行 时间变慢。
我认为这是绝对要避免的!
解*2
我读到并行 API 不能很好地处理异步操作。我也读过很多次 you don't want to call .Result
除非绝对需要。
所以我也会避免这个解决方案。
解*3
老实说,我无法想象这到底能做什么 ^^。这可能是一个很好的解决方案,因为您不是一次创建所有任务。无论如何,你仍然要创建 10 000 个任务,所以我会避免它。
解*4
老实说,我什至不知道这个 API,所以我无法真正发表评论。但是因为涉及到第三方库,所以尽量避免。