调度员,Async/Await,并发工作

Dispatcher, Async/Await, Concurrent work

我有一堆异步方法,我从 Dispatcher 调用它们。这些方法不在后台执行任何工作,它们只是等待一些 I/O 操作,或等待网络服务器的响应。

async Task FetchAsync()
{
    // Prepare request in UI thread
    var response = await new WebClient().DownloadDataTaskAsync();
    // Process response in UI thread
}

现在,我想通过并行调用多个 FetchAsync() 并达到一定的最大并行度来执行负载测试。

我的第一次尝试是使用 Paralell.Foreach(),但 id 在 async/await 中效果不佳。

var option = new ParallelOptions {MaxDegreeOfParallelism = 10};
Parallel.ForEach(UnitsOfWork, uow => uow.FetchAsync().Wait());

我一直在研究响应式扩展,但我仍然无法利用 Dispatcher 和 async/await。

我的目标是不为每个 FetchAsync() 创建单独的线程。你能给我一些提示吗?

只需调用 Fetchasync 而无需等待每个调用,然后使用 Task.WhenAll 一起等待所有调用。

var tasks = new List<Task>();
var max = 10;
for(int i = 0; i < max; i++)
{
    tasks.Add(FetchAsync());
}

await Task.WhenAll(tasks);

这是针对您的问题的通用可重用解决方案,您不仅可以将其用于 FetchAsync 方法,还可以将其用于任何具有相同签名的异步方法。 api 还包括实时并发限制支持:

参数不言自明: totalRequestCount:是你总共要执行多少个异步请求(FatchAsync 调用),异步处理器是 FetchAsync 方法本身,maxDegreeOfParallelism 是可选的可为 null 的参数。如果你想对最大并发异步请求数进行实时并发限制,请设置它,否则不要设置。

public static Task ForEachAsync(
        int totalRequestCount,
        Func<Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        IEnumerable<Task> tasks;

        if (maxDegreeOfParallelism != null)
        {
            SemaphoreSlim throttler = new SemaphoreSlim(maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value);

            tasks = Enumerable.Range(0, totalRequestCount).Select(async requestNumber =>
            {
                await throttler.WaitAsync();
                try
                {
                    await asyncProcessor().ConfigureAwait(false);
                }
                finally
                {
                    throttler.Release();
                }
            });
        }
        else
        {
            tasks = Enumerable.Range(0, totalRequestCount).Select(requestNumber => asyncProcessor());
        }

        return Task.WhenAll(tasks);
    }