Ping API 与 RXJS 连续,超时取消?
Ping API continuous with RXJS, timeout cancels?
我尝试使用 rxjs ping URL/API 连续。
我的第一次尝试是:
timer(3000, 2000)
.pipe(mergeMap(() => this._http.get(environment.pingUrl)))
.subscribe(res => console.log(res), err => console.log(err))
但是当 URL 无法访问时,我永远不会得到响应。所以我决定设置一个超时时间:
timer(3000, 2000)
.pipe(mergeMap(() => this._http.get(environment.pingUrl).pipe(timeout(5000)))
.subscribe(res => console.log(res), err => console.log(err))
现在,当 api 无法访问时,我得到了第一个 timeout/response,但似乎超时触发了对计时器的取消订阅,并且没有进一步的响应。我不知道如何阻止取消订阅。
据我了解,您想连续 ping API 并在无法访问此 API 时继续执行此操作。
你遇到的问题是,当 api 失败时,observable 已完成,不会再次触发。
然后简单地做
timer(2000, 3000)
.pipe(
mergeMap(() => this._http.get(environment.pingUrl)),
timeout(5000),
retryWhen(() => timer(3000))
)
.subscribe(
(res) => console.log(res),
(err) => console.log(err)
);
例如:https://stackblitz.com/edit/rxjs-playground-test-mgppcj
请务必在某个时候取消订阅此 observable。
关于错误
在 RxJS 中,Observables 有三种发射类型。
next
:steam中的一个值,可以有0到任意多个
error
:终端发射。这个可观察对象的实例有错误并被关闭。它永远不会再发射。
complete
:终端发射。可观察对象的这个实例已完成并关闭。它永远不会再发射。
你会注意到一个 observable 的实例永远不会发射超过一个 error
或 complete
emission,但是任何 observable 都可以是 运行 (或者重新启动/重试) 任意次数。每次您订阅或某些操作员为您订阅时,您都会创建一个新的 observable 实例。
创建发出错误但之后仍继续的流
由于 observables 永远不会在错误后发出,因此在继续时发出错误的唯一方法是捕获 error
发射并将其转换为 next
发射。
一个例子:
在此示例中,任何时候源可观察到的错误,我都会告诉 catchError
重新订阅源。
- 我在具有
{value: }
属性 的物体内部定期排放
- 我在带有
{error: }
属性. 的对象内部发出错误
我通过将它们打印到控制台来处理这些捕获的错误,但您可能想要做一些比这更好的事情:)
timer(3000, 2000).pipe(
mergeMap(_ => this._http.get(environment.pingUrl)),
map(value => ({value})),
timeout({each: 5000}),
catchError((error, src) => {
// Only catch/handle TimeoutError
if(error instanceof TimeoutError){
return src.pipe(startWith({error}));
} else {
return throwError(() => error);
}
})
).subscribe({
next: emitted => {
if("error" in emitted){
// Found an error! In this case, we only handled
// TimeoutErrors, so that'll appear here as a next
// emission. The observable is still running.
console.log("Caught an error: ", emitted.error);
}else{
// got a result!
console.log(emitted.value);
}
},
// All other errors appear here and are error emissions,
// so they're terminal. This observable is closed
error: err => console.log("Uncaught error: ", err),
// The complete emission is also terminal. This
// observable is now closed.
complete: () => console.log("Complete")
});
我尝试使用 rxjs ping URL/API 连续。
我的第一次尝试是:
timer(3000, 2000)
.pipe(mergeMap(() => this._http.get(environment.pingUrl)))
.subscribe(res => console.log(res), err => console.log(err))
但是当 URL 无法访问时,我永远不会得到响应。所以我决定设置一个超时时间:
timer(3000, 2000)
.pipe(mergeMap(() => this._http.get(environment.pingUrl).pipe(timeout(5000)))
.subscribe(res => console.log(res), err => console.log(err))
现在,当 api 无法访问时,我得到了第一个 timeout/response,但似乎超时触发了对计时器的取消订阅,并且没有进一步的响应。我不知道如何阻止取消订阅。
据我了解,您想连续 ping API 并在无法访问此 API 时继续执行此操作。
你遇到的问题是,当 api 失败时,observable 已完成,不会再次触发。
然后简单地做
timer(2000, 3000)
.pipe(
mergeMap(() => this._http.get(environment.pingUrl)),
timeout(5000),
retryWhen(() => timer(3000))
)
.subscribe(
(res) => console.log(res),
(err) => console.log(err)
);
例如:https://stackblitz.com/edit/rxjs-playground-test-mgppcj
请务必在某个时候取消订阅此 observable。
关于错误
在 RxJS 中,Observables 有三种发射类型。
next
:steam中的一个值,可以有0到任意多个error
:终端发射。这个可观察对象的实例有错误并被关闭。它永远不会再发射。complete
:终端发射。可观察对象的这个实例已完成并关闭。它永远不会再发射。
你会注意到一个 observable 的实例永远不会发射超过一个 error
或 complete
emission,但是任何 observable 都可以是 运行 (或者重新启动/重试) 任意次数。每次您订阅或某些操作员为您订阅时,您都会创建一个新的 observable 实例。
创建发出错误但之后仍继续的流
由于 observables 永远不会在错误后发出,因此在继续时发出错误的唯一方法是捕获 error
发射并将其转换为 next
发射。
一个例子:
在此示例中,任何时候源可观察到的错误,我都会告诉 catchError
重新订阅源。
- 我在具有
{value: }
属性 的物体内部定期排放
- 我在带有
{error: }
属性. 的对象内部发出错误
我通过将它们打印到控制台来处理这些捕获的错误,但您可能想要做一些比这更好的事情:)
timer(3000, 2000).pipe(
mergeMap(_ => this._http.get(environment.pingUrl)),
map(value => ({value})),
timeout({each: 5000}),
catchError((error, src) => {
// Only catch/handle TimeoutError
if(error instanceof TimeoutError){
return src.pipe(startWith({error}));
} else {
return throwError(() => error);
}
})
).subscribe({
next: emitted => {
if("error" in emitted){
// Found an error! In this case, we only handled
// TimeoutErrors, so that'll appear here as a next
// emission. The observable is still running.
console.log("Caught an error: ", emitted.error);
}else{
// got a result!
console.log(emitted.value);
}
},
// All other errors appear here and are error emissions,
// so they're terminal. This observable is closed
error: err => console.log("Uncaught error: ", err),
// The complete emission is also terminal. This
// observable is now closed.
complete: () => console.log("Complete")
});