基于之前 Observable 的 Throttle
Throttle based on a previous Observable
我有以下设置,每 3 秒会向服务器发出一个新的 HTTP 请求。
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
switchMap(() => this.http.get(param1))
);
}
如果给定请求花费的时间超过 3 秒,switchMap()
(我认为)将取消它并触发一个新请求。
现在,我想做的是,如果一个请求花费的时间超过 3 秒,它会等待它完成,然后再触发另一个请求。就上下文而言,我的想法是,如果请求存在性能问题,我的前端不会过早触发和取消请求。
我在某种程度上使它可以与以下内容一起使用:
currentObs: Observable<any>;
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
throttle(_ => this.currentObs),
switchMap(() => {
this.currentObs = this.http.get(param1)
return this.currentObs;
})
);
}
这将跟踪 currentObs
,这是当前 HTTP 请求的可观察对象。然后它将它传递给 throttle()
方法,以便忽略通常提示新请求的 timer()
的值,直到请求 (currentObs
) 完成。
这似乎可行,但有点尴尬,因为我需要将某些状态保留在 pipe()
之外。这也有点令人困惑,因为限制是基于它之后发生的事件。我一直在寻找一种方法将 switchMap()
的结果传递给 throttle()
但首先我没有找到,其次,这不会导致 throttle()
到在管道的错误一侧?
是否有使用 RxJS 实现此目的的更简洁的方法?
编辑:
通过@Mrk Sef 对更优雅解决方案的回答和@kvetis 对处理错误的警告,我最终得到了以下将发出请求的管道,成功后等待 3 秒,然后再发出另一个请求。如果请求失败,它将等待 3 秒并再次请求。然后从头开始。
getData(param1: string): Observable<any> {
return this.http.get(param1).pipe(
repeatWhen(s => s.pipe(
delay(3000)
)),
retryWhen(s => s.pipe(
delay(3000)
))
);
}
你的解决方案是一个非常优雅的解决方案。走出 observables 的世界并在简单的回调中将状态保持在外面,你可能会更加肮脏。但我会说你正确地解决了这个问题。
只是要小心,如果请求失败,那么整个计时器都会失败。你需要
在 switchMap
和 currentObs
中恢复,如果你想继续下一个请求,即使前一个请求失败。由于 throttle
需要接收管道继续的值,因此您不应该只恢复到 EMTPY
。让我们发出 null。
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
throttle(_ => this.currentObs),
switchMap(() => {
this.currentObs = this.http.get(param1).pipe(
catchError(e => {
console.error(e);
return of(null); // so the throttle continues with next value
return this.currentObs;
}),
filter(identity) // use identity from RxJS so we filter out the null
);
}
一般来说,您要实现的目标称为背压。您可以 google “RxJS 背压”并提出不同的技术。在大多数情况下,如果没有外部 Observable 将信息反馈给源 Observable,你就无法实现你想要的。
排气图
每3秒尝试运行 this.http.get
,如果前一个调用没有在3秒内完成,什么都不做,3秒后重试。
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
exhaustMap(() => this.http.get(param1))
);
}
延迟重复
每当上一个通话结束,等待3秒,然后再次拨打电话
getData(param1: string): Observable<any> {
return this.http.get(param1).pipe(
repeatWhen(s => s.pipe(
delay(3000)
))
);
}
比较重复
每 3 秒重复一次通话,除非通话时间超过 3 秒,在这种情况下,在上一次通话结束后立即重复通话。
这与您描述的最接近。它通过使用静默计时器人为地“扩展”HTTP 调用来工作。这是有效的,因为 merge
在两个内部可观察对象完成之前不会完成。这意味着最快完成合并是 3 秒。
getData(param1: string): Observable<any> {
return merge(
this.http.get(param1),
timer(3000).pipe(
filter(_ => false)
)
).pipe(
repeatWhen(s => s)
);
}
我有以下设置,每 3 秒会向服务器发出一个新的 HTTP 请求。
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
switchMap(() => this.http.get(param1))
);
}
如果给定请求花费的时间超过 3 秒,switchMap()
(我认为)将取消它并触发一个新请求。
现在,我想做的是,如果一个请求花费的时间超过 3 秒,它会等待它完成,然后再触发另一个请求。就上下文而言,我的想法是,如果请求存在性能问题,我的前端不会过早触发和取消请求。
我在某种程度上使它可以与以下内容一起使用:
currentObs: Observable<any>;
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
throttle(_ => this.currentObs),
switchMap(() => {
this.currentObs = this.http.get(param1)
return this.currentObs;
})
);
}
这将跟踪 currentObs
,这是当前 HTTP 请求的可观察对象。然后它将它传递给 throttle()
方法,以便忽略通常提示新请求的 timer()
的值,直到请求 (currentObs
) 完成。
这似乎可行,但有点尴尬,因为我需要将某些状态保留在 pipe()
之外。这也有点令人困惑,因为限制是基于它之后发生的事件。我一直在寻找一种方法将 switchMap()
的结果传递给 throttle()
但首先我没有找到,其次,这不会导致 throttle()
到在管道的错误一侧?
是否有使用 RxJS 实现此目的的更简洁的方法?
编辑:
通过@Mrk Sef 对更优雅解决方案的回答和@kvetis 对处理错误的警告,我最终得到了以下将发出请求的管道,成功后等待 3 秒,然后再发出另一个请求。如果请求失败,它将等待 3 秒并再次请求。然后从头开始。
getData(param1: string): Observable<any> {
return this.http.get(param1).pipe(
repeatWhen(s => s.pipe(
delay(3000)
)),
retryWhen(s => s.pipe(
delay(3000)
))
);
}
你的解决方案是一个非常优雅的解决方案。走出 observables 的世界并在简单的回调中将状态保持在外面,你可能会更加肮脏。但我会说你正确地解决了这个问题。
只是要小心,如果请求失败,那么整个计时器都会失败。你需要
在 switchMap
和 currentObs
中恢复,如果你想继续下一个请求,即使前一个请求失败。由于 throttle
需要接收管道继续的值,因此您不应该只恢复到 EMTPY
。让我们发出 null。
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
throttle(_ => this.currentObs),
switchMap(() => {
this.currentObs = this.http.get(param1).pipe(
catchError(e => {
console.error(e);
return of(null); // so the throttle continues with next value
return this.currentObs;
}),
filter(identity) // use identity from RxJS so we filter out the null
);
}
一般来说,您要实现的目标称为背压。您可以 google “RxJS 背压”并提出不同的技术。在大多数情况下,如果没有外部 Observable 将信息反馈给源 Observable,你就无法实现你想要的。
排气图
每3秒尝试运行 this.http.get
,如果前一个调用没有在3秒内完成,什么都不做,3秒后重试。
getData(param1: string): Observable<any> {
return timer(0, 3000).pipe(
exhaustMap(() => this.http.get(param1))
);
}
延迟重复
每当上一个通话结束,等待3秒,然后再次拨打电话
getData(param1: string): Observable<any> {
return this.http.get(param1).pipe(
repeatWhen(s => s.pipe(
delay(3000)
))
);
}
比较重复
每 3 秒重复一次通话,除非通话时间超过 3 秒,在这种情况下,在上一次通话结束后立即重复通话。
这与您描述的最接近。它通过使用静默计时器人为地“扩展”HTTP 调用来工作。这是有效的,因为 merge
在两个内部可观察对象完成之前不会完成。这意味着最快完成合并是 3 秒。
getData(param1: string): Observable<any> {
return merge(
this.http.get(param1),
timer(3000).pipe(
filter(_ => false)
)
).pipe(
repeatWhen(s => s)
);
}