将每个异步结果存储在它自己的数组元素中
Storing each async result in its own array element
假设我想从网站下载 1000 个食谱。这些网站最多接受 10 个并发连接。 每个食谱都应该存储在一个数组中,在其相应的索引处。(我不想将数组发送到 DownloadRecipe
方法。)
从技术上讲,我已经解决了这个问题,但我想知道是否有更简洁的方法来使用 async/await 或其他方法来实现它?
static async Task MainAsync()
{
int recipeCount = 1000;
int connectionCount = 10;
string[] recipes = new string[recipeCount];
Task<string>[] tasks = new Task<string>[connectionCount];
int r = 0;
while (r < recipeCount)
{
for (int t = 0; t < tasks.Length; t++)
{
tasks[t] = Task.Run(async () => recipes[r] = await DownloadRecipe(r));
r++;
}
await Task.WhenAll(tasks);
}
}
static async Task<string> DownloadRecipe(int index)
{
// ... await calls to download recipe
}
此外,此解决方案也不是最佳解决方案,因为在完成所有 10 运行 次下载之前,它不会费心开始新的下载。有什么我们可以在不使代码膨胀太多的情况下改进的地方吗?线程池限制为 10 个线程?
有很多方法可以做到这一点。一种方法是使用 ActionBlock
,它可以让您相当轻松地访问 MaxDegreeOfParallelism
,并且可以很好地与 async
方法一起使用
static async Task MainAsync()
{
var recipeCount = 1000;
var connectionCount = 10;
var recipes = new string[recipeCount];
async Task Action(int i) => recipes[i] = await DownloadRecipe(i);
var processor = new ActionBlock<int>(Action, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = connectionCount,
SingleProducerConstrained = true
});
for (var i = 0; i < recipeCount; i++)
await processor.SendAsync(i);
processor.Complete();
await processor.Completion;
}
static async Task<string> DownloadRecipe(int index)
{
...
}
另一种方法可能是使用 SemaphoreSlim
var slim = new SemaphoreSlim(connectionCount, connectionCount);
var tasks = Enumerable
.Range(0, recipeCount)
.Select(Selector);
async Task<string> Selector(int i)
{
await slim.WaitAsync()
try
{
return await DownloadRecipe(i)
}
finally
{
slim.Release();
}
}
var recipes = await Task.WhenAll(tasks);
另一组方法是使用 Reactive Extensions (Rx)...同样有很多方法可以做到这一点,这只是一种 awaitable 方法(并且可能综合考虑会更好)
var results = await Enumerable
.Range(0, recipeCount)
.ToObservable()
.Select(i => Observable.FromAsync(() => DownloadRecipe(i)))
.Merge(connectionCount)
.ToArray()
.ToTask();
有 10 个“池”的替代方法将“同时”加载数据。
您不需要用单独的线程包装 IO 操作。使用单独的线程进行IO操作只是浪费资源
请注意,下载数据的线程什么都不做,只是等待响应。这是 async-await
方法非常方便的地方 - 我们可以发送多个请求而无需等待它们完成并且不会浪费线程。
static async Task MainAsync()
{
var requests = Enumerable.Range(0, 1000).ToArray();
var maxConnections = 10;
var pools = requests
.GroupBy(i => i % maxConnections)
.Select(group => DownloadRecipesFor(group.ToArray()))
.ToArray();
await Task.WhenAll(pools);
var recipes = pools.SelectMany(pool => pool.Result).ToArray();
}
static async Task<IEnumerable<string>> DownLoadRecipesFor(params int[] requests)
{
var recipes = new List<string>();
foreach (var request in requests)
{
var recipe = await DownloadRecipe(request);
recipes.Add(recipe);
}
return recipes;
}
因为在池中(DownloadRecipesFor
方法)我们一个一个地下载结果 - 我们确保始终有不超过 10 个活动请求。
这比原来的要有效一点,因为我们不会等到 10 个任务完成后再开始下一个“bunch”。
这并不理想,因为如果最后一个“池”比其他“池”提前完成,它就无法处理下一个请求。
最终结果会有相应的索引,因为我们将按照创建它们的顺序处理“池”和内部请求。
假设我想从网站下载 1000 个食谱。这些网站最多接受 10 个并发连接。 每个食谱都应该存储在一个数组中,在其相应的索引处。(我不想将数组发送到 DownloadRecipe
方法。)
从技术上讲,我已经解决了这个问题,但我想知道是否有更简洁的方法来使用 async/await 或其他方法来实现它?
static async Task MainAsync()
{
int recipeCount = 1000;
int connectionCount = 10;
string[] recipes = new string[recipeCount];
Task<string>[] tasks = new Task<string>[connectionCount];
int r = 0;
while (r < recipeCount)
{
for (int t = 0; t < tasks.Length; t++)
{
tasks[t] = Task.Run(async () => recipes[r] = await DownloadRecipe(r));
r++;
}
await Task.WhenAll(tasks);
}
}
static async Task<string> DownloadRecipe(int index)
{
// ... await calls to download recipe
}
此外,此解决方案也不是最佳解决方案,因为在完成所有 10 运行 次下载之前,它不会费心开始新的下载。有什么我们可以在不使代码膨胀太多的情况下改进的地方吗?线程池限制为 10 个线程?
有很多方法可以做到这一点。一种方法是使用 ActionBlock
,它可以让您相当轻松地访问 MaxDegreeOfParallelism
,并且可以很好地与 async
方法一起使用
static async Task MainAsync()
{
var recipeCount = 1000;
var connectionCount = 10;
var recipes = new string[recipeCount];
async Task Action(int i) => recipes[i] = await DownloadRecipe(i);
var processor = new ActionBlock<int>(Action, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = connectionCount,
SingleProducerConstrained = true
});
for (var i = 0; i < recipeCount; i++)
await processor.SendAsync(i);
processor.Complete();
await processor.Completion;
}
static async Task<string> DownloadRecipe(int index)
{
...
}
另一种方法可能是使用 SemaphoreSlim
var slim = new SemaphoreSlim(connectionCount, connectionCount);
var tasks = Enumerable
.Range(0, recipeCount)
.Select(Selector);
async Task<string> Selector(int i)
{
await slim.WaitAsync()
try
{
return await DownloadRecipe(i)
}
finally
{
slim.Release();
}
}
var recipes = await Task.WhenAll(tasks);
另一组方法是使用 Reactive Extensions (Rx)...同样有很多方法可以做到这一点,这只是一种 awaitable 方法(并且可能综合考虑会更好)
var results = await Enumerable
.Range(0, recipeCount)
.ToObservable()
.Select(i => Observable.FromAsync(() => DownloadRecipe(i)))
.Merge(connectionCount)
.ToArray()
.ToTask();
有 10 个“池”的替代方法将“同时”加载数据。
您不需要用单独的线程包装 IO 操作。使用单独的线程进行IO操作只是浪费资源
请注意,下载数据的线程什么都不做,只是等待响应。这是 async-await
方法非常方便的地方 - 我们可以发送多个请求而无需等待它们完成并且不会浪费线程。
static async Task MainAsync()
{
var requests = Enumerable.Range(0, 1000).ToArray();
var maxConnections = 10;
var pools = requests
.GroupBy(i => i % maxConnections)
.Select(group => DownloadRecipesFor(group.ToArray()))
.ToArray();
await Task.WhenAll(pools);
var recipes = pools.SelectMany(pool => pool.Result).ToArray();
}
static async Task<IEnumerable<string>> DownLoadRecipesFor(params int[] requests)
{
var recipes = new List<string>();
foreach (var request in requests)
{
var recipe = await DownloadRecipe(request);
recipes.Add(recipe);
}
return recipes;
}
因为在池中(DownloadRecipesFor
方法)我们一个一个地下载结果 - 我们确保始终有不超过 10 个活动请求。
这比原来的要有效一点,因为我们不会等到 10 个任务完成后再开始下一个“bunch”。
这并不理想,因为如果最后一个“池”比其他“池”提前完成,它就无法处理下一个请求。
最终结果会有相应的索引,因为我们将按照创建它们的顺序处理“池”和内部请求。