在保持订阅的同时更改可观察流

Changing observable stream while keeping subscription

在 RxJS 中,我希望订阅在流发生变化时持续存在。下面我使用了一个间隔流来测试行为

//Works because foo$ is unchanged
let foo$ = Rx.Observable.interval(1000);
foo$.subscribe(x => console.log(`foo$: ${x}`));

//Doesn't work because bar$ is changed
let bar$ = Rx.Observable.never();
bar$.subscribe(x => console.log(`bar$: ${x}`))
bar$ = Rx.Observable.interval(1000);

jsbin Live Demo

如何在更改 bar$ 流时保持订阅? 更改 bar$ 后是否需要处理订阅并设置另一个订阅?

Rx 中的一般模式是用值流(即可观察值)替换状态突变。此处,不应重新分配 bar$,而是应将 bar$ 建模为 Observable<Observable<T>>(即 T 类型值流的流)。然后它可以 "flattened out" 变成值流(在这种情况下,使用 switch)。

例如:

const bar$ = new Rx.Subject();
bar$.switch().subscribe(x => console.log(`bar$: ${x}`));
bar$.onNext(Rx.Observable.fromArray([1,2,3]));
bar$.onNext(Rx.Observable.interval(1000).take(3));

https://jsbin.com/firoso/edit?js,console,output