基于之前 Observable 的 Throttle

Throttle based on a previous Observable

我有以下设置,每 3 秒会向服务器发出一个新的 HTTP 请求。

  getData(param1: string): Observable<any> {
    return timer(0, 3000).pipe(
      switchMap(() => this.http.get(param1))
    );
  }

如果给定请求花费的时间超过 3 秒,switchMap()(我认为)将取消它并触发一个新请求。

现在,我想做的是,如果一个请求花费的时间超过 3 秒,它会等待它完成,然后再触发另一个请求。就上下文而言,我的想法是,如果请求存在性能问题,我的前端不会过早触发和取消请求。

我在某种程度上使它可以与以下内容一起使用:

currentObs: Observable<any>;

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1)
      return this.currentObs;
    })
  );
}

这将跟踪 currentObs,这是当前 HTTP 请求的可观察对象。然后它将它传递给 throttle() 方法,以便忽略通常提示新请求的 timer() 的值,直到请求 (currentObs) 完成。

这似乎可行,但有点尴尬,因为我需要将某些状态保留在 pipe() 之外。这也有点令人困惑,因为限制是基于它之后发生的事件。我一直在寻找一种方法将 switchMap() 的结果传递给 throttle() 但首先我没有找到,其次,这不会导致 throttle() 到在管道的错误一侧?

是否有使用 RxJS 实现此目的的更简洁的方法?

编辑:

通过@Mrk Sef 对更优雅解决方案的回答和@kvetis 对处理错误的警告,我最终得到了以下将发出请求的管道,成功后等待 3 秒,然后再发出另一个请求。如果请求失败,它将等待 3 秒并再次请求。然后从头开始。

getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    )),
    retryWhen(s => s.pipe(
      delay(3000)
    ))
  );
}

你的解决方案是一个非常优雅的解决方案。走出 observables 的世界并在简单的回调中将状态保持在外面,你可能会更加肮脏。但我会说你正确地解决了这个问题。

只是要小心,如果请求失败,那么整个计时器都会失败。你需要 在 switchMapcurrentObs 中恢复,如果你想继续下一个请求,即使前一个请求失败。由于 throttle 需要接收管道继续的值,因此您不应该只恢复到 EMTPY。让我们发出 null。

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1).pipe(
          catchError(e => {
              console.error(e);
              return of(null); // so the throttle continues with next value
      return this.currentObs;
    }),
    filter(identity) // use identity from RxJS so we filter out the null
  );
}

一般来说,您要实现的目标称为背压。您可以 google “RxJS 背压”并提出不同的技术。在大多数情况下,如果没有外部 Observable 将信息反馈给源 Observable,你就无法实现你想要的。

排气图

每3秒尝试运行 this.http.get,如果前一个调用没有在3秒内完成,什么都不做,3秒后重试。

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    exhaustMap(() => this.http.get(param1))
  );
}

延迟重复

每当上一个通话结束,等待3秒,然后再次拨打电话

getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    ))
  );
}

比较重复

每 3 秒重复一次通话,除非通话时间超过 3 秒,在这种情况下,在上一次通话结束后立即重复通话。

这与您描述的最接近。它通过使用静默计时器人为地“扩展”HTTP 调用来工作。这是有效的,因为 merge 在两个内部可观察对象完成之前不会完成。这意味着最快完成合并是 3 秒。

getData(param1: string): Observable<any> {
  return merge(
    this.http.get(param1),
    timer(3000).pipe(
      filter(_ => false)
    )
  ).pipe(
    repeatWhen(s => s)
  );
}