RxJs - 我可以将 BehaviorSubject 与 takeUntil 一起使用吗?
RxJs - Can I use a BehaviorSubject with takeUntil?
我想使用 BehaviorSubject 使用 takeUntil 取消订阅另一个 Observable。当我用 takeUntil 订阅 Observable 时,它似乎立即取消订阅。此代码适用于主题,但我想要一个初始值集。
我正在使用 rxjs 5.5.6
//MyService1
class Observable1 {
status1: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
displayStatus1(val: boolean) {
this.status1.next(val)
}
}
//MyService2
class Observable2 {
status2: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
displayStatus2(val: boolean) {
this.status2.next(val)
}
}
//MyComponent
status: boolean;
constructor(private myService1: MyService1, private myService2: MyService2) {
this.subscribeToObservable1();
this.subscribeToObservable2();
}
subscribeToObservable1() {
this.myService1.status1.subscribe((val: boolean) => {
console.log('val: ', val);
}
}
subscribeToObservable2() {
this.myService2.status2
.takeUntil(this.myService1.status1)
.subscribe((val: boolean) => {
this.status = val;
}
}
您在 takeWhile
之后:(因为 takeUntil 不采用谓词)。
var bs = new Rx.BehaviorSubject<boolean>(false); //create beahviour subject
const source = Rx.Observable.interval(1000); //create observable
// take from obs while , behaviour subject not emitting true
const example = source.takeWhile ((a)=>bs.value!=true);
const subscribe = example.subscribe(val => console.log(val));
setTimeout(()=>bs.next(true),3000); //make the BehaviorSubject emit true and stop.
BehaviorSubject 将具有初始值。
因此,一旦您的 takeUntil
内部 订阅 它,它就会释放其初始值。
请参阅(此处)[http://reactivex.io/documentation/subject.html]
When an observer subscribes to a BehaviorSubject, it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).
改用 PublishSubject
。
当然可以,只需使用 skip():
跳过第一个初始值
this.status2$
.pipe(
takeUntil(
this.status1$.pipe(skip(1))
),
)
.subscribe((val: boolean) => {
// I execute until status1$ emits
}
顺便说一句:从 RxJS >= 5.5 开始,您可以像我的示例一样使用 pipe。你也可以在末尾用 $
命名你的 observables,所以它显示为 "status1stream"
你也可以使用过滤器。
const source = new Subject();
const destory$ = new BehaviorSubject<boolean>(false);
source
.pipe(takeUntil(destory$.pipe(filter((v: boolean) => v))))
.subscribe(resp => {
console.log("resp", resp);
});
source.next("hello");
source.next("hello2");
destory$.next(true);
destory$.complete();
source.next("hello 3 ");
我想使用 BehaviorSubject 使用 takeUntil 取消订阅另一个 Observable。当我用 takeUntil 订阅 Observable 时,它似乎立即取消订阅。此代码适用于主题,但我想要一个初始值集。
我正在使用 rxjs 5.5.6
//MyService1
class Observable1 {
status1: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
displayStatus1(val: boolean) {
this.status1.next(val)
}
}
//MyService2
class Observable2 {
status2: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
displayStatus2(val: boolean) {
this.status2.next(val)
}
}
//MyComponent
status: boolean;
constructor(private myService1: MyService1, private myService2: MyService2) {
this.subscribeToObservable1();
this.subscribeToObservable2();
}
subscribeToObservable1() {
this.myService1.status1.subscribe((val: boolean) => {
console.log('val: ', val);
}
}
subscribeToObservable2() {
this.myService2.status2
.takeUntil(this.myService1.status1)
.subscribe((val: boolean) => {
this.status = val;
}
}
您在 takeWhile
之后:(因为 takeUntil 不采用谓词)。
var bs = new Rx.BehaviorSubject<boolean>(false); //create beahviour subject
const source = Rx.Observable.interval(1000); //create observable
// take from obs while , behaviour subject not emitting true
const example = source.takeWhile ((a)=>bs.value!=true);
const subscribe = example.subscribe(val => console.log(val));
setTimeout(()=>bs.next(true),3000); //make the BehaviorSubject emit true and stop.
BehaviorSubject 将具有初始值。
因此,一旦您的 takeUntil
内部 订阅 它,它就会释放其初始值。
请参阅(此处)[http://reactivex.io/documentation/subject.html]
When an observer subscribes to a BehaviorSubject, it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).
改用 PublishSubject
。
当然可以,只需使用 skip():
跳过第一个初始值this.status2$
.pipe(
takeUntil(
this.status1$.pipe(skip(1))
),
)
.subscribe((val: boolean) => {
// I execute until status1$ emits
}
顺便说一句:从 RxJS >= 5.5 开始,您可以像我的示例一样使用 pipe。你也可以在末尾用 $
命名你的 observables,所以它显示为 "status1stream"
你也可以使用过滤器。
const source = new Subject();
const destory$ = new BehaviorSubject<boolean>(false);
source
.pipe(takeUntil(destory$.pipe(filter((v: boolean) => v))))
.subscribe(resp => {
console.log("resp", resp);
});
source.next("hello");
source.next("hello2");
destory$.next(true);
destory$.complete();
source.next("hello 3 ");