如何限制异步 IO 任务到数据库的数量?
How to limit number of async IO tasks to database?
我有一个 id's
的列表,我想从数据库中并行获取每个 id
的数据。我下面的 ExecuteAsync
方法以非常高的吞吐量被调用,对于每个请求,我们有大约 500 ids
需要提取数据。
所以我得到了下面的代码,我在其中循环 ids
的列表并并行地为每个 id
进行异步调用并且它工作正常。
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
{
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
var excludeNull = new List<T>(ids.Count);
for (int i = 0; i < responses.Length; i++)
{
var response = responses[i];
if (response != null)
{
excludeNull.Add(response);
}
}
return excludeNull;
}
private async Task<T> Execute<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> requestExecuter) where T : class
{
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => requestExecuter(ct), CancellationToken.None);
if (response.Outcome == OutcomeType.Failure)
{
if (response.FinalException != null)
{
// log error
throw response.FinalException;
}
}
return response?.Result;
}
问题:
现在你可以看到我正在循环所有 ids
并为每个 id
并行地对数据库进行大量异步调用,这会给数据库带来很大的负载(取决于请求的数量来了)。所以我想限制我们对数据库进行的异步调用的数量。我修改了 ExecuteAsync
以使用 Semaphore
,如下所示,但它看起来不像我想要的那样:
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
{
await throttler.WaitAsync().ConfigureAwait(false);
try
{
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
finally
{
throttler.Release();
}
}
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
}
Semaphore 是否适用于 Threads
或 Tasks
?阅读它 here 看起来 Semaphore 是针对线程的,而 SemaphoreSlim 是针对任务的。
这是正确的吗?如果是,那么解决此问题并限制我们在此处对数据库执行的异步 IO 任务数量的最佳方法是什么。
您正在限制向列表中添加任务的速度。您没有限制任务的执行速度。为此,您可能必须在 Execute
方法本身内实现信号量调用。
如果您不能修改 Execute
,另一种方法是轮询已完成的任务,有点像这样:
for (int i = 0; i < ids.Count; i++)
{
var pendingCount = tasks.Count( t => !t.IsCompleted );
while (pendingCount >= 500) await Task.Yield();
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
await Task.WhenAll( tasks );
Task is an abstraction on threads, and doesn’t necessarily create a new thread. Semaphore limits the number of threads that can access that for loop. Execute returns a Task which aren’t threads. If there’s only 1 request, there will be only 1 thread inside that for loop, even if it is asking for 500 ids. The 1 thread sends off all the async IO tasks itself.
有点。我不会说任务与线程相关。实际上有两种任务:委托任务(一种线程的抽象)和 promise 任务(与线程无关)。
关于SemaphoreSlim
,它确实限制了代码块(不是线程)的并发。
I recently started playing with C# so my understanding is not right looks like w.r.t Threads and Tasks.
如果您对如何真正不涉及线程更感兴趣,我建议您阅读我的 async
intro and best practices. Follow up with There Is No Thread。
I modified ExecuteAsync to use Semaphore as shown below but it doesn't look like it does what I want it to do
当前代码仅限制将任务添加到列表中,无论如何一次只能完成一个任务。你想要做的是限制执行本身:
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
tasks.Add(ThrottledExecute(ids[i]));
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
async Task<T> ThrottledExecute(int id)
{
await throttler.WaitAsync().ConfigureAwait(false);
try {
return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
} finally {
throttler.Release();
}
}
}
您的同事可能已经想到 Semaphore
class,它确实是一个以线程为中心的节流器,没有异步功能。
Limits the number of threads that can access a resource or pool of resources concurrently.
每个线程为其堆栈保留的 SemaphoreSlim
class is a lightweight alternative to Semaphore
, which includes the asynchronous method WaitAsync
, that makes all the difference in the world. The WaitAsync
doesn't block a thread, it blocks an asynchronous workflow. Asynchronous workflows are cheap (usually less than 1000 bytes each). You can have millions of them "running" concurrently at any given moment. This is not the case with threads, because of the 内存。
至于 ExecuteAsync
方法,您可以通过以下方法使用 LINQ 方法 Select
, Where
、ToArray
和 ToList
:
重构它
Update: Polly library supports 捕获并继续当前同步上下文,所以我添加了一个 bool executeOnCurrentContext
API 的参数。我还将异步 Execute
方法重命名为 ExecuteAsync
,以与 guidelines.
保持一致
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper,
int concurrencyLevel = 1, bool executeOnCurrentContext = false) where T : class
{
var throttler = new SemaphoreSlim(concurrencyLevel);
Task<T>[] tasks = ids.Select(async id =>
{
await throttler.WaitAsync().ConfigureAwait(executeOnCurrentContext);
try
{
return await ExecuteAsync(policy, ct => mapper(ct, id),
executeOnCurrentContext).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
}).ToArray();
T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
return results.Where(r => r != null).ToList();
}
private async Task<T> ExecuteAsync<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> function,
bool executeOnCurrentContext = false) where T : class
{
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => executeOnCurrentContext ? function(ct) : Task.Run(() => function(ct)),
CancellationToken.None, continueOnCapturedContext: executeOnCurrentContext)
.ConfigureAwait(executeOnCurrentContext);
if (response.Outcome == OutcomeType.Failure)
{
if (response.FinalException != null)
{
ExceptionDispatchInfo.Throw(response.FinalException);
}
}
return response?.Result;
}
其实TPL是可以控制任务执行,限制并发的。您可以测试有多少并行任务适合您的用例。不用考虑线程,TPL会为你打理一切。
要使用有限并发,请参阅此答案,归功于@panagiotis-kanavos
.Net TPL: Limited Concurrency Level Task scheduler with task priority?
示例代码是(即使使用不同的优先级,您也可以删除它):
QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default,4);
TaskScheduler pri0 = qts.ActivateNewQueue(priority: 0);
TaskScheduler pri1 = qts.ActivateNewQueue(priority: 1);
Task.Factory.StartNew(()=>{ },
CancellationToken.None,
TaskCreationOptions.None,
pri0);
只需将所有任务放入队列,然后 Task.WhenAll
您可以等到一切都完成。
我有一个 id's
的列表,我想从数据库中并行获取每个 id
的数据。我下面的 ExecuteAsync
方法以非常高的吞吐量被调用,对于每个请求,我们有大约 500 ids
需要提取数据。
所以我得到了下面的代码,我在其中循环 ids
的列表并并行地为每个 id
进行异步调用并且它工作正常。
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
{
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
var excludeNull = new List<T>(ids.Count);
for (int i = 0; i < responses.Length; i++)
{
var response = responses[i];
if (response != null)
{
excludeNull.Add(response);
}
}
return excludeNull;
}
private async Task<T> Execute<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> requestExecuter) where T : class
{
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => requestExecuter(ct), CancellationToken.None);
if (response.Outcome == OutcomeType.Failure)
{
if (response.FinalException != null)
{
// log error
throw response.FinalException;
}
}
return response?.Result;
}
问题:
现在你可以看到我正在循环所有 ids
并为每个 id
并行地对数据库进行大量异步调用,这会给数据库带来很大的负载(取决于请求的数量来了)。所以我想限制我们对数据库进行的异步调用的数量。我修改了 ExecuteAsync
以使用 Semaphore
,如下所示,但它看起来不像我想要的那样:
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
{
await throttler.WaitAsync().ConfigureAwait(false);
try
{
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
finally
{
throttler.Release();
}
}
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
}
Semaphore 是否适用于 Threads
或 Tasks
?阅读它 here 看起来 Semaphore 是针对线程的,而 SemaphoreSlim 是针对任务的。
这是正确的吗?如果是,那么解决此问题并限制我们在此处对数据库执行的异步 IO 任务数量的最佳方法是什么。
您正在限制向列表中添加任务的速度。您没有限制任务的执行速度。为此,您可能必须在 Execute
方法本身内实现信号量调用。
如果您不能修改 Execute
,另一种方法是轮询已完成的任务,有点像这样:
for (int i = 0; i < ids.Count; i++)
{
var pendingCount = tasks.Count( t => !t.IsCompleted );
while (pendingCount >= 500) await Task.Yield();
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
await Task.WhenAll( tasks );
Task is an abstraction on threads, and doesn’t necessarily create a new thread. Semaphore limits the number of threads that can access that for loop. Execute returns a Task which aren’t threads. If there’s only 1 request, there will be only 1 thread inside that for loop, even if it is asking for 500 ids. The 1 thread sends off all the async IO tasks itself.
有点。我不会说任务与线程相关。实际上有两种任务:委托任务(一种线程的抽象)和 promise 任务(与线程无关)。
关于SemaphoreSlim
,它确实限制了代码块(不是线程)的并发。
I recently started playing with C# so my understanding is not right looks like w.r.t Threads and Tasks.
如果您对如何真正不涉及线程更感兴趣,我建议您阅读我的 async
intro and best practices. Follow up with There Is No Thread。
I modified ExecuteAsync to use Semaphore as shown below but it doesn't look like it does what I want it to do
当前代码仅限制将任务添加到列表中,无论如何一次只能完成一个任务。你想要做的是限制执行本身:
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
tasks.Add(ThrottledExecute(ids[i]));
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
async Task<T> ThrottledExecute(int id)
{
await throttler.WaitAsync().ConfigureAwait(false);
try {
return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
} finally {
throttler.Release();
}
}
}
您的同事可能已经想到 Semaphore
class,它确实是一个以线程为中心的节流器,没有异步功能。
Limits the number of threads that can access a resource or pool of resources concurrently.
每个线程为其堆栈保留的 SemaphoreSlim
class is a lightweight alternative to Semaphore
, which includes the asynchronous method WaitAsync
, that makes all the difference in the world. The WaitAsync
doesn't block a thread, it blocks an asynchronous workflow. Asynchronous workflows are cheap (usually less than 1000 bytes each). You can have millions of them "running" concurrently at any given moment. This is not the case with threads, because of the
至于 ExecuteAsync
方法,您可以通过以下方法使用 LINQ 方法 Select
, Where
、ToArray
和 ToList
:
Update: Polly library supports 捕获并继续当前同步上下文,所以我添加了一个 bool executeOnCurrentContext
API 的参数。我还将异步 Execute
方法重命名为 ExecuteAsync
,以与 guidelines.
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper,
int concurrencyLevel = 1, bool executeOnCurrentContext = false) where T : class
{
var throttler = new SemaphoreSlim(concurrencyLevel);
Task<T>[] tasks = ids.Select(async id =>
{
await throttler.WaitAsync().ConfigureAwait(executeOnCurrentContext);
try
{
return await ExecuteAsync(policy, ct => mapper(ct, id),
executeOnCurrentContext).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
}).ToArray();
T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
return results.Where(r => r != null).ToList();
}
private async Task<T> ExecuteAsync<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> function,
bool executeOnCurrentContext = false) where T : class
{
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => executeOnCurrentContext ? function(ct) : Task.Run(() => function(ct)),
CancellationToken.None, continueOnCapturedContext: executeOnCurrentContext)
.ConfigureAwait(executeOnCurrentContext);
if (response.Outcome == OutcomeType.Failure)
{
if (response.FinalException != null)
{
ExceptionDispatchInfo.Throw(response.FinalException);
}
}
return response?.Result;
}
其实TPL是可以控制任务执行,限制并发的。您可以测试有多少并行任务适合您的用例。不用考虑线程,TPL会为你打理一切。
要使用有限并发,请参阅此答案,归功于@panagiotis-kanavos
.Net TPL: Limited Concurrency Level Task scheduler with task priority?
示例代码是(即使使用不同的优先级,您也可以删除它):
QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default,4);
TaskScheduler pri0 = qts.ActivateNewQueue(priority: 0);
TaskScheduler pri1 = qts.ActivateNewQueue(priority: 1);
Task.Factory.StartNew(()=>{ },
CancellationToken.None,
TaskCreationOptions.None,
pri0);
只需将所有任务放入队列,然后 Task.WhenAll
您可以等到一切都完成。