限制调度程序中的通知数量

Limit number of notifications in scheduler

我有一个冷观察者和一个观察者。 两者都很慢,但观察者比可观察者慢。 他们处理很多很多通知,所以我不想无限制地存储通知。

此示例大约需要 30 秒才能完成。非常慢。我相信他们可以在 21 秒内完成。

var subject = new Subject<int>();

subject.Subscribe(i =>
{
    Thread.Sleep(2000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});


Task.Run(() =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
    foreach (var i in Enumerable.Range(0, 10))
    {
        Thread.Sleep(1000);
        subject.OnNext(i);
    }
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});

此示例大约在 20 秒内完成,但可观察对象在观察者显示“4”之前结束。 它表示调度程序在某处存储了 4 到 9。恐怕如果它存储超过 1'000'000 个通知并抛出 OutOfMemoryException。

var subject = new Subject<int>();

subject.ObserveOn(ThreadPoolScheduler.Instance).Subscribe(i =>
{
    Thread.Sleep(2000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});


Task.Run(() =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
    foreach (var i in Enumerable.Range(0, 10))
    {
        Thread.Sleep(1000);
        subject.OnNext(i);
    }
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});

这就是为什么我想限制调度程序中的通知数量。

编辑:图表

x : calculate or other task
S : send notification
R : receive notification

-- time -->

sample1
thread1:  xxxS       xxxS       xxxS
thread2:     Rxxxxxx    Rxxxxxx    Rxxxxxx

sample2
thread1:  xxxSxxxSxxxSxxxSxxxSxxxSxxxS
thread2:     RxxxxxxRxxxxxxRxxxxxxRxxxxxx

I want
thread1:  xxxSxxxS   xxxS   xxxS   xxxS
thread2:     RxxxxxxRxxxxxxRxxxxxxRxxxxxx

Rx 合约要求通知序列化,因此即使您可以指定 Scheduler,它更类似于说 "here, use this scheduler for managing concurrency"。

ThreadPoolScheduler 仍将序列化通知,因此最终结果是它不会并行调用您的方法。

如果要异步执行,可以改写成这样:

subject.Subscribe(async i =>
{
    await Task.Delay(1000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});

但潜在的问题是你的消费者落后于你的生产者。 您可以考虑使用 backpressure,或者如果您的应用程序是一系列数据处理任务,您还可以查看优秀的 TPL Dataflow。

在事件源和最终接收器之间的管道中发生了什么,这就是 Rx 最擅长的地方。

您可以尝试使用 SemaphoreSlim.

实施自定义 IScheduler 来限制请求(方法 Schedule

或者,您可以创建一个 BlockingThrottle 扩展方法来接受和 returns 一个 IObservable,您可以在订阅它之前链接到原始 IObservable。这是一个使用 BlockingCollection 作为节流机制的实现:

private static IObservable<T> BlockingThrottle<T>(this IObservable<T> source,
    int boundedCapacity)
{
    return Observable.Create<T>(observer =>
    {
        var queue = new BlockingCollection<T>(boundedCapacity);
        var cts = new CancellationTokenSource();
        var locker = new object();
        Exception exception = null;

        new Thread(() =>
        {
            try
            {
                foreach (var item in queue.GetConsumingEnumerable(cts.Token))
                {
                    observer.OnNext(item);
                }
                observer.OnCompleted();
            }
            catch (OperationCanceledException)
            {
                Exception ex; lock (locker) ex = exception;
                if (ex != null) observer.OnError(ex);
            }
            // Leave all other exceptions unhandled.
            // The responsibility for catching them belongs to the caller.
        })
        { IsBackground = true }.Start();

        var subscription = source.Subscribe(x =>
        {
            try
            {
                queue.Add(x, cts.Token);
            }
            catch (OperationCanceledException) { } // Ignore this exception too
        }, ex =>
        {
            lock (locker) exception = ex;
            cts.Cancel();
        }, () =>
        {
            queue.CompleteAdding();
        });

        return Disposable.Create(() =>
        {
            cts.Cancel();
            subscription.Dispose();
        });
    });
}

用法示例:

subject.BlockingThrottle(boundedCapacity: 10).Subscribe(i =>

注意: 如果您打算在 ASP.NET 应用程序中使用它,请考虑将 BlockingCollection 替换为异步队列(如 BufferBlock<T>Channel<T>),以避免阻塞线程。