RxJS - 重试或重置

RxJS - retry or reset

假设我有这样一个 sequence

Rx.Observable
.interval(1000)
.subscribe(data => {console.log(data)})

有了operators,我怎么可以'restart'这个顺序,意思是unsubscriberesubscribe

真实场景是 sequence 是套接字流,在某些条件下我们需要 unsubscriberesubscribe,有点像 retryWhen(errors) 的工作方式,但不会有错误...理想情况下应该是...retryWhen(bool:Subject).

我会使用 switchMap() 来完成它,因为它会自动取消订阅旧的 Observable 并订阅新的 Observable。在这种情况下,我们将仅使用 .switchMap(() => source):

const subject = new Subject();

const source = Observable.create(obs => {
    console.log('Observable.create');
    obs.next(42);
});

subject.switchMap(() => source)
    .subscribe(v => console.log('next:', v));


setTimeout(() => subject.next(), 1000);
setTimeout(() => subject.next(), 5000);

这将打印以下内容:

Observable.create
next: 42
Observable.create
next: 42

您将拥有 WebSocket 源(或任何您拥有的)而不是 source