管道 RxJS 可观察到现有主题

Pipe RxJS observable to existing subject

有一个正在使用的主题:

const fooSubject = new BehaviorSubject(null);

还有另一个可观察对象(本例中的另一个主题):

const barSubject = new Subject();
barSubject.subscribe(
  value => fooSubject.next(),
  err => fooSubject.error(err),
  () => fooSubject.complete()
);

barSubject.next('bar');

代码有效,但看起来很笨拙。

是否有更好的方法将barSubject observable(广义上,不一定使用pipe 运算符)管道化到fooSubject?它看起来像是一个可以由库本身处理的操作。

由于 Subject 已经是具有方法 next()error()complete() 的观察者,您可以将它订阅到任何 Observable:

const fooSubject = new BehaviorSubject(null);

const barSubject = new Subject();
barSubject.subscribe(fooSubject);

barSubject.next('bar');

关于在源 Observable 完成后取消订阅,我一直在使用这段代码。它按预期运行,但我不知道它是否是“反模式”...?

const subscription = this.http.get(url)
  .pipe(finalize(() => subscription.unsubscribe()))
  .subscribe(this.mySubject$);

编辑:您不需要取消订阅 http.get(..),因为它是 done automatically。因此,对于上面的代码,正确的形式是:

this.http.get(url).subscribe(mySubject$)

编辑 2:上面代码的一个问题是,当 http.get 完成时,mySubject$ 也将完成。现在,如果你 .subscribe(mySubject$)mySubject$.next(..) 它不会发出值。为避免这种情况并保持 mySubject$ 热,请使用此代码:

this.http.get(url).subscribe(r => this.mySubject$.next(r))