动态改变轮询间隔

Dynamically changed polling interval

我需要我的前端应用程序轮询我的后端应用程序并刷新显示的数据。我以某种方式解决了它,但我觉得它可以优化。这是我的解决方案:

        this.pollDataSource$ = new BehaviorSubject<number>(this.pollingInterval);

        this.pollDataSource$
            .pipe(
                switchMap(duration =>
                    of(duration)
                        .pipe(
                            filter(duration => duration !== 0),
                            delay(duration),
                            concatMap(() => this.kubernetesObjectList$)
                        )
                )
            )
            .subscribe({
                next: (kubernetesObjectList) => {
                    this.dataSource.data = kubernetesObjectList;
                    if (this.pollingInterval > 0) {
                        this.pollDataSource$.next(this.pollingInterval);
                    }
                }
            });

        this.pollDataSource$.next(-1);

所以,我有一个影响 this.pollingInterval 的下拉选择器。另外,我有 this.pollDataSource$,这是一个 BehaviorSubject<number>。它发出一个 number 被用作下一次轮询之前的持续时间。

this.pollDataSource$发出-1时(当用户点击刷新按钮时发生),必须立即轮询数据源,不管轮询是什么间隔已设置。

this.pollDataSource$发出一些正数时(当用户从​​下拉选择器中选择某个轮询间隔时发生),这个数字必须用作下一次刷新之前的持续时间。

this.pollDataSource$发出0时(当用户在同一下拉选择器中选择停止轮询选项时发生),我们必须停止轮询,直到用户选择新的轮询间隔。

一切都很完美。页面已加载,默认情况下 this.pollingInterval 具有 10000,因此用户会立即获取数据,但在 10 秒后会更新。当用户点击 刷新 时,数据正在更新,下一次自动刷新会在 10 秒后发生。当用户切换到 Stop Polling 时,数据保持不变。当用户切换到另一个时间间隔时,数据将再次更新。一切都很好!但我觉得我的解决方案不是最优的。我只是不喜欢这种结构:...pipe...switchMap...of...pipe...有没有办法简化它?

提前感谢所有线索。

我提到 expand 的原因是可以使用此运算符代替构造

someSubject.pipe(
  // do stuff
).subscribe(
  next: data => {
    // do some more stuff
    someSubject.next(something)
  }

这是您实际使用的构造,因此有关 expand 的建议。

那么,不介绍expand,回到你的问题,你可能会考虑类似

this.pollDataSource$
        .pipe(
            switchMap(duration =>
                return duration === 0 ?
                  NEVER :
                  this.kubernetesObjectList$.pipe(delay(duration));
            )
        )
        .subscribe({
            next: (kubernetesObjectList) => {
                this.dataSource.data = kubernetesObjectList;
                if (this.pollingInterval > 0) {
                    this.pollDataSource$.next(this.pollingInterval);
                }
            }
        });

    this.pollDataSource$.next(-1);

expand 看起来像

this.pollDataSource$
            .pipe(
                expand(duration =>
                    return duration === 0 ?
                      NEVER :
                      this.kubernetesObjectList$.pipe(
                         delay(duration),
                         map(() =>  duration)
                      );
                )
            )
            .subscribe({
                next: (kubernetesObjectList) => {
                    this.dataSource.data = kubernetesObjectList;
                }
            });

        this.pollDataSource$.next(-1);

这些版本的代码是否比我发现清晰易读的原始代码更清晰,可能是个人品味的问题。

我使用的所有 "could" 和 "would" 是因为我没有测试此代码的游乐场,因此您很有可能会找到一些东西不在这里工作。

如果您想按时间间隔进行轮询,请考虑尝试以下操作:

const intervalDelays$ = new BehaviorSubject(10000);

intervalDelays.pipe(
  switchMap(ms => ms === 0 ? EMPTY : interval(ms).pipe(
    concatMap(() => fetchDataObservableHere$)
  ))
);

interval 在后台使用 setInterval。只需确保 msfetchDataObservableHere$ 完成 的时间长。否则你最终会遇到背压问题。

如果担心背压,您可以这样做:

const intervalDelays$ = new BehaviorSubject(10000);

intervalDelays.pipe(
  switchMap(ms => ms < 0
    // No wait? We'll assume it's "off"
    ? EMPTY
    // just an observable to start the recursive expand call.
    // 'start' doesn't matter. Could be `true` or anything, really.
    : of('start').pipe(
      expand(() => timer(ms).pipe(
        switchMap(() => fetchDataObservableHere$),
        tap(data => {
          // PROCESS DATA HERE
          // this ensures that it's done processing before you move on.
        }),
      )),
  ))
);

以上代码将看到对间隔延迟的传入更改,并启动一个新的内部可观察对象,它将递归执行,等待每个值返回并在发出另一个请求之前进行处理.