遍历异步方法的最佳方法是什么?

What is the best way to loop over async method?

我想知道循环异步方法的最佳方法是什么。 假设我有一个方法:

public async Task<bool> DownloadThenWriteThenReturnResult(string id)
{
    // async/await stuff....
}

假设我已经有一个名为“_myStrings”的参数的 10 000 个字符串列表,我想调用此方法 10 000 次。 我希望最多有 4 个线程来共享这项工作(在生产中我会使用 ProcessorCount - 1)。我希望能够取消一切。最后我想要每次调用的结果。 我想知道有什么区别,最好的方法是什么,为什么:

*1 -

var allTasks = _myStrings.Select(st =>DownloadThenWriteThenReturnResult(st));
bool[] syncSuccs = await Task.WhenAll(syncTasks);

*2 -

await Task.Run(() =>
{
    var result = new ConcurrentQueue<V>();
    var po = new ParallelOptions(){MaxDegreeOfParallelism = 4};
    Parallel.ForEach(_myStrings, po, (st) =>
    {
        result.Enqueue(DownloadThenWriteThenReturnResult(st).Result);
        po.CancellationToken.ThrowIfCancellationRequested();
    });
});

*3 -

using (SemaphoreSlim throttler = new SemaphoreSlim(initialCount: 4))
{
    var results = new List<bool>();
    var allTasks = new List<Task>();
    foreach (var st in _myStrings)
    {
        await throttler.WaitAsync();
        allTasks.Add(Task.Run(async () =>
        {
            try
            {
                results.Add(await DownloadThenWriteThenReturnResult(st));
            }
            finally
            {
                throttler.Release();
            }
        }));
    }
    await Task.WhenAll(allTasks);
}

*4 -

var block = new TransformBlock<string, bool>(
async st =>
{
    return await DownloadThenWriteThenReturnResult(st);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4});

foreach (var st in _myStrings)
{
    await block.SendAsync(st);
}

var results = new List<bool>();
foreach (var st in _myStrings)
{
    results.Add(await block.ReceiveAsync());
}

还有别的办法吗?这 4 个给了我类似的结果,而只有 *2、*3 和 *4 使用 4 个线程。 你能确认一下吗:

*4 似乎是我最好的镜头。很容易理解发生了什么,如果需要,我将能够创建新块并 link 它们。它似乎也完全异步。但我想了解 DownLoadThenWriteThenReturnResult 中所有 Async/Await 代码中的嵌套任务在哪里执行,以及这是否是最好的方法。

感谢任何提示!

我会尽力回答你所有的问题。

我的提议

首先,这就是我要做的。我尽量减少任务数量并保持代码简单。

您的问题看起来像是某种 producer/consumer 案例。我会选择像这样简单的东西:

public async Task Work(ConcurrentQueue<string> input, ConcurrentQueue<bool> output)
{
    string current;
    while (input.TryDequeue(out current))
    {
        output.Enqueue(await DownloadThenWriteThenReturnResult(current));
    }
}

var nbThread = 4;
var input = new ConcurrentQueue<string>(_myStrings);
var output = new ConcurrentQueue<bool>();

var workers = new List<Task>(nbThread);

for (int i = 0; i < nbThread; i++)
{
    workers.Add(Task.Run(async () => await this.Work(input, output)));
}

await Task.WhenAll(workers);

我不确定线程​​数与处理器数是否相关。如果您正在处理 CPU-绑定操作,这将是正确的。在这种情况下,您应该 运行 尽可能同步,因为系统引入的从一个上下文切换到另一个上下文的过载很重。所以在那种情况下,一个线程操作就是一种方式。

但在您的情况下,由于您大部分时间都在等待 I/O(用于 http 调用的网络、用于写入的磁盘等),您可能可以并行启动更多任务。每次任务等待 I/O 时,系统可以暂停它并切换到另一个任务。这里的重载并没有被浪费,因为另一方面线程会等待什么也不做。

您应该对 4、5、6 等任务进行基准测试,找出哪一项效率更高。

我在这里看到的一个问题是您不知道哪个输入产生了哪个输出。您可以使用 ConcurrentDictionary 而不是两个 ConcurrentQueue_myStrings.

中不能重复

你的解决方案

这是我对您的解决方案的看法。

解决方案*1

如您所说,它将创建 10 000 个任务。据我所知(但我不是该领域的专家),系统将在任务之间共享 ThreadPool 线程,应用一些 Round Robin 算法。我认为同一个任务甚至可以在第一个线程上开始执行,被系统暂停,然后在第二个线程上完成执行。这将引入不必要的开销,并导致整体 运行 时间变慢。

我认为这是绝对要避免的!

解*2

我读到并行 API 不能很好地处理异步操作。我也读过很多次 you don't want to call .Result 除非绝对需要。

所以我也会避免这个解决方案。

解*3

老实说,我无法想象这到底能做什么 ^^。这可能是一个很好的解决方案,因为您不是一次创建所有任务。无论如何,你仍然要创建 10 000 个任务,所以我会避免它。

解*4

老实说,我什至不知道这个 API,所以我无法真正发表评论。但是因为涉及到第三方库,所以尽量避免。