Observable.Range 会破坏 Observable Contract 吗?

Does Observable.Range break The Observable Contract?

在学习 Rx 的过程中,我遇到了关于 Observables 的经常重复的规则,在 The Observable Contract.

中有详细说明

Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications.

这对我来说很有意义,因为让 Observable 在完成后继续产生值会让人感到困惑,但是当我在 .NET 中测试 Observable.Range 方法时,我注意到它不会表现出这种行为,事实上 许多 Observables 违反了这个规则。

var rangeObservable = Observable.Range(0, 5);

rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done first!"));
Console.ReadLine();

rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done second!"));
Console.ReadLine();

//Output:
//0
//1
//2
//3
//4
//Done first!

//0
//1
//2
//3
//4
//Done second!

很明显,rangeObservable 调用了 OnComplete 两次,并在第一次 OnComplete 之后产生了值。这让我相信这不是关于 Observables 的规则,而是关于 Subscriptions 的规则。也就是说,一个 Observable 可以产生任意多的终止消息,甚至可以在它产生之后产生值,只要每个 Subscription 只接收一条终止消息,此后不再接收消息。

当它说 Observable 时,它们实际上是指 订阅 吗?它们真的是不同的东西吗?我对模型有根本性的误解吗?

可观察合同必须对任何被观察的可观察对象有效。 当 Observable 未被观察时是否发生任何事情留给 observable 的实现。

在 Enumerable 中考虑模拟是有帮助的——Observable 是 Enumerable 的对偶。在枚举中,你会有 range = Enumerable.Range(0, 5),你会使用类似于上面的范围:

range.ForEach(Console.WriteLine); //prints 0 - 4

range.ForEach(Console.WriteLine); //prints 0 - 4 again

并发现这是完全可以接受的行为,因为只有在调用 GetEnumerator 时才会创建实际的数字生成器。同样,在Observable中,等价的方法是Subscribe.

范围的实现类似于:

        static IObservable<int> Range(int start, int count)
        {
            return Observable.Create<int>(observer =>
            {
                for (int i = 0; i < count; i++)
                    observer.OnNext(start + i);

                observer.OnCompleted();

                return Disposable.Empty;
            });
        }

这里,每次有订阅都会调用observer => {...}函数。工作在 subscribe 方法中完成。您可以很容易地看到它 (1) 为每个观察者推送相同的序列,(2) 每个观察者只完成一次。

这些仅当您观察它们时才会发生某些事情的可观察对象称为冷可观察对象。 Here's an article 描述概念。

备注

Range 是一个非常幼稚的实现,仅用于说明目的。该方法在完成之前不会 return 一次性 - 所以 Disposable.Empty 是可以接受的。一个正确的实现将 运行 在调度程序上工作,并在继续循环之前使用已检查的可释放项来查看订阅是否已被释放。

要点是,手动实现可观察合约困难,这就是 Rx 库存在的原因——通过组合构建功能。

Observable.Range returns 一个 cold 可观察的,这意味着它 "replays" 它是每个订阅者的行为。由于 "OnNext* OnComplete|OnError" 合同仅适用于 订阅 ,这完全没问题。

有关 hot/cold 可观测值的更多信息,请参阅 my answer on "IConnectableObservables in Rx"