RxJS:如何订阅仅在订阅后才发出的 Observable 数据?
RxJS: How do I subscribe to Observable data that is emitted only after the moment of subscription?
上下文:我有很多ConnectableObservable
,几乎所有的重播次数都是1或更多。在任何给定时间都有许多观察者订阅和取消订阅。
我想要什么: 在许多情况下,当观察者订阅其中一个可观察对象时,我不关心我可能预先存在的发射数据receive 是因为 observable 的数据重播机制。最近订阅的观察者唯一感兴趣的数据是订阅后发出的数据。
const observable = Rx.Observable
.interval(100)
.take(4)
.publishReplay(3);
observable.connect();
问题: 据我所知,当观察者订阅可观察对象时,它无法知道它观察到的数据是在之前还是之后发出的订阅时刻。
observable.subscribe(x => console.log('observed', x));
setTimeout(() =>
observable.subscribe(y => console.log('delayed observed', y)),
400
);
上面的代码会输出:
// => observed 0
// => observed 1
// => observed 2
// => delayed observed 0 **don't care**
// => delayed observed 1 **don't care**
// => delayed observed 2 **don't care**
// => observed 3
// => delayed observed 3
在这种假设情况下,延迟观察者只对订阅时刻之后发出的数据感兴趣;在这种情况下,3
.
我已经搜索了 RxJS 5 reference docs,但似乎找不到银弹运算符来完成我所追求的。有什么想法吗?
你能做这样的事情吗?
const observableNoReplay = Rx.Observable
.interval(100)
.take(4);
const observable = observableNoReplay
.publishReplay(3);
observable.connect();
您可以订阅当时需要的任何可观察对象,而不必担心任何类型的灵丹妙药。
您可以使用 .skipUntil(Rx.Observable.timer(0))
,因为重放的元素将被同步重放,而 skipUntil
将在同步执行的那一刻将 Observable
的其余部分从同步执行中取出接收重播值。
此代码将产生您想要的结果:
const observable = Rx.Observable
.interval(100)
.take(4)
.publishReplay(3);
observable.subscribe(x => console.log('observed', x));
setTimeout(() =>
observable
.skipUntil(Rx.Observable.timer(0))
.subscribe(y => console.log('delayed observed', y)),
400
);
observable.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>
上下文:我有很多ConnectableObservable
,几乎所有的重播次数都是1或更多。在任何给定时间都有许多观察者订阅和取消订阅。
我想要什么: 在许多情况下,当观察者订阅其中一个可观察对象时,我不关心我可能预先存在的发射数据receive 是因为 observable 的数据重播机制。最近订阅的观察者唯一感兴趣的数据是订阅后发出的数据。
const observable = Rx.Observable
.interval(100)
.take(4)
.publishReplay(3);
observable.connect();
问题: 据我所知,当观察者订阅可观察对象时,它无法知道它观察到的数据是在之前还是之后发出的订阅时刻。
observable.subscribe(x => console.log('observed', x));
setTimeout(() =>
observable.subscribe(y => console.log('delayed observed', y)),
400
);
上面的代码会输出:
// => observed 0
// => observed 1
// => observed 2
// => delayed observed 0 **don't care**
// => delayed observed 1 **don't care**
// => delayed observed 2 **don't care**
// => observed 3
// => delayed observed 3
在这种假设情况下,延迟观察者只对订阅时刻之后发出的数据感兴趣;在这种情况下,3
.
我已经搜索了 RxJS 5 reference docs,但似乎找不到银弹运算符来完成我所追求的。有什么想法吗?
你能做这样的事情吗?
const observableNoReplay = Rx.Observable
.interval(100)
.take(4);
const observable = observableNoReplay
.publishReplay(3);
observable.connect();
您可以订阅当时需要的任何可观察对象,而不必担心任何类型的灵丹妙药。
您可以使用 .skipUntil(Rx.Observable.timer(0))
,因为重放的元素将被同步重放,而 skipUntil
将在同步执行的那一刻将 Observable
的其余部分从同步执行中取出接收重播值。
此代码将产生您想要的结果:
const observable = Rx.Observable
.interval(100)
.take(4)
.publishReplay(3);
observable.subscribe(x => console.log('observed', x));
setTimeout(() =>
observable
.skipUntil(Rx.Observable.timer(0))
.subscribe(y => console.log('delayed observed', y)),
400
);
observable.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>