优雅关闭 IHostedService / BackgroundService

Graceful shutdown of IHostedService / BackgroundService

在作业完成时阻止服务关闭的最佳方法是什么?我有一些后台服务在消息总线上侦听消息并处理 processing.Some 直接实现 IHostService 一些利用 BackgroundService

我知道延迟循环绝对不是最好的方法,但这是我可以演示的最简单的示例。我的另一个想法是一个包含活跃工作总数的计数器,但这需要对其进行锁定,我认为这也不是一个好主意。

public interface IMessageBus
{
   Task<int> Subscribe(CancellationToken cancellationToken);
   Task Unsubscribe(int subscriptionId, CancellationToken cancellationToken);
}

public class Worker : IHostedService, IDisposable
{
    private readonly ILogger logger;
    private readonly IMessageBus messageBus;
    private readonly List<Task> activeTasks;
    private CancellationTokenSource stoppingCts;
    int? subscriptionId;

    public Worker(ILogger logger, IMessageBus messageBus)
    {
        this.logger = logger;
        this.messageBus = messageBus;
        this.activeTasks = new List<Task>();
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        this.stoppingCts = new CancellationTokenSource();

        this.logger.LogDebug("Subscribing to message bus.");
        this.subscriptionId = await this.messageBus.Subscribe(cancellationToken, this.Process);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        this.logger.LogDebug("Stopping Worker");

        // this will stop new messages coming in....
        if (this.subscriptionId.HasValue)
        {
            this.logger.LogDebug("Unsubscribed from message bus");
            await this.messageBus.Unsubscribe(this.subscriptionId.Value, cancellationToken);
        }

        // this should block until existing jobs finished....i don't like it though
        this.logger.LogDebug("Waiting for all jobs to finish");
        while (!cancellationToken.IsCancellationRequested && this.activeTasks.Count > 0)
        {
            await Task.Delay(10);
            this.logger.LogDebug("Still waiting for all jobs to finish");
        }

        this.logger.LogDebug("Worker stopped.");
    }

    // this does real work, would be multiple running concurrently
    public async Task<bool> Process()
    {
        this.logger.LogInformation("Starting task..");

        var task = Task.Delay(100, this.stoppingCts.Token);
        this.activeTasks.Add(task);
        await task;
        this.activeTasks.Remove(task);

        this.logger.LogInformation("Task completed");

        return true;
    }

    public void Dispose()
    {
        this.stoppingCts?.Cancel();
    }
}

使用初始化为1的CountdownEvent,等待它达到零like here。在任务开始时增加事件中的计数器,并在完成时分别使用 AddCountSignal 减少。只记得在 StopAsync 中最后一次调用 Signal 来减少初始值。