RxJS:如何订阅仅在订阅后才发出的 Observable 数据?

RxJS: How do I subscribe to Observable data that is emitted only after the moment of subscription?

上下文:我有很多ConnectableObservable,几乎所有的重播次数都是1或更多。在任何给定时间都有许多观察者订阅和取消订阅。

我想要什么: 在许多情况下,当观察者订阅其中一个可观察对象时,我不关心我可能预先存在的发射数据receive 是因为 observable 的数据重播机制。最近订阅的观察者唯一感兴趣的数据是订阅后发出的数据。

const observable = Rx.Observable
    .interval(100)
    .take(4)
    .publishReplay(3);

observable.connect();

问题: 据我所知,当观察者订阅可观察对象时,它无法知道它观察到的数据是在之前还是之后发出的订阅时刻。

observable.subscribe(x => console.log('observed', x));

setTimeout(() => 
    observable.subscribe(y => console.log('delayed observed', y)), 
    400
);

上面的代码会输出:

// => observed 0
// => observed 1
// => observed 2
// => delayed observed 0 **don't care**
// => delayed observed 1 **don't care**
// => delayed observed 2 **don't care**
// => observed 3
// => delayed observed 3

在这种假设情况下,延迟观察者只对订阅时刻之后发出的数据感兴趣;在这种情况下,3.

我已经搜索了 RxJS 5 reference docs,但似乎找不到银弹运算符来完成我所追求的。有什么想法吗?

你能做这样的事情吗?

const observableNoReplay = Rx.Observable
    .interval(100)
    .take(4);

const observable = observableNoReplay
    .publishReplay(3);

observable.connect();

您可以订阅当时需要的任何可观察对象,而不必担心任何类型的灵丹妙药。

您可以使用 .skipUntil(Rx.Observable.timer(0)),因为重放的元素将被同步重放,而 skipUntil 将在同步执行的那一刻将 Observable 的其余部分从同步执行中取出接收重播值。

此代码将产生您想要的结果:

const observable = Rx.Observable
    .interval(100)
    .take(4)
    .publishReplay(3);

observable.subscribe(x => console.log('observed', x));

setTimeout(() => 
    observable
        .skipUntil(Rx.Observable.timer(0))
        .subscribe(y => console.log('delayed observed', y)), 
    400
);

observable.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>