RxJS - 重试或重置
RxJS - retry or reset
假设我有这样一个 sequence
:
Rx.Observable
.interval(1000)
.subscribe(data => {console.log(data)})
有了operators
,我怎么可以'restart
'这个顺序,意思是unsubscribe
和resubscribe
。
真实场景是 sequence
是套接字流,在某些条件下我们需要 unsubscribe
和 resubscribe
,有点像 retryWhen(errors)
的工作方式,但不会有错误...理想情况下应该是...retryWhen(bool:Subject)
.
我会使用 switchMap()
来完成它,因为它会自动取消订阅旧的 Observable 并订阅新的 Observable。在这种情况下,我们将仅使用 .switchMap(() => source)
:
const subject = new Subject();
const source = Observable.create(obs => {
console.log('Observable.create');
obs.next(42);
});
subject.switchMap(() => source)
.subscribe(v => console.log('next:', v));
setTimeout(() => subject.next(), 1000);
setTimeout(() => subject.next(), 5000);
这将打印以下内容:
Observable.create
next: 42
Observable.create
next: 42
您将拥有 WebSocket 源(或任何您拥有的)而不是 source
。
假设我有这样一个 sequence
:
Rx.Observable
.interval(1000)
.subscribe(data => {console.log(data)})
有了operators
,我怎么可以'restart
'这个顺序,意思是unsubscribe
和resubscribe
。
真实场景是 sequence
是套接字流,在某些条件下我们需要 unsubscribe
和 resubscribe
,有点像 retryWhen(errors)
的工作方式,但不会有错误...理想情况下应该是...retryWhen(bool:Subject)
.
我会使用 switchMap()
来完成它,因为它会自动取消订阅旧的 Observable 并订阅新的 Observable。在这种情况下,我们将仅使用 .switchMap(() => source)
:
const subject = new Subject();
const source = Observable.create(obs => {
console.log('Observable.create');
obs.next(42);
});
subject.switchMap(() => source)
.subscribe(v => console.log('next:', v));
setTimeout(() => subject.next(), 1000);
setTimeout(() => subject.next(), 5000);
这将打印以下内容:
Observable.create
next: 42
Observable.create
next: 42
您将拥有 WebSocket 源(或任何您拥有的)而不是 source
。