在单独的线程池中执行某些后台任务,以避免主线程中执行的关键任务饿死

Execute certain background Tasks in separate ThreadPool to avoid starvation to critical Tasks executed in main thread

在单独的线程池中执行某些后台任务(非线程)以避免主线程中执行的关键任务(非线程)饿死

我们的场景

我们托管了一个大容量 WCF Web 服务,逻辑上具有以下代码:

void WcfApiMethod()
{
   // logic

   // invoke other tasks which are critical
   var mainTask = Task.Factory.StartNew(() => { /* important task */ });
   mainTask.Wait();

   // invoke background task which is not critical
   var backgroundTask = Task.Factory.StartNew(() => { /* some low-priority background action (not entirely async) */ });
   // no need to wait, as this task is best effort. Fire and forget

   // other logic
}

// other APIs

现在,问题是,在某些情况下,低优先级后台任务可能需要更长的时间(~ 30 秒),例如检测 SQL 连接问题、DB perf 问题、redis 缓存问题、等等,这将使那些后台线程延迟,这意味着由于高容量,TOTAL PENDING TASK COUNT 将增加。

这会造成这样一种情况,即 API 的较新执行无法安排高优先级任务,因为有很多后台任务在排队。

我们尝试过的解决方案

  1. 在高优先级任务中添加TaskCreationOptions.LongRunning会立即执行。 然而,这对我们来说不是一个解决方案,因为系统中到处都有很多任务被调用,我们不能让它们到处都是 long-运行 。 此外,WCF 处理传入的 APIs 将依赖于 .NET 线程池,它现在处于饥饿状态。

  2. 短路低pri-background任务创建,通过Semaphore。仅在系统有能力处理线程时才生成线程(检查之前创建的线程是否已退出)。如果没有,就不要产生线程。 例如,由于某个问题(例如 DB perf 问题),约 10,000 个后台线程(非异步)处于 IO 等待状态,这可能会导致主 .net 线程池中的线程饥饿。 在这种特定情况下,我们可以添加一个信号量以将创建限制为 100,因此如果 100 个任务被卡住,则不会首先创建第 101 个任务。

询问替代解决方案

有没有办法专门在 "custom threads/ thread pool" 上生成 "tasks",而不是默认的 .NET 线程池。 这是针对我提到的后台任务,因此如果它们被延迟,它们不会导致整个系统崩溃。 可以覆盖并创建一个自定义 TaskScheduler 以传递到 Task.Factory.StartNew() 因此,创建的任务不会在默认的 .NET 线程池中,而是在其他一些自定义池中。

基于https://codereview.stackexchange.com/questions/203213/custom-taskscheduler-limited-concurrency-level?newreg=acb8e97fe4c94844a660bcd7473c4876,确实存在一种内置解决方案,可通过有限并发任务调度程序来限制线程生成。

内置 ConcurrentExclusiveSchedulerPair.ConcurrentScheduler 可用于实现此目的。

对于上述情况,以下代码限制后台线程破坏应用程序/防止线程饥饿。

        {
            // fire and forget background task
            var task = Task.Factory.StartNew(
                () =>
                {
                    // background threads
                }
                , CancellationToken.None
                , TaskCreationOptions.None
                , concurrentSchedulerPair.ConcurrentScheduler);
        }

        private static ConcurrentExclusiveSchedulerPair concurrentSchedulerPair = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default,
            maxConcurrencyLevel: 100);

A caution on using TaskScheduler.Default and maxConcurrencyLevel:100 parameters, say, you create 10000 tasks using this limited-conc-scheduler and try to immediately spawn another thread using 'default-scheduler', that new spawn would be blocked unless all 100 threads are created. If you try maxConcurrencyLevel:10, new thread spawns are immediately and not blocking once all 10 threads are instantiated.

感谢@Theodor Zoulias 的指点。

这是一个静态 RunLowPriority 方法,您可以使用它来代替 Task.Run。它具有简单和通用任务的重载,以及普通和异步委托的重载。

const int LOW_PRIORITY_CONCURRENCY_LEVEL = 2;
static TaskScheduler LowPriorityScheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, LOW_PRIORITY_CONCURRENCY_LEVEL).ConcurrentScheduler;

public static Task RunLowPriority(Action action,
    CancellationToken cancellationToken = default)
{
    return Task.Factory.StartNew(action, cancellationToken,
        TaskCreationOptions.DenyChildAttach, LowPriorityScheduler);
}

public static Task RunLowPriority(Func<Task> function,
    CancellationToken cancellationToken = default)
{
    return Task.Factory.StartNew(function, cancellationToken,
        TaskCreationOptions.DenyChildAttach, LowPriorityScheduler).Unwrap();
}

public static Task<TResult> RunLowPriority<TResult>(Func<TResult> function,
    CancellationToken cancellationToken = default)
{
    return Task.Factory.StartNew(function, cancellationToken,
        TaskCreationOptions.DenyChildAttach, LowPriorityScheduler);
}

public static Task<TResult> RunLowPriority<TResult>(Func<Task<TResult>> function,
    CancellationToken cancellationToken = default)
{
    return Task.Factory.StartNew(function, cancellationToken,
        TaskCreationOptions.DenyChildAttach, LowPriorityScheduler).Unwrap();
}

通过 RunLowPriority 方法安排的操作将 运行 在 ThreadPool 线程上,但最多可以同时分配所有可用 ThreadPool 线程中的 2 个到 RunLowPriority 任务。

请记住,System.Timers.TimerElapsed 事件的 SynchronizingObject 属性 设置为 null 运行 ThreadPool 个线程。因此,如果您在此处理程序中执行低优先级工作,您可能应该通过相同的有限并发调度程序来安排它:

var timer = new System.Timers.Timer();
timer.Elapsed += (object sender, System.Timers.ElapsedEventArgs e) =>
{
    Thread.Sleep(10); // High priority code
    var fireAndForget = RunLowPriority(() =>
    {
        if (!timer.Enabled) return;
        Thread.Sleep(1000); // Simulate long running code that has low priority
    });
};