Angular 2 - 如何更改 RxJS Observable 的间隔

Angular 2 - How to change the interval of an RxJS Observable

我正在使用 rxJS Observable Interval 来刷新正在获取的数据。我想不出更改间隔设置的方法。我已经看到一些关于使用 Subject class 提供的 rxJS 的东西,但我无法让它工作。

我在此 plunk

中提供了一个简化示例

在 AppComponent 中我有这个方法。

getTime() {
        this.timeService.getTime(this.refreshInterval)
          .subscribe(t => {
            this.currentTime = t;
            console.log('Refresh interval is: ' + this.refreshInterval);
          }
        );
}

在服务组件中我目前有这段代码。

getTime(refreshInterval: number) {
  return Observable.interval(refreshInterval)
        .startWith(0)
        .map((res: any) => this.getDate())
        .catch(this.handleError)
}

有人能给我提供一个可行的例子吗?

据我从您的 plnkr 了解到,您的目标是允许用户修改计时器间隔。

如你所料,refreshInterval 的改变会改变 rxJs 声明的流:

    this.timeService.getTime(this.refreshInterval)
      .subscribe(t => {
        this.currentTime = t;
        console.log('Refresh interval is: ' + this.refreshInterval);
      }
    );

这是错误的。

每次更新refreshInterval,你需要:

  • 取消订阅或销毁之前的流。
  • 创建新流并 再次订阅

您不需要销毁并重新创建整个 Observable 流来更改 refreshInterval。您只需要更新依赖于变化间隔的流部分。

首先简化您的服务 getTime(),使其不负责确定输出频率。它所做的只是 return 时间:

getTime() { return (new Date()).toString(); }

现在调用代码将确定时间表。只需 3 个简单步骤:

1.一个调整到所需区间的源函数:

/** Observable waits for the current interval, then emits once */
refreshObs() {return Observable.timer(this.refreshInterval)}

2.一个使用repeat operator连续重新执行流的可观察链:

getTime$ = Observable.of(null)
            .switchMap(e=>this.refreshObs()) // wait for interval, then emit
            .map(() => this.timeService.getTime()) // get new time
            .repeat(); // start over

3.一个订阅触发整个事情:

ngOnInit(){
    this.getTime$.subscribe(t => {
        this.currentTime = t;
        console.log('refresh interval = '+this.refreshInterval);
    });
}

这是有效的,因为 refreshObs() return 每次重复流时都会有一个新的可观察对象,并且新的可观察对象将根据当前设置的时间间隔等待发射。

Live demo

我想在此(以及 Stack Overflow 上的其他地方)的基础上建立以前的答案。我的示例有一个通用的 RefreshService,各种组件都可以将其用于订阅。这样,一个站点可以有一个 "Refresh every X seconds" 组件,每个组件都可以订阅。

https://plnkr.co/edit/960yztjl3dqXQD2XPSei?p=preview

该服务提供的功能 withRefresh 提供 Observable。它利用 BehaviorSubject,这将立即触发订阅事件。

export class RefreshService {

  static interval$: BehaviorSubject<number> = new BehaviorSubject<number>(30000);

  setInterval(newInterval: number){
    RefreshService.interval$.next(newInterval);
  }

  public withRefresh() {
    return RefreshService.interval$
      .switchMap((int: number) => Observable
        .interval(int)
        .startWith(0)
      );
  }
}

那么任何组件都可以使用这个服务如下:

this.refreshService
  .withRefresh()
  .switchMap(() => /* do something on each interval of the timer */);