如何制作一个只能订阅一次的轻量级 `Replay` 运算符?

How to make a lightweight `Replay` operator that can be subscribed only once?

在各种情况下,我都希望有一个 Rx Replay operator that buffers the incoming notifications, replays its buffer synchronously when it is subscribed for the first time, and stops buffering after that. This lightweight Replay operator should be able to serve only one subscriber. One use case for such an operator can be found ,在第一次订阅后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望避免的问题行为的人为示例:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    .SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
    .Take(800_000)
    .Do(x =>
    {
        if (x % 100_000 == 0) Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} > " +
            $"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
    })
    .Replay()
    .AutoConnect(0);

await Task.Delay(2200);

Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");

// First subscription
await observable.Do(x =>
{
    if (x % 100_000 == 0)
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});

// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");

Observable 总共生成了 800,000 个值。 Replay机制立即连接到源,并在完成之前中途订阅。

输出:

16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000

订阅后内存使用量不断增长。这是意料之中的,因为所有值都被缓冲,并在重放的可观察对象的整个生命周期内保持缓冲状态。理想的行为是在订阅后内存使用率直线下降。传播缓冲值后应丢弃缓冲区,因为在订阅后它没有用处。此外,第二个订阅(await observable.Count())应该会失败并返回 InvalidOperationException。在失去 Replay 功能后,我不想再次订阅 Observable。

这是我要实现的自定义 ReplayOnce 运算符的存根。有人知道如何实施吗?

public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
    return source.Replay(); // TODO: enforce the subscribe-once policy
}

顺便说一句,有一个相关的问题 here,关于如何制作一个 Replay 运算符,其缓冲区可以根据需要偶尔清空。我的问题是不同的,因为我希望缓冲区在订阅后完全禁用,并且不再开始增长。

我提出了 ReplayOnce 运算符的实现,它基于多播自定义 ReplayOnceSubject<T>。这个主题最初由 ReplaySubject<T> 支持,在第一次(也是唯一允许的)订阅期间,它与正常的 Subject<T> 切换:

public static IConnectableObservable<T> ReplayOnce<T>(
    this IObservable<T> source)
{
    return source.Multicast(new ReplayOnceSubject<T>());
}

private class ReplayOnceSubject<T> : ISubject<T>
{
    private readonly object _locker = new object();
    private ISubject<T> _subject = new ReplaySubject<T>();

    public void OnNext(T value)
    {
        lock (_locker) _subject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        lock (_locker) _subject.OnError(error);
    }

    public void OnCompleted()
    {
        lock (_locker) _subject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_locker)
        {
            if (_subject is ReplaySubject<T> replaySubject)
            {
                var subject = new Subject<T>();
                var subscription = subject.Subscribe(observer);
                // Now replay the buffered notifications
                replaySubject.Subscribe(subject).Dispose();
                _subject = subject;
                return subscription;
            }
            else
                throw new InvalidOperationException("Already subscribed.");
        }
    }
}

replaySubject.Subscribe(subject) 确保不仅将缓冲值传播给观察者,而且还会传播任何可能的 OnError/OnCompleted 通知。订阅后 ReplaySubject 不再被引用,并符合垃圾回收条件。