如何修复 Publish().RefCount() 行为的不一致?

How to fix the inconsistency of the Publish().RefCount() behavior?

最近我偶然发现了一个 by Enigmativity about the Publish and RefCount 运算符:

You're using the dangerous .Publish().RefCount() operator pair which creates a sequence that can't be subscribed to after it completes.

这个说法似乎与Lee Campbell 对这些运营商的评价背道而驰。引用他的书 Intro to Rx:

The Publish/RefCount pair is extremely useful for taking a cold observable and sharing it as a hot observable sequence for subsequent observers.

起初我不相信Enigmativity的说法是正确的,所以我试图反驳它。我的实验表明 Publish().RefCount() 可以是 确实不一致。第二次订阅已发布的序列可能会导致对源序列的新订阅,也可能不会,这取决于源序列在连接时是否已完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。这是此行为的演示:

var observable = Observable
    .Create<int>(o =>
    {
        o.OnNext(13);
        o.OnCompleted(); // Commenting this line alters the observed behavior
        return Disposable.Empty;
    })
    .Do(x => Console.WriteLine($"Producer generated: {x}"))
    .Finally(() => Console.WriteLine($"Producer finished"))
    .Publish()
    .RefCount()
    .Do(x => Console.WriteLine($"Consumer received #{x}"))
    .Finally(() => Console.WriteLine($"Consumer finished"));

observable.Subscribe().Dispose();
observable.Subscribe().Dispose();

在此示例中,observable 由三部分组成。首先是生成单个值然后完成的生产部分。然后遵循发布机制(Publish+RefCount)。最后是消费部分,它观察生产者发出的值。 observable 被订阅了两次。预期的行为是每个订阅将收到一个值。但事实并非如此!这是输出:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished

(Try it on fiddle)

如果我们注释 o.OnCompleted(); 行,这里是输出。这种细微的变化导致了预期和理想的行为:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished

在第一种情况下,cold 生产者(Publish().RefCount() 之前的部分)只被订阅了一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(除了 OnCompleted 通知)。在第二种情况下,生产者被订阅了两次。每次产生一个值,每个消费者得到一个值。

我的问题是:我们如何解决这个问题?我们如何修改 Publish 运算符或 RefCount 或两者,以使它们的行为始终一致且令人满意?以下是理想行为的规范:

  1. 已发布的序列应向其订阅者传播直接来自源序列的所有通知,而不是其他任何内容。
  2. 当订阅者的当前数量从零增加到一时,发布的序列应该订阅源序列。
  3. 只要至少有一个订阅者,发布的序列就应该与源保持连接。
  4. 当当前订阅者数量变为零时,已发布序列应从源取消订阅。

我要求提供上述功能的自定义 PublishRefCount 运算符,或者使用内置运算符实现所需功能的方法。

顺便说一句 similar question 存在,这会问为什么会这样。我的问题是关于如何修复它。


更新: 回想起来,上述规范导致了一种不稳定的行为,使得竞争条件不可避免。不能保证对已发布序列的两次订阅将导致对源序列的一次订阅。源序列可能在两次订阅之间完成,导致第一个订阅者取消订阅,导致 RefCount 运算符取消订阅,导致下一个订阅者对源进行新订阅。内置 .Publish().RefCount() 的行为阻止了这种情况的发生。

道德教训是 .Publish().RefCount() 序列没有被破坏,但它不可重用。它不能可靠地用于多个 connect/disconnect 会话。如果你想要第二个会话,你应该创建一个新的 .Publish().RefCount() 序列。

Lee good job 解释了 IConnectableObservable,但 Publish 解释得不是很好。这是一种非常简单的动物,很难解释。我假设你理解 IConnectableObservable:

如果我们简单而懒惰地重新实现零参数 Publish 函数,它看起来像这样:

//  For illustrative purposes only: don't use this code
public class PublishObservable<T> : IConnectableObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Subject<T> _proxy = new Subject<T>();
    private IDisposable _connection;
    
    public PublishObservable(IObservable<T> source)
    {
        _source = source;
    }
    
    public IDisposable Connect()
    {
        if(_connection == null)
            _connection = _source.Subscribe(_proxy);
        var disposable = Disposable.Create(() =>
        {
            _connection.Dispose();
            _connection = null;
        });
        return _connection;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var _subscription = _proxy.Subscribe(observer);
        return _subscription;
    }
}

public static class X
{
    public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
    {
        return new PublishObservable<T>(source);
    }
}

Publish 创建一个订阅源可观察对象的代理 Subject。 proxy 可以subscribe/unsubscribe 到基于连接的source:调用Connect,proxy 订阅source。在一次性连接上调用 Dispose,代理取消订阅源。从中得出的重要想法是,有一个 Subject 代理与源的任何连接。不能保证只有一个源订阅,但可以保证有一个代理和一个并发连接。您可以通过 connecting/disconnecting.

进行多个订阅

RefCount 处理调用 Connect 部分事情:这是一个简单的重新实现:

//  For illustrative purposes only: don't use this code
public class RefCountObservable<T> : IObservable<T>
{
    private readonly IConnectableObservable<T> _source;
    private IDisposable _connection;
    private int _refCount = 0;

    public RefCountObservable(IConnectableObservable<T> source)
    {
        _source = source;
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = _source.Subscribe(observer);
        var disposable = Disposable.Create(() =>
        {
            subscription.Dispose();
            DecrementCount();
        });
        if(++_refCount == 1)
            _connection = _source.Connect();
            
        return disposable;
    }

    private void DecrementCount()
    {
        if(--_refCount == 0)
            _connection.Dispose();
    }
}
public static class X
{
    public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source)
    {
        return new RefCountObservable<T>(source);
    }

}

更多代码,但仍然非常简单:如果 refcount 上升到 1,则在 ConnectableObservable 上调用 Connect,如果下降到 0,则断开连接。

将两者放在一起,你会得到一对保证只有一个 并发 订阅源可观察,通过一个持久 Subject 代理。 Subject 将仅在有 >0 个下游订阅时订阅源。


鉴于上述介绍,您的问题中存在很多误解,所以我将一一讨论:

... Publish().RefCount() can be indeed inconsistent. Subscribing a second time to a published sequence can cause a new subscription to the source sequence, or not, depending on whether the source sequence was completed while connected. If it was completed, then it won't be resubscribed. If it was not completed, then it will be resubscribed.

.Publish().RefCount() 将仅在一种情况下重新订阅源:当它从零订阅者变为 1 时。如果订阅者数量从 0 变为 1 变为 0 变为 1 reason 那么你最终会重新订阅。源可观察完成将导致 RefCount 发出 OnCompleted,并且其所有观察者取消订阅。因此,后续订阅 RefCount 将触发重新订阅源的尝试。自然地,如果 source 正确地观察了可观察的合约,它会立即发出 OnCompleted,就是这样。

[see sample observable with OnCompleted...] The observable is subscribed twice. The expected behavior would be that each subscription will receive one value.

没有。预期的行为是代理 Subject 在发出 OnCompleted 后将重新发出 OnCompleted 到任何后续订阅尝试。由于您的源可观察对象在您的第一个订阅结束时同步完成,因此第二个订阅将尝试订阅已经发出 OnCompletedSubject。这应该导致 OnCompleted,否则 Observable 契约将被破坏。

[see sample observable without OnCompleted as second case...] In the first case the cold producer (the part before the Publish().RefCount()) was subscribed only once. The first consumer received the emitted value, but the second consumer received nothing (except from an OnCompleted notification). In the second case the producer was subscribed twice. Each time it generated a value, and each consumer got one value.

这是正确的。由于代理 Subject 从未完成,后续对源的重新订阅将导致冷观察重新 运行.

My question is: how can we fix this? [..]

  1. The published sequence should propagate to its subscribers all notifications coming directly from the source sequence, and nothing else.
  2. The published sequence should subscribe to the source sequence when its current number of subscribers increases from zero to one.
  3. The published sequence should stay connected to the source as long as it has at least one subscriber.
  4. The published sequence should unsubscribe from the source when its current number of subscribers become zero.

目前 .Publish.RefCount 当前 以上所有情况都会发生,只要您不 complete/error。我不建议实施一个操作符来改变它,打破 Observable 契约。


编辑:

我认为与 Rx 混淆的第一大来源是 Hot/Cold observables。由于 Publish 可以 'warm-up' 冷可观察量,因此它会导致令人困惑的边缘情况也就不足为奇了。

首先,谈谈可观察合约。 Observable contract 更简洁的表述是 OnNext 永远不能跟在 OnCompleted/OnError 之后,应该只有一个 OnCompleted or OnError 通知。这确实留下了尝试订阅终止的可观察对象的边缘情况: 尝试订阅已终止的可观察对象会导致立即收到终止消息。这算违约吗?也许吧,但据我所知,这是图书馆里唯一的合同作弊。另一种方法是订阅 dead air。这对任何人都没有帮助。

这与 hot/cold observables 有什么关系?不幸的是,令人困惑。订阅一个冰冷的可观察对象会触发整个可观察管道的重建。这意味着 subscribe-to-already-terminated 规则只适用于热可观察对象。冷的 observables 总是重新开始。

考虑这段代码,其中 o 是冷可观察量。:

var o = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(5);
var s1 = o.Subscribe(i => Console.WriteLine(i.ToString()));
await Task.Delay(TimeSpan.FromMilliseconds(600));
var s2 = o.Subscribe(i => Console.WriteLine(i.ToString()));

为了合约的目的,s1后面的observable和s2后面的observable是完全不同的。因此,即使它们之间存在延迟,并且您最终会在 OnCompleted 之后看到 OnNext,但这不是问题,因为它们是完全不同的可观察值。

它的粘性在于预热 Publish 版本。如果您要在上面的代码中将 .Publish().RefCount() 添加到 o 的末尾...

  • 如果不做任何更改,s2 将立即终止打印任何内容。
  • 将延迟更改为 400 左右,s2 将打印最后两个数字。
  • s1 更改为仅 .Take(2)s2 将重新开始打印 0 到 4。

更糟糕的是 Shroedinger 的猫效应:如果您在 o 上设置一个观察器来观察整个过程中会发生什么,这会改变引用计数,影响功能!看着它,改变了行为。调试噩梦。

这是尝试 'warm-up' 冷观察的危险。它只是效果不佳,尤其是 Publish/RefCount.

我的建议是:

  1. 不要试图预热冷的 Observables。
  2. 如果您需要与冷或热可观察对象共享订阅,请遵守@Enigmativity 严格使用选择器 Publish 版本
  3. 的一般规则
  4. 如果必须,请在 Publish/RefCount 可观察对象上进行虚拟订阅。这至少提供了一致的 Refcount >= 1,减少了量子 activity 效应。

作为什洛莫 , this problem is associated with the Publish operator. The RefCount works fine. So it's the Publish that needs fixing. The Publish is nothing more than calling the Multicast operator with a standard Subject<T> as argument. Here is its source code:

public IConnectableObservable<TSource> Publish<TSource>(IObservable<TSource> source)
{
    return source.Multicast(new Subject<TSource>());
}

因此 Publish 运算符继承了 Subject class 的行为。 class 出于非常充分的理由,保持其完成状态。因此,如果您通过调用 subject.OnCompleted() 来表示其完成,该主题的任何未来订阅者将立即收到 OnCompleted 通知。此功能很好地服务于独立的主题及其订阅者,但当 Subject 用作源序列和该序列的订阅者之间的中间传播者时,它会成为一个有问题的工件。那是因为源序列已经保持了它自己的状态,并且在主体内部复制这个状态会引入两个状态变得不同步的风险。这正是 PublishRefCount 运算符结合使用时发生的情况。主体记得源已经完成,而源是一个冷序列,已经失去了对前世的记忆,愿意重新开始新的生活。

所以解决方案是为 Multicast 运算符提供无状态主题。不幸的是,我找不到一种基于内置 Subject<T> 来组合它的方法(继承不是一种选择,因为 class 是密封的)。幸运的是,从头开始实施它并不是很困难。下面的实现使用 ImmutableArray as storage for the subject's observers, and uses interlocked operations to ensure its thread-safety (much like the built-in Subject<T> implementation).

public class StatelessSubject<T> : ISubject<T>
{
    private IImmutableList<IObserver<T>> _observers
        = ImmutableArray<IObserver<T>>.Empty;

    public void OnNext(T value)
    {
        foreach (var observer in Volatile.Read(ref _observers))
            observer.OnNext(value);
    }
    public void OnError(Exception error)
    {
        foreach (var observer in Volatile.Read(ref _observers))
            observer.OnError(error);
    }
    public void OnCompleted()
    {
        foreach (var observer in Volatile.Read(ref _observers))
            observer.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        ImmutableInterlocked.Update(ref _observers, x => x.Add(observer));
        return Disposable.Create(() =>
        {
            ImmutableInterlocked.Update(ref _observers, x => x.Remove(observer));
        });
    }
}

现在 Publish().RefCount() 可以通过替换它来修复:

.Multicast(new StatelessSubject<SomeType>()).RefCount()

此更改会产生理想的行为。发布的序列最初是冷的,第一次订阅时变热,最后一个订阅者取消订阅时又变冷。循环往复,没有往事的记忆

关于源序列完成的另一种正常情况,完成传播到所有订阅者,导致所有订阅者自动取消订阅,导致发布的序列变冷。最终结果是两个序列,即源序列和已发布序列,始终保持同步。它们要么都热,要么都冷。

这是一个 StatelessPublish 运算符,使 class 的使用更容易一些。

/// <summary>
/// Returns a connectable observable sequence that shares a single subscription to
/// the underlying sequence, without maintaining its state.
/// </summary>
public static IConnectableObservable<TSource> StatelessPublish<TSource>(
    this IObservable<TSource> source)
{
    return source.Multicast(new StatelessSubject<TSource>());
}

用法示例:

.StatelessPublish().RefCount()