rxjs 5.2:ReplaySubject 等待 3 秒,如果只有空则抛出超时
rxjs 5.2: ReplaySubject wait for 3 seconds and throw timeout if only empty
我的 Angular 服务中有 setter 和 getter,如下所示,我想要的是如果 vizItemStream$
不在 3 秒内,我想抛出超时错误。如果项目已设置或已设置,则不应抛出(如果非空)。
vizItemStream$ = new ReplaySubject<any>(1);
set visualizeItem(item: VizItem) {
this.vizItemStream$.next(item);
}
get visualizeItem$(): Observable<VizItem> {
//Only throw timeout if there are no items emitted in 3 seconds
// this.vizItemStream$.timeout(3000);
let sub = Observable.empty().delay(3000).subscribe(e => {
this.vizItemStream$.isEmpty().subscribe(e => {
this.vizItemStream$.error(new Error('Timeout'));
});
sub.unsubscribe();
});
return this.vizItemStream$.asObservable();
}
我尝试了不同的选项,但 none 我可以让它工作。任何帮助如何实现这一目标?
您可以使用 race
operator and a timer
来获得您正在寻找的行为:
import 'rxjs/add/observable/throw';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/race';
get visualizeItem$(): Observable<VizItem> {
return this.vizItemStream$
.race(Observable
.timer(3000)
.concatMap(() => Observable.throw(new Error('Timeout'))))
.asObservable();
}
race
运算符:
Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.
我的 Angular 服务中有 setter 和 getter,如下所示,我想要的是如果 vizItemStream$
不在 3 秒内,我想抛出超时错误。如果项目已设置或已设置,则不应抛出(如果非空)。
vizItemStream$ = new ReplaySubject<any>(1);
set visualizeItem(item: VizItem) {
this.vizItemStream$.next(item);
}
get visualizeItem$(): Observable<VizItem> {
//Only throw timeout if there are no items emitted in 3 seconds
// this.vizItemStream$.timeout(3000);
let sub = Observable.empty().delay(3000).subscribe(e => {
this.vizItemStream$.isEmpty().subscribe(e => {
this.vizItemStream$.error(new Error('Timeout'));
});
sub.unsubscribe();
});
return this.vizItemStream$.asObservable();
}
我尝试了不同的选项,但 none 我可以让它工作。任何帮助如何实现这一目标?
您可以使用 race
operator and a timer
来获得您正在寻找的行为:
import 'rxjs/add/observable/throw';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/race';
get visualizeItem$(): Observable<VizItem> {
return this.vizItemStream$
.race(Observable
.timer(3000)
.concatMap(() => Observable.throw(new Error('Timeout'))))
.asObservable();
}
race
运算符:
Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.