我们如何管理对 Parallel.ForEach 的异步调用?

How we can manage async calls into Parallel.ForEach?

我是 异步 编程的新手,我使用以下代码从第三方 API 收集数据,每次我都得到不同的响应。我做错方法了吗?

Parallel.ForEach(products, item =>{
 GetProductsInfo(item);
});



public async Task<Product> GetProductsInfo(Product product)
{
    var restClientProduct = new RestClient("URL");
    var restRequestProduct = new RestRequest(Method.POST);
    var proudctRequestJson = JsonConvert.SerializeObject(new ProudctRequest()
    {
        product_code = product.product_code,

    });
    restRequestProduct.AddHeader("cache-control", "no-cache");
    restRequestProduct.AddHeader("Content-Length", proudctRequestJson.Count().ToString());
    restRequestProduct.AddHeader("Content-Type", "application/json");
    restRequestProduct.AddHeader("Accept", "application/json");
    restRequestProduct.AddParameter("undefined", proudctRequestJson, ParameterType.RequestBody);
    var responseProduct = GetResponseContentAsync(restClientProduct, restRequestProduct).Result;
    if (responseProduct.StatusCode == HttpStatusCode.OK)
    {
        // set values form the responseProduct to the product
    }
    return product;
}

private Task<IRestResponse> GetResponseContentAsync(RestClient theClient, RestRequest theRequest)
    {
        var tcs = new TaskCompletionSource<IRestResponse>();
        theClient.ExecuteAsync(theRequest, response =>
        {
            tcs.SetResult(response);
        });
        return tcs.Task;
    }

您向我们展示的部分代码并非 运行 异步。您正在 GetResponseContentAsync() 上呼叫 .Resultwill block the thread until it finishes。这意味着到 Parallel.ForEach 完成时,所有 HTTP 请求都将完成。

如果您在该代码块中的某处使用 await,则您将其替换为

// set values form the responseProduct to the product

那么可能 Parallel.ForEach 完成之前 没有报告结果。那是因为Parallel.ForEach不支持异步代码,所以不会等他们完成。

我们假设 GetProductsInfo 实际上是 运行 异步

那么问题是:Parellel.ForEach 没有等待我的异步操作完成。有几种方法可以解决这个问题。

  1. 实施您自己的 ForEachAsync。这已被要求,并且可能最终会添加(至少添加到 .NET Core)。但实际上 the issue where this was requested:
  2. 中有一个示例实现
/// <summary>
///     Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        }));
}

这是作为扩展方法编写的,因此您可以这样使用它:

await products.ForEachAsync(10, GetProductsInfo);

其中 10 是您希望一次 运行 的请求数。

  1. 您可以使用类似的东西:
Task.WaitAll(items.Select(i => GetProductsInfo(i));

这将 运行 异步请求,但会阻塞调用线程,直到它们全部完成。或者,您可以 await 它们,这样它就不会阻塞调用线程:

await Task.WhenAll(items.Select(i => GetProductsInfo(i))

但是,这两种方法都会立即触发 所有 请求。如果你知道你只会有一小部分,那很好。但是,如果您的数量可能非常大,则可能会淹没 Web 服务。使用 Parallel.ForEach 或上面 ForEachAsync 的实现将以块的形式发送它们。

如果您使用这些方法中的任何一种来等待响应,那么您真的应该等待 GetResponseContentAsync 而不是使用 .Result:

var responseProduct = await GetResponseContentAsync(restClientProduct, restRequestProduct);

使用 async/await 在 ASP.NET 中尤为重要,因为它可以使用最大线程数。