调度员,Async/Await,并发工作
Dispatcher, Async/Await, Concurrent work
我有一堆异步方法,我从 Dispatcher 调用它们。这些方法不在后台执行任何工作,它们只是等待一些 I/O 操作,或等待网络服务器的响应。
async Task FetchAsync()
{
// Prepare request in UI thread
var response = await new WebClient().DownloadDataTaskAsync();
// Process response in UI thread
}
现在,我想通过并行调用多个 FetchAsync()
并达到一定的最大并行度来执行负载测试。
我的第一次尝试是使用 Paralell.Foreach()
,但 id 在 async/await 中效果不佳。
var option = new ParallelOptions {MaxDegreeOfParallelism = 10};
Parallel.ForEach(UnitsOfWork, uow => uow.FetchAsync().Wait());
我一直在研究响应式扩展,但我仍然无法利用 Dispatcher 和 async/await。
我的目标是不为每个 FetchAsync()
创建单独的线程。你能给我一些提示吗?
只需调用 Fetchasync
而无需等待每个调用,然后使用 Task.WhenAll
一起等待所有调用。
var tasks = new List<Task>();
var max = 10;
for(int i = 0; i < max; i++)
{
tasks.Add(FetchAsync());
}
await Task.WhenAll(tasks);
这是针对您的问题的通用可重用解决方案,您不仅可以将其用于 FetchAsync 方法,还可以将其用于任何具有相同签名的异步方法。 api 还包括实时并发限制支持:
参数不言自明:
totalRequestCount:是你总共要执行多少个异步请求(FatchAsync 调用),异步处理器是 FetchAsync 方法本身,maxDegreeOfParallelism 是可选的可为 null 的参数。如果你想对最大并发异步请求数进行实时并发限制,请设置它,否则不要设置。
public static Task ForEachAsync(
int totalRequestCount,
Func<Task> asyncProcessor,
int? maxDegreeOfParallelism = null)
{
IEnumerable<Task> tasks;
if (maxDegreeOfParallelism != null)
{
SemaphoreSlim throttler = new SemaphoreSlim(maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value);
tasks = Enumerable.Range(0, totalRequestCount).Select(async requestNumber =>
{
await throttler.WaitAsync();
try
{
await asyncProcessor().ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
}
else
{
tasks = Enumerable.Range(0, totalRequestCount).Select(requestNumber => asyncProcessor());
}
return Task.WhenAll(tasks);
}
我有一堆异步方法,我从 Dispatcher 调用它们。这些方法不在后台执行任何工作,它们只是等待一些 I/O 操作,或等待网络服务器的响应。
async Task FetchAsync()
{
// Prepare request in UI thread
var response = await new WebClient().DownloadDataTaskAsync();
// Process response in UI thread
}
现在,我想通过并行调用多个 FetchAsync()
并达到一定的最大并行度来执行负载测试。
我的第一次尝试是使用 Paralell.Foreach()
,但 id 在 async/await 中效果不佳。
var option = new ParallelOptions {MaxDegreeOfParallelism = 10};
Parallel.ForEach(UnitsOfWork, uow => uow.FetchAsync().Wait());
我一直在研究响应式扩展,但我仍然无法利用 Dispatcher 和 async/await。
我的目标是不为每个 FetchAsync()
创建单独的线程。你能给我一些提示吗?
只需调用 Fetchasync
而无需等待每个调用,然后使用 Task.WhenAll
一起等待所有调用。
var tasks = new List<Task>();
var max = 10;
for(int i = 0; i < max; i++)
{
tasks.Add(FetchAsync());
}
await Task.WhenAll(tasks);
这是针对您的问题的通用可重用解决方案,您不仅可以将其用于 FetchAsync 方法,还可以将其用于任何具有相同签名的异步方法。 api 还包括实时并发限制支持:
参数不言自明: totalRequestCount:是你总共要执行多少个异步请求(FatchAsync 调用),异步处理器是 FetchAsync 方法本身,maxDegreeOfParallelism 是可选的可为 null 的参数。如果你想对最大并发异步请求数进行实时并发限制,请设置它,否则不要设置。
public static Task ForEachAsync(
int totalRequestCount,
Func<Task> asyncProcessor,
int? maxDegreeOfParallelism = null)
{
IEnumerable<Task> tasks;
if (maxDegreeOfParallelism != null)
{
SemaphoreSlim throttler = new SemaphoreSlim(maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value);
tasks = Enumerable.Range(0, totalRequestCount).Select(async requestNumber =>
{
await throttler.WaitAsync();
try
{
await asyncProcessor().ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
}
else
{
tasks = Enumerable.Range(0, totalRequestCount).Select(requestNumber => asyncProcessor());
}
return Task.WhenAll(tasks);
}