我们如何管理对 Parallel.ForEach 的异步调用?
How we can manage async calls into Parallel.ForEach?
我是 异步 编程的新手,我使用以下代码从第三方 API 收集数据,每次我都得到不同的响应。我做错方法了吗?
Parallel.ForEach(products, item =>{
GetProductsInfo(item);
});
public async Task<Product> GetProductsInfo(Product product)
{
var restClientProduct = new RestClient("URL");
var restRequestProduct = new RestRequest(Method.POST);
var proudctRequestJson = JsonConvert.SerializeObject(new ProudctRequest()
{
product_code = product.product_code,
});
restRequestProduct.AddHeader("cache-control", "no-cache");
restRequestProduct.AddHeader("Content-Length", proudctRequestJson.Count().ToString());
restRequestProduct.AddHeader("Content-Type", "application/json");
restRequestProduct.AddHeader("Accept", "application/json");
restRequestProduct.AddParameter("undefined", proudctRequestJson, ParameterType.RequestBody);
var responseProduct = GetResponseContentAsync(restClientProduct, restRequestProduct).Result;
if (responseProduct.StatusCode == HttpStatusCode.OK)
{
// set values form the responseProduct to the product
}
return product;
}
private Task<IRestResponse> GetResponseContentAsync(RestClient theClient, RestRequest theRequest)
{
var tcs = new TaskCompletionSource<IRestResponse>();
theClient.ExecuteAsync(theRequest, response =>
{
tcs.SetResult(response);
});
return tcs.Task;
}
您向我们展示的部分代码并非 运行 异步。您正在 GetResponseContentAsync()
上呼叫 .Result
,will block the thread until it finishes。这意味着到 Parallel.ForEach
完成时,所有 HTTP 请求都将完成。
如果您在该代码块中的某处使用 await
,则您将其替换为
// set values form the responseProduct to the product
那么可能 在 Parallel.ForEach
完成之前 没有报告结果。那是因为Parallel.ForEach
不支持异步代码,所以不会等他们完成。
我们假设 GetProductsInfo
实际上是 运行 异步
那么问题是:Parellel.ForEach
没有等待我的异步操作完成。有几种方法可以解决这个问题。
- 实施您自己的
ForEachAsync
。这已被要求,并且可能最终会添加(至少添加到 .NET Core)。但实际上 the issue where this was requested: 中有一个示例实现
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
{
while (partition.MoveNext())
await body(partition.Current);
}
}));
}
这是作为扩展方法编写的,因此您可以这样使用它:
await products.ForEachAsync(10, GetProductsInfo);
其中 10
是您希望一次 运行 的请求数。
- 您可以使用类似的东西:
Task.WaitAll(items.Select(i => GetProductsInfo(i));
这将 运行 异步请求,但会阻塞调用线程,直到它们全部完成。或者,您可以 await
它们,这样它就不会阻塞调用线程:
await Task.WhenAll(items.Select(i => GetProductsInfo(i))
但是,这两种方法都会立即触发 所有 请求。如果你知道你只会有一小部分,那很好。但是,如果您的数量可能非常大,则可能会淹没 Web 服务。使用 Parallel.ForEach
或上面 ForEachAsync
的实现将以块的形式发送它们。
如果您使用这些方法中的任何一种来等待响应,那么您真的应该等待 GetResponseContentAsync
而不是使用 .Result
:
var responseProduct = await GetResponseContentAsync(restClientProduct, restRequestProduct);
使用 async
/await
在 ASP.NET 中尤为重要,因为它可以使用最大线程数。
我是 异步 编程的新手,我使用以下代码从第三方 API 收集数据,每次我都得到不同的响应。我做错方法了吗?
Parallel.ForEach(products, item =>{
GetProductsInfo(item);
});
public async Task<Product> GetProductsInfo(Product product)
{
var restClientProduct = new RestClient("URL");
var restRequestProduct = new RestRequest(Method.POST);
var proudctRequestJson = JsonConvert.SerializeObject(new ProudctRequest()
{
product_code = product.product_code,
});
restRequestProduct.AddHeader("cache-control", "no-cache");
restRequestProduct.AddHeader("Content-Length", proudctRequestJson.Count().ToString());
restRequestProduct.AddHeader("Content-Type", "application/json");
restRequestProduct.AddHeader("Accept", "application/json");
restRequestProduct.AddParameter("undefined", proudctRequestJson, ParameterType.RequestBody);
var responseProduct = GetResponseContentAsync(restClientProduct, restRequestProduct).Result;
if (responseProduct.StatusCode == HttpStatusCode.OK)
{
// set values form the responseProduct to the product
}
return product;
}
private Task<IRestResponse> GetResponseContentAsync(RestClient theClient, RestRequest theRequest)
{
var tcs = new TaskCompletionSource<IRestResponse>();
theClient.ExecuteAsync(theRequest, response =>
{
tcs.SetResult(response);
});
return tcs.Task;
}
您向我们展示的部分代码并非 运行 异步。您正在 GetResponseContentAsync()
上呼叫 .Result
,will block the thread until it finishes。这意味着到 Parallel.ForEach
完成时,所有 HTTP 请求都将完成。
如果您在该代码块中的某处使用 await
,则您将其替换为
// set values form the responseProduct to the product
那么可能 在 Parallel.ForEach
完成之前 没有报告结果。那是因为Parallel.ForEach
不支持异步代码,所以不会等他们完成。
我们假设 GetProductsInfo
实际上是 运行 异步
那么问题是:Parellel.ForEach
没有等待我的异步操作完成。有几种方法可以解决这个问题。
- 实施您自己的
ForEachAsync
。这已被要求,并且可能最终会添加(至少添加到 .NET Core)。但实际上 the issue where this was requested: 中有一个示例实现
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
{
while (partition.MoveNext())
await body(partition.Current);
}
}));
}
这是作为扩展方法编写的,因此您可以这样使用它:
await products.ForEachAsync(10, GetProductsInfo);
其中 10
是您希望一次 运行 的请求数。
- 您可以使用类似的东西:
Task.WaitAll(items.Select(i => GetProductsInfo(i));
这将 运行 异步请求,但会阻塞调用线程,直到它们全部完成。或者,您可以 await
它们,这样它就不会阻塞调用线程:
await Task.WhenAll(items.Select(i => GetProductsInfo(i))
但是,这两种方法都会立即触发 所有 请求。如果你知道你只会有一小部分,那很好。但是,如果您的数量可能非常大,则可能会淹没 Web 服务。使用 Parallel.ForEach
或上面 ForEachAsync
的实现将以块的形式发送它们。
如果您使用这些方法中的任何一种来等待响应,那么您真的应该等待 GetResponseContentAsync
而不是使用 .Result
:
var responseProduct = await GetResponseContentAsync(restClientProduct, restRequestProduct);
使用 async
/await
在 ASP.NET 中尤为重要,因为它可以使用最大线程数。