对于 I/O 绑定任务,Parallel.ForEach 比 Task.WaitAll 快?

Parallel.ForEach faster than Task.WaitAll for I/O bound tasks?

我有两个版本的程序向 Web 服务器提交约 3000 个 HTTP GET 请求。

第一个版本 是基于我阅读的here。该解决方案对我来说很有意义,因为发出网络请求是 I/O 绑定工作,并且 async/await 与 Task.WhenAll 或 Task.WaitAll 一起使用意味着您可以提交 100 个请求一次然后等待它们全部完成,然后再提交下 100 个请求,这样您就不会使 Web 服务器陷入困境。我很惊讶地看到这个版本在大约 12 分钟内完成了所有工作——比我预期的要慢得多。

第二个版本 在 Parallel.ForEach 循环中提交所有 3000 个 HTTP GET 请求。我使用 .Result 等待每个请求完成,然后循环迭代中的其余逻辑才能执行。我认为这将是一个效率低得多的解决方案,因为使用线程并行执行任务通常更适合执行 CPU 绑定工作,但我很惊讶地看到这个版本完成了所有工作~3 分钟!

我的问题是为什么 Parallel.ForEach 版本更快?这是一个额外的惊喜,因为当我对 different API/web 服务器应用相同的两种技术时,我的代码的 version 1 是实际上比版本 2 快了大约 6 分钟——这是我所期望的。两个不同版本的性能是否与 Web 服务器处理流量的方式有关?

您可以在下面看到我的代码的简化版本:

private async Task<ObjectDetails> TryDeserializeResponse(HttpResponseMessage response)
{
    try
    {
        using (Stream stream = await response.Content.ReadAsStreamAsync())
        using (StreamReader readStream = new StreamReader(stream, Encoding.UTF8))
        using (JsonTextReader jsonTextReader = new JsonTextReader(readStream))
        {
            JsonSerializer serializer = new JsonSerializer();
            ObjectDetails objectDetails = serializer.Deserialize<ObjectDetails>(
                jsonTextReader);
            return objectDetails;
        }
    }
    catch (Exception e)
    {
        // Log exception
        return null;
    }
}

private async Task<HttpResponseMessage> TryGetResponse(string urlStr)
{
    try
    {
        HttpResponseMessage response = await httpClient.GetAsync(urlStr)
            .ConfigureAwait(false);
        if (response.StatusCode != HttpStatusCode.OK)
        {
            throw new WebException("Response code is "
                + response.StatusCode.ToString() + "... not 200 OK.");
        }
        return response;
    }
    catch (Exception e)
    {
        // Log exception
        return null;
    }
}

private async Task<ListOfObjects> GetObjectDetailsAsync(string baseUrl, int id)
{
    string urlStr = baseUrl + @"objects/id/" + id + "/details";

    HttpResponseMessage response = await TryGetResponse(urlStr);

    ObjectDetails objectDetails = await TryDeserializeResponse(response);

    return objectDetails;
}

// With ~3000 objects to retrieve, this code will create 100 API calls
// in parallel, wait for all 100 to finish, and then repeat that process
// ~30 times. In other words, there will be ~30 batches of 100 parallel
// API calls.
private Dictionary<int, Task<ObjectDetails>> GetAllObjectDetailsInBatches(
    string baseUrl, Dictionary<int, MyObject> incompleteObjects)
{
    int batchSize = 100;
    int numberOfBatches = (int)Math.Ceiling(
        (double)incompleteObjects.Count / batchSize);
    Dictionary<int, Task<ObjectDetails>> objectTaskDict
        = new Dictionary<int, Task<ObjectDetails>>(incompleteObjects.Count);

    var orderedIncompleteObjects = incompleteObjects.OrderBy(pair => pair.Key);

    for (int i = 0; i < 1; i++)
    {
        var batchOfObjects = orderedIncompleteObjects.Skip(i * batchSize)
            .Take(batchSize);
        var batchObjectsTaskList = batchOfObjects.Select(
            pair => GetObjectDetailsAsync(baseUrl, pair.Key));
        Task.WaitAll(batchObjectsTaskList.ToArray());
        foreach (var objTask in batchObjectsTaskList)
            objectTaskDict.Add(objTask.Result.id, objTask);
    }

    return objectTaskDict;
}

public void GetObjectsVersion1()
{
    string baseUrl = @"https://mywebserver.com:/api";

    // GetIncompleteObjects is not shown, but it is not relevant to
    // the question
    Dictionary<int, MyObject> incompleteObjects = GetIncompleteObjects();

    Dictionary<int, Task<ObjectDetails>> objectTaskDict
        = GetAllObjectDetailsInBatches(baseUrl, incompleteObjects);

    foreach (KeyValuePair<int, MyObject> pair in incompleteObjects)
    {
        ObjectDetails objectDetails = objectTaskDict[pair.Key].Result
            .objectDetails;

        // Code here that copies fields from objectDetails to pair.Value
        // (the incompleteObject)

        AllObjects.Add(pair.Value);
    };
}

public void GetObjectsVersion2()
{
    string baseUrl = @"https://mywebserver.com:/api";

    // GetIncompleteObjects is not shown, but it is not relevant to
    // the question
    Dictionary<int, MyObject> incompleteObjects = GetIncompleteObjects();

    Parallel.ForEach(incompleteHosts, pair =>
    {
        ObjectDetails objectDetails = GetObjectDetailsAsync(
            baseUrl, pair.Key).Result.objectDetails;

        // Code here that copies fields from objectDetails to pair.Value
        // (the incompleteObject)

        AllObjects.Add(pair.Value);
    });
}

https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreach?view=netframework-4.8

基本上,parralel foreach 允许并行迭代 运行,因此您不会将迭代限制为串行 运行,在不受线程约束的主机上,这往往会导致改进吞吐量

简而言之:

  • Parallel.Foreach() 对 CPU 绑定任务最有用。
  • Task.WaitAll() 对 IO 绑定任务更有用。

因此,在您的情况下,您是从网络服务器获取信息,即 IO。如果正确实现异步方法,它不会阻塞任何线程。 (它将使用 IO 完成端口等待) 这样线程就可以做其他事情了。

通过运行异步方法GetObjectDetailsAsync(baseUrl, pair.Key).Result同步,它会阻塞一个线程。所以线程池将被等待线程淹没。

所以我认为任务解决方案更适合。

Parallel.ForEach 可能 运行 更快的一个可能原因是它会产生节流的副作用。最初 x 个线程正在处理前 x 个元素(其中 x 是可用内核的数量),并且可以根据内部启发式逐渐添加更多线程。限制 IO 操作是一件好事,因为它可以保护网络和处理请求的服务器不至于负担过重。您通过以 100 个为一组发出请求的替代临时限制方法远非理想,原因有很多,其中之一是 100 个并发请求是很多请求!另一个是单个长 运行ning 操作可能会延迟批处理的完成,直到其他 99 个操作完成后很长时间。

请注意,Parallel.ForEach 也不适合并行化 IO 操作。它恰好比替代方案表现更好,一直在浪费内存。如需更好的方法,请查看此处:How to limit the amount of concurrent async I/O operations?