将每个异步结果存储在它自己的数组元素中

Storing each async result in its own array element

假设我想从网站下载 1000 个食谱。这些网站最多接受 10 个并发连接。 每个食谱都应该存储在一个数组中,在其相应的索引处。(我不想将数组发送到 DownloadRecipe 方法。)

从技术上讲,我已经解决了这个问题,但我想知道是否有更简洁的方法来使用 async/await 或其他方法来实现它?

    static async Task MainAsync()
    {
        int recipeCount = 1000;
        int connectionCount = 10;
        string[] recipes = new string[recipeCount];
        Task<string>[] tasks = new Task<string>[connectionCount];
        int r = 0;

        while (r < recipeCount)
        {
            for (int t = 0; t < tasks.Length; t++)
            {
                tasks[t] = Task.Run(async () => recipes[r] = await DownloadRecipe(r));
                r++;
            }

            await Task.WhenAll(tasks);
        }
    }

    static async Task<string> DownloadRecipe(int index)
    {
        // ... await calls to download recipe
    }

此外,此解决方案也不是最佳解决方案,因为在完成所有 10 运行 次下载之前,它不会费心开始新的下载。有什么我们可以在不使代码膨胀太多的情况下改进的地方吗?线程池限制为 10 个线程?

有很多方法可以做到这一点。一种方法是使用 ActionBlock,它可以让您相当轻松地访问 MaxDegreeOfParallelism,并且可以很好地与 async 方法一起使用

static async Task MainAsync()
{
   var recipeCount = 1000;
   var connectionCount = 10;
   var recipes = new string[recipeCount];

   async Task Action(int i) => recipes[i] = await DownloadRecipe(i);
   
   var processor = new ActionBlock<int>(Action, new ExecutionDataflowBlockOptions()
   {
      MaxDegreeOfParallelism = connectionCount,
      SingleProducerConstrained = true
   });

   for (var i = 0; i < recipeCount; i++)
      await processor.SendAsync(i);

   processor.Complete();
   await processor.Completion;
}

static async Task<string> DownloadRecipe(int index)
{
   ...
}

另一种方法可能是使用 SemaphoreSlim

var slim = new SemaphoreSlim(connectionCount, connectionCount);

var tasks = Enumerable
   .Range(0, recipeCount)
   .Select(Selector);
   
async Task<string> Selector(int i)
{
   await slim.WaitAsync()
   try
   {
      return await DownloadRecipe(i)
   }
   finally
   {
      slim.Release();
   }
}

var recipes = await Task.WhenAll(tasks);

另一组方法是使用 Reactive Extensions (Rx)...同样有很多方法可以做到这一点,这只是一种 awaitable 方法(并且可能综合考虑会更好)

var results = await Enumerable
        .Range(0, recipeCount)
        .ToObservable()
        .Select(i => Observable.FromAsync(() => DownloadRecipe(i)))
        .Merge(connectionCount)
        .ToArray()
        .ToTask();

有 10 个“池”的替代方法将“同时”加载数据。

您不需要用单独的线程包装 IO 操作。使用单独的线程进行IO操作只是浪费资源
请注意,下载数据的线程什么都不做,只是等待响应。这是 async-await 方法非常方便的地方 - 我们可以发送多个请求而无需等待它们完成并且不会浪费线程。

static async Task MainAsync()
{
    var requests = Enumerable.Range(0, 1000).ToArray();
    var maxConnections = 10;
    var pools = requests
        .GroupBy(i => i % maxConnections)
        .Select(group => DownloadRecipesFor(group.ToArray()))
        .ToArray();

    await Task.WhenAll(pools);

    var recipes = pools.SelectMany(pool => pool.Result).ToArray();
}

static async Task<IEnumerable<string>> DownLoadRecipesFor(params int[] requests)
{
    var recipes = new List<string>();
    foreach (var request in requests)
    {
        var recipe = await DownloadRecipe(request);
        recipes.Add(recipe);
    }

    return recipes;
}

因为在池中(DownloadRecipesFor 方法)我们一个一个地下载结果 - 我们确保始终有不超过 10 个活动请求。

这比原来的要有效一点,因为我们不会等到 10 个任务完成后再开始下一个“bunch”。
这并不理想,因为如果最后一个“池”比其他“池”提前完成,它就无法处理下一个请求。

最终结果会有相应的索引,因为我们将按照创建它们的顺序处理“池”和内部请求。