给定一个可以停止和启动的外部生产者 API,当本地缓冲区已满时有效地停止生产者

Given an external producer API that can be stopped and started, efficiently stop the producer when local buffer is full

假设我获得了一个事件生成器 API,其中包含 Start()Pause()Resume() 方法,以及一个 ItemAvailable 事件。生产者本身是外部代码,我无法控制它的线程。在调用 Pause() 之后,一些项目 可能 仍然通过(生产者实际上是远程的,因此项目可能已经在网络上传输)。

还假设我正在编写消费者代码,其中消费可能比生产慢。

关键要求是

  1. 消费者事件处理程序不得阻塞生产者线程,并且
  2. 必须处理所有事件(不能丢弃任何数据)。

我在消费者中引入了一个缓冲区来消除一些突发性。但是在扩展突发的情况下,我想在适当的时候调用Producer.Pause(),然后调用Resume(),以避免运行在消费者端内存不足。

我有一个解决方案利用 Interlocked 来递增和递减计数器,将其与阈值进行比较以决定是 Pause 还是 Resume

问题:在效率(和优雅)方面,是否有比 Interlocked 计数器(下面代码中的int current)更好的解决方案?

更新的 MVP(不再反弹限制器):

namespace Experiments
{
    internal class Program
    {
        // simple external producer API for demo purposes
        private class Producer
        {
            public void Pause(int i) { _blocker.Reset(); Console.WriteLine($"paused at {i}"); }
            public void Resume(int i) { _blocker.Set(); Console.WriteLine($"resumed  at {i}"); }
            public async Task Start()
            {
                await Task.Run
                (
                    () =>
                    {
                        for (int i = 0; i < 10000; i++)
                        {
                            _blocker.Wait();
                            ItemAvailable?.Invoke(this, i);
                        }
                    }
                );
            }

            public event EventHandler<int> ItemAvailable;
            private ManualResetEventSlim _blocker = new(true);
        }

        private static async Task Main(string[] args)
        {
            var p = new Producer();
            var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true });
            int threshold = 1000;
            int resumeAt = 10;
            int current = 0;
            int paused = 0;

            p.ItemAvailable += (_, i) =>
            {
                if (Interlocked.Increment(ref current) >= threshold
                    && Interlocked.CompareExchange(ref paused, 0, 1) == 0
                ) p.Pause(i);

                buffer.Writer.TryWrite(i);
            };

            var processor = Task.Run
            (
                async () =>
                {
                    await foreach (int i in buffer.Reader.ReadAllAsync())
                    {
                        Console.WriteLine($"processing {i}");
                        await Task.Delay(10);
                        if
                        (
                            Interlocked.Decrement(ref current) < resumeAt
                            && Interlocked.CompareExchange(ref paused, 1, 0) == 1
                        ) p.Resume(i);
                    }
                }
            );

            p.Start();
            await processor;
        }
    }
}

如果您的目标是优雅,可以考虑在自定义 Channel<T> 中烘焙压力感知功能。下面是从 Channel<T> 派生的 PressureAwareUnboundedChannel<T> class。它提供了基础 class 的所有功能,此外,它还会在通道承受压力和释放压力时发出通知。通知通过 IProgress<bool> 实例推送,当压力超过特定的高阈值时发出 true 值,当压力下降到特定的低阈值以下时发出 false 值.

public sealed class PressureAwareUnboundedChannel<T> : Channel<T>
{
    private readonly Channel<T> _channel;
    private readonly int _highPressureThreshold;
    private readonly int _lowPressureThreshold;
    private readonly IProgress<bool> _pressureProgress;
    private int _pressureState = 0; // 0: no pressure, 1: under pressure

    public PressureAwareUnboundedChannel(int lowPressureThreshold,
        int highPressureThreshold, IProgress<bool> pressureProgress)
    {
        if (lowPressureThreshold < 0)
            throw new ArgumentOutOfRangeException(nameof(lowPressureThreshold));
        if (highPressureThreshold < lowPressureThreshold)
            throw new ArgumentOutOfRangeException(nameof(highPressureThreshold));
        if (pressureProgress == null)
            throw new ArgumentNullException(nameof(pressureProgress));
        _highPressureThreshold = highPressureThreshold;
        _lowPressureThreshold = lowPressureThreshold;
        _pressureProgress = pressureProgress;
        _channel = Channel.CreateBounded<T>(Int32.MaxValue);
        this.Writer = new ChannelWriter(this);
        this.Reader = new ChannelReader(this);
    }

    private class ChannelWriter : ChannelWriter<T>
    {
        private readonly PressureAwareUnboundedChannel<T> _parent;

        public ChannelWriter(PressureAwareUnboundedChannel<T> parent)
            => _parent = parent;
        public override bool TryComplete(Exception error = null)
            => _parent._channel.Writer.TryComplete(error);
        public override bool TryWrite(T item)
        {
            bool success = _parent._channel.Writer.TryWrite(item);
            if (success) _parent.SignalWriteOrRead();
            return success;
        }
        public override ValueTask<bool> WaitToWriteAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Writer.WaitToWriteAsync(cancellationToken);
    }

    private class ChannelReader : ChannelReader<T>
    {
        private readonly PressureAwareUnboundedChannel<T> _parent;

        public ChannelReader(PressureAwareUnboundedChannel<T> parent)
            => _parent = parent;
        public override Task Completion => _parent._channel.Reader.Completion;
        public override bool CanCount => _parent._channel.Reader.CanCount;
        public override int Count => _parent._channel.Reader.Count;
        public override bool TryRead(out T item)
        {
            bool success = _parent._channel.Reader.TryRead(out item);
            if (success) _parent.SignalWriteOrRead();
            return success;
        }
        public override ValueTask<bool> WaitToReadAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
    }

    private void SignalWriteOrRead()
    {
        var currentCount = _channel.Reader.Count;
        bool underPressure;
        if (currentCount > _highPressureThreshold)
            underPressure = true;
        else if (currentCount <= _lowPressureThreshold)
            underPressure = false;
        else
            return;
        int newState = underPressure ? 1 : 0;
        int oldState = underPressure ? 0 : 1;
        if (Interlocked.CompareExchange(
            ref _pressureState, newState, oldState) != oldState) return;
        _pressureProgress.Report(underPressure);
    }
}

封装的Channel<T>实际上是一个bounded channel,容量等于最大Int32值,因为只有bounded channels实现了Reader.Count 属性.

用法示例:

var progress = new Progress<bool>(underPressure =>
{
    if (underPressure) Producer.Pause(); else Producer.Resume();
});
var channel = new PressureAwareUnboundedChannel<Item>(500, 1000, progress);

本例中Producer会在频道内存储的项目超过1000个时暂停,当项目数量下降到500个或更少时恢复。

Progress<bool> 操作在创建 Progress<bool> 时捕获的上下文中调用。因此,如果您在 GUI 应用程序的 UI 线程上创建它,该操作将在 UI 线程上调用,否则将在 ThreadPool 上调用。在后一种情况下,将无法防止 Action<bool> 的重叠调用。如果 Producer class 不是线程安全的,则必须在处理程序中添加同步。示例:

var progress = new Progress<bool>(underPressure =>
{
    lock (Producer) if (underPressure) Producer.Pause(); else Producer.Resume();
});

如果您意识到这个问题有三个“步骤”,那么这就相对简单了。

  1. 第一步ToChannel(Producer)接收生产者的消息
  2. 下一步,PauseAt 发出信号 pause() 如果面板中有太多待处理的项目。
  3. 第三步,如果输入通道的计数小于阈值,ResumeAt 发出信号 resume()

使用典型的渠道模式很容易组合所有三个步骤。


producer.ToChannel(token)
    .PauseAt(1000,()=>producer.PauseAsync(),token)
    .ResumeAt(10,()=>producer.ResumeAsync(),token)
    ....

或单一的通用TrafficJam方法:

static ChannelReader<T> TrafficJam(this ChannelReader<T> source,
    int pauseAt,int resumeAt,
    Func<Task> pause,Func<Task> resume,
    CancellationToken token=default)
{
    return source
             .PauseAt(pauseAt,pause,token)
             .ResumeAt(resumeAt,resume,token);
}

到频道

第一步相对简单,基于生产者事件的无界通道源。

static ChannelReader<int> ToChannel(this Producer producer,
                                    CancellationToken token=default)
{
    Channel<int> channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    producer.ItemAvailable += OnItem;
    return channel;

    void OnItem(object sender, int item)
    {
        writer.TryWriteAsync(item);
        if(token.IsCancellationRequested)
        {
            producer.ItemAvailable-=OnItem;
            writer.Complete();
            
        }
    }
}

唯一不寻常的部分是使用本地函数来允许在请求取消时禁用事件处理程序并完成输出通道

这足以让所有传入的项目排队。 ToChannel 不会为启动、暂停等而烦恼,那不是它的工作。

暂停时间

下一个函数 PauseAt 使用 BoundedChannel 来实现阈值。如果可以,它会转发传入的消息。如果通道不能再接受任何消息,它将调用 pause 回调并 a 等待直到它可以恢复转发:

static ChannelReader<T> PauseAt(this ChannelReader<T> source, 
        int threshold, Func<Task> pause,
        CancellationToken token=default)
{
    Channel<T> channel=Channel.CreateBounded(threshold);
    var writer=channel.Writer;

    _ = Task.Run(async ()=>
        await foreach(var msg in source.ReadAllAsync(token))
        {
            if(writer.CanWrite())
            {
               await writer.WriteAsync(msg);
            }
            else
            {
               await pause();
               //Wait until we can post again
               await writer.WriteAsync(msg);
            }
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

恢复时间

最后一步,ResumeAt,如果其输入先前高于阈值并且现在有更少的项目,则调用 resume()

如果输入不受限制,它只会转发所有消息。

static ChannelReader<T> ResumeAt(this ChannelReader<T> source, 
        int resumeAt, Func<Task> resume,
        CancellationToken token=default)
{
    Channel<T> channel=Channel.CreateUnbounded();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        bool above=false;
        await foreach(var msg in source.ReadAllAsync(token))
        {
            await writer.WriteAsync(msg);
            //Do nothing if the source isn't bounded
            if(source.CanCount)
            {
                if(above && source.Count<=resumeAt)
                {
                    await resume();
                    above=false;
                }       
                above=source.Count>resumeAt;  
            }
       }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

由于只使用了一个线程,我们可以保留之前的计数。以及它是高于还是低于阈值。

结合暂停和恢复

由于 PauseResume 仅使用通道,因此可以将它们组合成一个方法:

static ChannelReader<T> TrafficJam(this ChannelReader<T> source,
    int pauseAt,int resumeAt,
    Func<Task> pause,Func<Task> resume,
    CancellationToken token=default)
{
    return source.PauseAt(pauseAt,pause,token)
             .ResumeAt(resumeAt,resume,token);
}