是否可以在 .NET 6.0 中限制 Parallel.ForEachAsync 以避免速率限制?
Is it possible to throttle Parallel.ForEachAsync in .NET 6.0 to avoid rate limiting?
我对编程还很陌生(< 3 年经验),所以我对这个 post 中的主题没有很好的理解。请多多包涵。
我的团队正在开发与第三方系统的集成,第三方端点之一缺乏有意义的方法来获取符合条件的实体列表。
我们一直在通过遍历请求集合并将每个等待调用的结果添加到列表中来获取这些实体。这工作得很好,但是获取实体比从其他端点获取实体要花费更长的时间,这让我们可以通过提供 ID 列表来获取实体列表。
.NET 6.0 引入了 Parallel.ForEachAsync(),它让我们可以并行异步执行多个可等待的任务。
例如:
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests)
where TEntity : IEntity
{
var entities = new ConcurrentBag<TEntity>();
// Create a function that takes a RestRequest and returns the
// result of the request's execution, for each request
var requestExecutionTasks = requests.Select(i =>
new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));
// Execute each of the functions asynchronously in parallel,
// and add the results to the aggregate as they come in
await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
{
// This lets us limit the number of threads to use. -1 is unlimited
MaxDegreeOfParallelism = -1
}, async (func, _) => entities.Add(await func()));
return entities.ToList();
}
使用此代码而不是简单的 foreach 循环加快了在我的测试实例上获取 ~30 个实体所需的时间,平均缩短了 91%。棒极了。但是,我们担心当我们在可能有数千个实体的客户端系统上使用它时可能会出现速率限制。我们有一个系统可以检测到来自他们 API 的“你的速率受限”消息,并在重试之前提示请求一秒钟左右,但这并不是一个很好的解决方案,因为它是一项安全措施。
如果我们只是循环请求,我们可以通过在循环的每次迭代中执行类似 await Task.Delay(minimumDelay)
的操作来限制调用。如果我错了,请纠正我,但据我了解,这在并行执行请求时实际上不起作用 foreach,因为它会使所有请求在执行前等待相同的时间。有没有办法让每个单独的请求在执行前等待一定的时间,只有当我们接近速率限制时?如果可能的话,我想在不限制要使用的线程数的情况下执行此操作。
编辑
我想让这个问题搁置一点,以便更多人可以回答。由于没有添加新的答案或评论,我将我得到的一个答案标记为正确。话虽如此,答案暗示了一种与使用 Parallel.ForEachAsync.
不同的方法
如果我正确理解了当前的答案,那么我最初提出的关于是否可以节流 Parallel.ForEachAsync 的问题的答案将是: “不,它是不是。
我的建议是放弃 Parallel.ForEachAsync
方法,改用新的 Chunk
LINQ operator in combination with the Task.WhenAll
方法。您可以像这样每秒启动 100 个异步操作:
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
List<IRestRequest> requests) where TEntity : IEntity
{
var tasks = new List<Task<TEntity>>();
foreach (var chunk in requests.Chunk(100))
{
tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
await Task.Delay(TimeSpan.FromSeconds(1.0));
}
return (await Task.WhenAll(tasks)).ToList();
}
假定启动异步操作(调用 GetAsync
方法)所需的时间可以忽略不计。
这种方法有一个固有的缺点,即在发生异常的情况下,在 所有 操作完成之前不会传播故障。为了比较,Parallel.ForEachAsync
方法在检测到第一次失败后停止调用异步委托并尽快完成。
我对编程还很陌生(< 3 年经验),所以我对这个 post 中的主题没有很好的理解。请多多包涵。
我的团队正在开发与第三方系统的集成,第三方端点之一缺乏有意义的方法来获取符合条件的实体列表。
我们一直在通过遍历请求集合并将每个等待调用的结果添加到列表中来获取这些实体。这工作得很好,但是获取实体比从其他端点获取实体要花费更长的时间,这让我们可以通过提供 ID 列表来获取实体列表。
.NET 6.0 引入了 Parallel.ForEachAsync(),它让我们可以并行异步执行多个可等待的任务。
例如:
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests)
where TEntity : IEntity
{
var entities = new ConcurrentBag<TEntity>();
// Create a function that takes a RestRequest and returns the
// result of the request's execution, for each request
var requestExecutionTasks = requests.Select(i =>
new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));
// Execute each of the functions asynchronously in parallel,
// and add the results to the aggregate as they come in
await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
{
// This lets us limit the number of threads to use. -1 is unlimited
MaxDegreeOfParallelism = -1
}, async (func, _) => entities.Add(await func()));
return entities.ToList();
}
使用此代码而不是简单的 foreach 循环加快了在我的测试实例上获取 ~30 个实体所需的时间,平均缩短了 91%。棒极了。但是,我们担心当我们在可能有数千个实体的客户端系统上使用它时可能会出现速率限制。我们有一个系统可以检测到来自他们 API 的“你的速率受限”消息,并在重试之前提示请求一秒钟左右,但这并不是一个很好的解决方案,因为它是一项安全措施。
如果我们只是循环请求,我们可以通过在循环的每次迭代中执行类似 await Task.Delay(minimumDelay)
的操作来限制调用。如果我错了,请纠正我,但据我了解,这在并行执行请求时实际上不起作用 foreach,因为它会使所有请求在执行前等待相同的时间。有没有办法让每个单独的请求在执行前等待一定的时间,只有当我们接近速率限制时?如果可能的话,我想在不限制要使用的线程数的情况下执行此操作。
编辑
我想让这个问题搁置一点,以便更多人可以回答。由于没有添加新的答案或评论,我将我得到的一个答案标记为正确。话虽如此,答案暗示了一种与使用 Parallel.ForEachAsync.
不同的方法如果我正确理解了当前的答案,那么我最初提出的关于是否可以节流 Parallel.ForEachAsync 的问题的答案将是: “不,它是不是。
我的建议是放弃 Parallel.ForEachAsync
方法,改用新的 Chunk
LINQ operator in combination with the Task.WhenAll
方法。您可以像这样每秒启动 100 个异步操作:
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
List<IRestRequest> requests) where TEntity : IEntity
{
var tasks = new List<Task<TEntity>>();
foreach (var chunk in requests.Chunk(100))
{
tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
await Task.Delay(TimeSpan.FromSeconds(1.0));
}
return (await Task.WhenAll(tasks)).ToList();
}
假定启动异步操作(调用 GetAsync
方法)所需的时间可以忽略不计。
这种方法有一个固有的缺点,即在发生异常的情况下,在 所有 操作完成之前不会传播故障。为了比较,Parallel.ForEachAsync
方法在检测到第一次失败后停止调用异步委托并尽快完成。