RxJS 5,将可观察对象转换为 BehaviorSubject(?)

RxJS 5, converting an observable to a BehaviorSubject(?)

我有一个父 observable,一旦它有订阅者,就会进行查找并发出单个值,然后完成。

我想将其转换为执行以下操作的可观察对象(或行为主题或任何有效的对象):一旦它至少有一个订阅者,它就会从父可观察对象(一次)获取结果。然后它向所有订阅者发送该值,并在所有未来订阅者订阅时向他们发送该单个值。它应该继续这种行为,即使它的订阅者数量下降到零。

看来这应该很容易。以下是无效的:

theValue$: Observable<boolean> = parent$
.take(1)
.share()

其他不起作用的东西:publishReplay()publish()。效果更好的东西:

theValue$ = new BehaviorSubject<boolean>(false);

parent$
.take(1)
.subscribe( value => theValue$.next(value));

不过,这种方法存在一个问题:parent$theValue$ 获得其第一个订阅者之前订阅。

有没有更好的方法来处理这个问题?

shareReplay 应该做你想做的事:

import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);

shareReplay 是在 RxJS 5.4.0 版本中添加的。它 returns 一个引用计数的 observable 将订阅源 - parent$ - 在第一次订阅时。在源完成后进行的订阅将收到重播通知。

shareReplay - 和一般的 refCount - 在我最近写的一篇文章中有更详细的解释:RxJS: How to Use refCount.

我已经实现了一种将 Observables 转换为 BehaviorSubjects 的方法,因为我认为 shareReplay 方法 可读性不是很好未来参考。

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
    const subject = new BehaviorSubject(initValue);

    observable.subscribe(
        (x: T) => {
            subject.next(x);
        },
        (err: any) => {
            subject.error(err);
        },
        () => {
            subject.complete();
        },
    );

    return subject;
}

这是 的改进版本。

import { Observable, BehaviorSubject } from 'rxjs';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
  const subject = new BehaviorSubject(initValue);
  const subscription = observable.subscribe(subject);
  return {
    subject,
    stopWatching: () => subscription.unsubscribe()
  };
}

请小心,因为返回的主题永远不会取消对源可观察对象的订阅。当您知道不再有对 subject 的引用时(例如,当视图组件是 destroyed/unmounted 时),您需要手动调用 stopWatching。否则你会发生内存泄漏。

不可能针对给定的问题制定绝对安全的解决方案。原因是行为主题有一个 value 属性,即使没有订阅该主题也必须始终更新,因此当每个人都取消订阅 [=12] 时,你不能自动取消订阅 observable =].

解决方案也不完美,因为结果不是 instanceof BehaviorSubject 并且 shareReplay 仅在订阅时记录值。