rxjs 5.2:ReplaySubject 等待 3 秒,如果只有空则抛出超时

rxjs 5.2: ReplaySubject wait for 3 seconds and throw timeout if only empty

我的 Angular 服务中有 setter 和 getter,如下所示,我想要的是如果 vizItemStream$ 不在 3 秒内,我想抛出超时错误。如果项目已设置或已设置,则不应抛出(如果非空)。

vizItemStream$ = new ReplaySubject<any>(1);

set visualizeItem(item: VizItem) {
    this.vizItemStream$.next(item);
}

get visualizeItem$(): Observable<VizItem> {
    //Only throw timeout if there are no items emitted in 3 seconds
    // this.vizItemStream$.timeout(3000);
    let sub = Observable.empty().delay(3000).subscribe(e => {
        this.vizItemStream$.isEmpty().subscribe(e => {
            this.vizItemStream$.error(new Error('Timeout'));    
        });
        sub.unsubscribe();
    });        
    return this.vizItemStream$.asObservable();
}

我尝试了不同的选项,但 none 我可以让它工作。任何帮助如何实现这一目标?

您可以使用 race operator and a timer 来获得您正在寻找的行为:

import 'rxjs/add/observable/throw';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/race';

get visualizeItem$(): Observable<VizItem> {
  return this.vizItemStream$
    .race(Observable
      .timer(3000)
      .concatMap(() => Observable.throw(new Error('Timeout'))))
    .asObservable();
}

race运算符:

Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.