How to implement a token system for limiting concurrency of processor/IO-heavy multithreaded tasks in C#?

This is a follow-up to an earlier question.

Suppose I have a value taskLimit (say 20), the number of concurrent "MyTask" tasks created in the RunAsync method below:

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    try
    {
        for (int i = 0; i < taskLimit; i++)
        {
            tasks.Add(MyTask(cancellationToken, i));
        }
        
        await Task.WhenAll(tasks);
    }
    catch (Exception e)
    {
        //Exception Handling
    }
}
public async Task MyTask(CancellationToken cancellationToken, int a)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            //long running code, if possible check for cancellation using the token
            //Do something useful here. Very Processor and IO heavy. Takes 5-10 minutes to complete.
            //SomeHeavyTask can only run concurrently up to a limit of, say, 5. Implement a token system of sorts
            while (freeTokens < 1)
            {
                await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
            }
            freeTokens = freeTokens-1;
            SomeHeavyTask(cancellationToken);
            freeTokens = freeTokens+1;
            
            //sleep for an independently parameterizable period, then wake up and repeat
            await Task.Delay(TimeSpan.FromHours(parametrizableTaskDelay[a]), cancellationToken);
        }
        catch (Exception e)
        {
            //Exception Handling
        }
    }
}

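Can something like this be done? Is there a better, more formal way to achieve the same thing in C#? Note that the essence of this question is that freeTokens is much smaller than taskLimit, and that each MyTask spends only about 10% of its time in SomeHeavyTask(); most of its time is spent in await Task.Delay().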

You should use Microsoft's Reactive Framework (aka Rx): NuGet System.Reactive and add using System.Reactive.Linq; then you can do this:

int taskLimit = 500;
int maxConcurrent = 5;

IObservable<Unit> query =
    Observable
        .Range(0, taskLimit)
        .Select(x => Observable.FromAsync(ct => SomeHeavyTask(ct)))
        .Merge(maxConcurrent);
        
await query;

In my book, that is a lot easier to work with.

As @mjwills said, you can use a C# semaphore to manage concurrent access to resources.
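As a minimal sketch of that idea: the class below starts a number of workers but lets only a limited number of them into the heavy section at once. HeavyWorkAsync is a hypothetical placeholder for the real work, and the names SemaphoreSketch, RunAllAsync and PeakConcurrency are illustrative assumptions, not part of the question.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSketch
{
    private static int _running; // heavy sections currently executing
    private static int _peak;    // highest value _running has reached

    public static int PeakConcurrency => Volatile.Read(ref _peak);

    // Starts taskCount workers, but lets at most maxConcurrent of them
    // into the heavy section at any one time.
    public static async Task RunAllAsync(int taskCount, int maxConcurrent, CancellationToken ct)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);

        var workers = Enumerable.Range(0, taskCount).Select(async _ =>
        {
            await gate.WaitAsync(ct); // take a token without blocking a thread
            try
            {
                await HeavyWorkAsync(ct);
            }
            finally
            {
                gate.Release(); // always hand the token back
            }
        }).ToList(); // ToList starts every worker immediately

        await Task.WhenAll(workers);
    }

    // Hypothetical stand-in for the real heavy work (assumption).
    private static async Task HeavyWorkAsync(CancellationToken ct)
    {
        int now = Interlocked.Increment(ref _running);

        // Record the peak with a compare-and-swap loop.
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            Interlocked.CompareExchange(ref _peak, now, seen);
        }

        try
        {
            await Task.Delay(50, ct);
        }
        finally
        {
            Interlocked.Decrement(ref _running);
        }
    }
}
```

Unlike the freeTokens counter in the question, WaitAsync checks and takes the token in one atomic step, so two tasks cannot both see a free token and proceed.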

However, I do recommend looking at existing solutions first, for example Hangfire. If needed, you can store its state inside SF.

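You can use a SemaphoreSlim to limit the number of tasks that are doing work at the same time (you will still have taskLimit tasks active, but only a limited number of them will be doing the heavy work simultaneously; I assume this is what you want).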

This is best demonstrated with a sample console app. If you run it, you will see from the output that at most 5 "heavy tasks" are active at the same time.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static async Task Main()
        {
            Console.WriteLine("Starting");

            // Cancel after 30 seconds for demo purposes.
            using var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await RunAsync(source.Token);

            Console.WriteLine("Stopped.");
            Console.ReadLine();
        }

        public static async Task RunAsync(CancellationToken cancellationToken)
        {
            int taskLimit = 20;
            int concurrencyLimit = 5;

            var sem   = new SemaphoreSlim(concurrencyLimit);
            var tasks = new List<Task>();

            try
            {
                for (int i = 0; i < taskLimit; i++)
                {
                    int p = i; // Prevent modified closure.
                    tasks.Add(Task.Run(() => MyTask(cancellationToken, p, sem)));
                }

                await Task.WhenAll(tasks);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Task(s) were cancelled.");
            }

            catch (Exception e)
            {
                // Exception Handling
            }
        }

        public static async Task MyTask(CancellationToken cancellationToken, int a, SemaphoreSlim sem)
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                try
                {
                    await sem.WaitAsync(cancellationToken);

                    try
                    {
                        someHeavyTask(cancellationToken, a);
                    }
                    
                    finally
                    {
                        sem.Release();
                    }
                }

                catch (OperationCanceledException)
                {
                    Console.WriteLine("Task was cancelled.");
                    return;
                }

                catch (Exception e)
                {
                    //Exception Handling
                }
            }
        }

        static int heavyTaskCount;

        static void someHeavyTask(CancellationToken cancel, int a)
        {
            int n = Interlocked.Increment(ref heavyTaskCount);
            Console.WriteLine("Starting heavy task. Number of simultaneous heavy tasks = " + n);

            // Simulate work. Make the work for each task take varying time by using 'a' for the sleep.

            for (int i = 0; i < 20 && !cancel.IsCancellationRequested; ++i)
            {
                Thread.Sleep(100 + a*10);
            }

            n = Interlocked.Decrement(ref heavyTaskCount);
            Console.WriteLine("Finishing heavy task. Number of simultaneous heavy tasks = " + n);
        }
    }
}

The core of this is the semaphore-controlled section of the code:

await sem.WaitAsync(cancellationToken);

try
{
    someHeavyTask(cancellationToken, a);
}

finally
{
    sem.Release();
}
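Applied to the loop from the question, the same pattern could look like the sketch below. It assumes the heavy work is synchronous and pushes it to the thread pool with Task.Run, and it keeps the per-task delay outside the try/finally so that a sleeping task never holds a token. The parameters heavyWork and delayBetweenRuns are hypothetical stand-ins for SomeHeavyTask and parametrizableTaskDelay[a].

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledLoop
{
    // Repeats forever: wait for a token, do the heavy work, return the token, sleep.
    // Cancellation surfaces to the caller as an OperationCanceledException.
    public static async Task RunAsync(
        SemaphoreSlim tokens,
        Action<CancellationToken> heavyWork,
        TimeSpan delayBetweenRuns,
        CancellationToken cancellationToken)
    {
        while (true)
        {
            await tokens.WaitAsync(cancellationToken);
            try
            {
                // Task.Run moves the synchronous work onto a thread-pool thread.
                await Task.Run(() => heavyWork(cancellationToken), cancellationToken);
            }
            finally
            {
                tokens.Release();
            }

            // The token has already been released here, so idle tasks
            // do not count against the concurrency limit.
            await Task.Delay(delayBetweenRuns, cancellationToken);
        }
    }
}
```

Because each task spends most of its time in the delay, releasing before sleeping is what lets 20 tasks share 5 tokens without long waits.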

Another option is an ActionBlock from TPL Dataflow (namespace System.Threading.Tasks.Dataflow):

var block = new ActionBlock<int>(x => SomeHeavyTask(cancellationToken, x), 
    new ExecutionDataflowBlockOptions() 
    { 
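        MaxDegreeOfParallelism = 20, // upper bound on concurrent SomeHeavyTask calls; lower it (e.g. to 5) to match the limit in the question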
        CancellationToken = cancellationToken
    });

for (int i = 0; i < 100; i++)
    await block.SendAsync(i, cancellationToken);
    
block.Complete();
await block.Completion;