有没有办法在 ASP.NET Web API 应用程序中全局限制并行任务的数量?

Is there a way to limit the number of parallel Tasks globally in an ASP.NET Web API application?

我有一个 ASP.NET 5 Web API 应用程序,它包含一个从 List<T> 获取对象并向服务器发出 HTTP 请求的方法,一次 5 个,直到所有请求已完成。这是使用 SemaphoreSlimList<Task>() 并等待 Task.WhenAll() 完成的,类似于下面的示例片段:

public async Task<ResponseObj[]> DoStuff(List<Input> inputData)
{
  const int maxDegreeOfParallelism = 5;
  var tasks = new List<Task<ResponseObj>>();

  using var throttler = new SemaphoreSlim(maxDegreeOfParallelism);
  foreach (var input in inputData)
  {
    tasks.Add(ExecHttpRequestAsync(input, throttler));
  }

  List<ResponseObj> resposnes = await Task.WhenAll(tasks).ConfigureAwait(false);

  return responses;
}

private async Task<ResponseObj> ExecHttpRequestAsync(Input input, SemaphoreSlim throttler)
{
  await throttler.WaitAsync().ConfigureAwait(false);
  
  try
  {
    using var request = new HttpRequestMessage(HttpMethod.Post, "https://foo.bar/api");
    request.Content = new StringContent(JsonConvert.SerializeObject(input, Encoding.UTF8, "application/json");

    var response = await HttpClientWrapper.SendAsync(request).ConfigureAwait(false);
    var responseBody = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
    var responseObject = JsonConvert.DeserializeObject<ResponseObj>(responseBody);

    return responseObject;
  }
  finally
  {
    throttler.Release();
  }
}

这很好用,但是我希望限制在整个应用程序中全局并行执行的任务总数,以便允许扩展此应用程序。例如,如果同时进入我的 API 的 50 个请求,这将最多并行启动 250 个任务 运行。如果我想将在任何给定时间执行的任务总数限制为 100,是否可以实现?也许通过 Queue<T>?框架会自动阻止执行太多任务吗?或者我是否以错误的方式解决了这个问题,我是否需要将传入的请求排队到我的应用程序?

If I wanted to limit the total number of Tasks that are being executed at any given time to say 100, is it possible to accomplish this?

您正在寻找的是限制 MaximumConcurrencyLevel of what's called the Task Scheduler. You can create your own task scheduler 来调节它管理的任务的 MaximumCongruencyLevel 我建议实现一个类似队列的对象来跟踪传入的requests 和当前正在工作的请求,并等待当前请求完成后再消费更多。以下信息可能仍然有用。

任务调度程序负责 Tasks 的优先级排序,并负责跟踪任务并确保它们的工作完成,至少最终是这样。

它执行此操作的方式实际上与您提到的非常相似,一般来说,任务计划程序处理任务的方式采用 FIFO(先进先出)模型,与 ConcurrentQueue<T> works (at least starting in .NET 4) .

Would the framework automatically prevent too many tasks from being executed?

默认情况下,大多数应用程序创建的 TaskScheduler 似乎默认为 int.MaxValueMaximumConcurrencyLevel。所以理论上是的。

任务数量实际上没有限制(至少在默认情况下 TaskScheduler)对于您的案例场景来说可能不是什么大问题。

任务分为两种类型,至少在涉及如何将它们分配给可用线程池时是这样。它们被分成 Local and Global queues.

不用太详细,它的工作方式是如果一个任务创建其他任务,那些新任务是父任务队列(本地队列)的一部分。父任务生成的任务仅限于父线程池。(Unless the task scheduler takes it upon itself to move queues around)

如果一个任务不是由另一个任务创建的,则它是顶级任务并被放入全局队列。这些通常会分配给它们自己的线程(如果可用),如果一个线程不可用,它将在 FIFO 模型中处理,如上所述,直到它的工作可以完成。

这很重要,因为虽然您可以限制 TaskScheduler 发生的并发量,但它不一定很重要 - 如果说您有一个标记为长的顶级任务 运行 并负责处理您的传入请求。这将很有帮助,因为此顶级任务生成的所有任务都将成为该任务本地队列的一部分,因此不会向线程池中的所有可用线程发送垃圾邮件。

我假设代码已修复,即删除了 Task.Run 并调整了 WaitAsync / Release 以限制 HTTP 调用而不是 List<T>.Add.

I am looking to limit the total number of Tasks that are being executed in parallel globally throughout the application, so as to allow scaling up of this application.

这对我来说没有意义。限制你的任务限制了你的扩展。

For example, if 50 requests to my API came in at the same time, this would start at most 250 tasks running parallel.

并发,当然,但不是并行。重要的是要注意,这些不是 250 个线程,它们也不是 250 个 CPU 绑定的等待空闲线程池线程到 运行 的操作。这些是 Promise Tasks,而不是 Delegate Tasks,因此它们根本不会在线程上“运行”。内存中只有 250 个对象。

If I wanted to limit the total number of Tasks that are being executed at any given time to say 100, is it possible to accomplish this?

由于(这些类型的)任务只是内存中的对象,因此不需要限制它们,就像您不需要限制 string 或 [=15= 的数量一样]s。在 需要的地方应用节流;例如,每个请求同时完成的 HTTP 调用数。或者每个主机。

Would the framework automatically prevent too many tasks from being executed?

框架没有内置类似的东西。

Perhaps via a Queue? Or am I approaching this problem in the wrong way, and would I instead need to Queue the incoming requests to my application?

已经有一个请求队列。它由 IIS(或任何您的主机)处理。如果您的服务器太忙(或突然变得很忙),请求将排队,您无需执行任何操作。

当您有一堆项目并且想要异步处理它们并且并发性有限时,SemaphoreSlim 是完成这项工作的绝佳工具。有两种方法可以使用它。一种方法是立即创建所有任务并让每个任务在执行其主要工作之前获取信号量,另一种方法是在枚举源时限制任务的创建。第一种技术是急切的,因此它会消耗更多的 RAM,但它更易于维护,因为它更容易理解和实现。第二种技术是惰性的,如果你有数百万个项目要处理,它会更有效率。

您在示例代码中使用的技术是第二种(惰性)技术。

下面是一个使用两个 SemaphoreSlim 的示例,以便实施两个最大并发策略,一个针对每个请求,一个针对全局。首先是急切的方法:

private const int maxConcurrencyGlobal = 100;
private static SemaphoreSlim globalThrottler
    = new SemaphoreSlim(maxConcurrencyGlobal, maxConcurrencyGlobal);

public async Task<ResponseObj[]> DoStuffAsync(IEnumerable<Input> inputData)
{
    const int maxConcurrencyPerRequest = 5;
    var perRequestThrottler
        = new SemaphoreSlim(maxConcurrencyPerRequest, maxConcurrencyPerRequest);

    Task<ResponseObj>[] tasks = inputData.Select(async input =>
    {
        await perRequestThrottler.WaitAsync();
        try
        {
            await globalThrottler.WaitAsync();
            try
            {
                return await ExecHttpRequestAsync(input);
            }
            finally { globalThrottler.Release(); }
        }
        finally { perRequestThrottler.Release(); }
    }).ToArray();
    return await Task.WhenAll(tasks);
}

Select LINQ 运算符提供了一种简单直观的方法来将项目投影到任务。

这里是做同样事情的惰性方法:

private const int maxConcurrencyGlobal = 100;
private static SemaphoreSlim globalThrottler
    = new SemaphoreSlim(maxConcurrencyGlobal, maxConcurrencyGlobal);

public async Task<ResponseObj[]> DoStuffAsync(IEnumerable<Input> inputData)
{
    const int maxConcurrencyPerRequest = 5;
    var perRequestThrottler
        = new SemaphoreSlim(maxConcurrencyPerRequest, maxConcurrencyPerRequest);

    var tasks = new List<Task<ResponseObj>>();
    foreach (var input in inputData)
    {
        await perRequestThrottler.WaitAsync();
        await globalThrottler.WaitAsync();
        Task<ResponseObj> task = Run(async () =>
        {
            try
            {
                return await ExecHttpRequestAsync(input);
            }
            finally
            {
                try { globalThrottler.Release(); }
                finally { perRequestThrottler.Release(); }
            }
        });
        tasks.Add(task);
    }
    return await Task.WhenAll(tasks);

    static async Task<T> Run<T>(Func<Task<T>> action) => await action();
}

此实现假设 await globalThrottler.WaitAsync() 永远不会抛出,这是根据文档给出的。如果您稍后决定添加对取消的支持,并且将 CancellationToken 传递给该方法,则情况将不再如此。在那种情况下,您将需要一个围绕任务创建逻辑的 try/finally 包装器。第一种(急切的)方法可以在没有此类考虑的情况下通过取消支持得到增强。它现有的 try/finally 基础设施是 已经足够了。

使用async/await实现内部助手Run方法也很重要。 Eliding the async/await 是一个很容易犯的错误,因为在那种情况下,ExecHttpRequestAsync 方法同步抛出的任何异常都会立即重新抛出,并且不会封装在 Task<ResponseObj> 中。然后 DoStuffAsync 方法返回的任务将失败,不会释放获取的信号量,也不会等待已经启动的操作完成。这是支持急切方法的另一个论据。懒惰的方法有太多陷阱需要注意。