RxJS 5,将可观察对象转换为 BehaviorSubject(?)
RxJS 5, converting an observable to a BehaviorSubject(?)
我有一个父 observable,一旦它有订阅者,就会进行查找并发出单个值,然后完成。
我想将其转换为执行以下操作的可观察对象(或行为主题或任何有效的对象):一旦它至少有一个订阅者,它就会从父可观察对象(一次)获取结果。然后它向所有订阅者发送该值,并在所有未来订阅者订阅时向他们发送该单个值。它应该继续这种行为,即使它的订阅者数量下降到零。
看来这应该很容易。以下是无效的:
theValue$: Observable<boolean> = parent$
.take(1)
.share()
其他不起作用的东西:publishReplay()
、publish()
。效果更好的东西:
theValue$ = new BehaviorSubject<boolean>(false);
parent$
.take(1)
.subscribe( value => theValue$.next(value));
不过,这种方法存在一个问题:parent$
在 theValue$
获得其第一个订阅者之前订阅。
有没有更好的方法来处理这个问题?
shareReplay
应该做你想做的事:
import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);
shareReplay
是在 RxJS 5.4.0 版本中添加的。它 returns 一个引用计数的 observable 将订阅源 - parent$
- 在第一次订阅时。在源完成后进行的订阅将收到重播通知。
shareReplay
- 和一般的 refCount
- 在我最近写的一篇文章中有更详细的解释:RxJS: How to Use refCount.
我已经实现了一种将 Observables 转换为 BehaviorSubjects 的方法,因为我认为 shareReplay 方法 可读性不是很好未来参考。
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
const subject = new BehaviorSubject(initValue);
observable.subscribe(
(x: T) => {
subject.next(x);
},
(err: any) => {
subject.error(err);
},
() => {
subject.complete();
},
);
return subject;
}
这是 的改进版本。
import { Observable, BehaviorSubject } from 'rxjs';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
const subject = new BehaviorSubject(initValue);
const subscription = observable.subscribe(subject);
return {
subject,
stopWatching: () => subscription.unsubscribe()
};
}
请小心,因为返回的主题永远不会取消对源可观察对象的订阅。当您知道不再有对 subject
的引用时(例如,当视图组件是 destroyed/unmounted 时),您需要手动调用 stopWatching
。否则你会发生内存泄漏。
不可能针对给定的问题制定绝对安全的解决方案。原因是行为主题有一个 value
属性,即使没有订阅该主题也必须始终更新,因此当每个人都取消订阅 [=12] 时,你不能自动取消订阅 observable
=].
解决方案也不完美,因为结果不是 instanceof BehaviorSubject
并且 shareReplay
仅在订阅时记录值。
我有一个父 observable,一旦它有订阅者,就会进行查找并发出单个值,然后完成。
我想将其转换为执行以下操作的可观察对象(或行为主题或任何有效的对象):一旦它至少有一个订阅者,它就会从父可观察对象(一次)获取结果。然后它向所有订阅者发送该值,并在所有未来订阅者订阅时向他们发送该单个值。它应该继续这种行为,即使它的订阅者数量下降到零。
看来这应该很容易。以下是无效的:
theValue$: Observable<boolean> = parent$
.take(1)
.share()
其他不起作用的东西:publishReplay()
、publish()
。效果更好的东西:
theValue$ = new BehaviorSubject<boolean>(false);
parent$
.take(1)
.subscribe( value => theValue$.next(value));
不过,这种方法存在一个问题:parent$
在 theValue$
获得其第一个订阅者之前订阅。
有没有更好的方法来处理这个问题?
shareReplay
应该做你想做的事:
import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);
shareReplay
是在 RxJS 5.4.0 版本中添加的。它 returns 一个引用计数的 observable 将订阅源 - parent$
- 在第一次订阅时。在源完成后进行的订阅将收到重播通知。
shareReplay
- 和一般的 refCount
- 在我最近写的一篇文章中有更详细的解释:RxJS: How to Use refCount.
我已经实现了一种将 Observables 转换为 BehaviorSubjects 的方法,因为我认为 shareReplay 方法 可读性不是很好未来参考。
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
const subject = new BehaviorSubject(initValue);
observable.subscribe(
(x: T) => {
subject.next(x);
},
(err: any) => {
subject.error(err);
},
() => {
subject.complete();
},
);
return subject;
}
这是
import { Observable, BehaviorSubject } from 'rxjs';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
const subject = new BehaviorSubject(initValue);
const subscription = observable.subscribe(subject);
return {
subject,
stopWatching: () => subscription.unsubscribe()
};
}
请小心,因为返回的主题永远不会取消对源可观察对象的订阅。当您知道不再有对 subject
的引用时(例如,当视图组件是 destroyed/unmounted 时),您需要手动调用 stopWatching
。否则你会发生内存泄漏。
不可能针对给定的问题制定绝对安全的解决方案。原因是行为主题有一个 value
属性,即使没有订阅该主题也必须始终更新,因此当每个人都取消订阅 [=12] 时,你不能自动取消订阅 observable
=].
instanceof BehaviorSubject
并且 shareReplay
仅在订阅时记录值。