Ping API 与 RXJS 连续,超时取消?

Ping API continuous with RXJS, timeout cancels?

我尝试使用 rxjs ping URL/API 连续。

我的第一次尝试是:

timer(3000, 2000)
   .pipe(mergeMap(() => this._http.get(environment.pingUrl)))
   .subscribe(res => console.log(res), err => console.log(err))

但是当 URL 无法访问时,我永远不会得到响应。所以我决定设置一个超时时间:

timer(3000, 2000)
   .pipe(mergeMap(() => this._http.get(environment.pingUrl).pipe(timeout(5000)))
   .subscribe(res => console.log(res), err => console.log(err))

现在,当 api 无法访问时,我得到了第一个 timeout/response,但似乎超时触发了对计时器的取消订阅,并且没有进一步的响应。我不知道如何阻止取消订阅。

据我了解,您想连续 ping API 并在无法访问此 API 时继续执行此操作。

你遇到的问题是,当 api 失败时,observable 已完成,不会再次触发。

您将需要使用 retry or retryWhen

然后简单地做

timer(2000, 3000)
  .pipe(
    mergeMap(() => this._http.get(environment.pingUrl)),
    timeout(5000),
    retryWhen(() => timer(3000))
  )
  .subscribe(
    (res) => console.log(res),
    (err) => console.log(err)
  );

例如:https://stackblitz.com/edit/rxjs-playground-test-mgppcj

请务必在某个时候取消订阅此 observable。

关于错误

在 RxJS 中,Observables 有三种发射类型。

  1. next:steam中的一个值,可以有0到任意多个
  2. error:终端发射。这个可观察对象的实例有错误并被关闭。它永远不会再发射。
  3. complete:终端发射。可观察对象的这个实例已完成并关闭。它永远不会再发射。

你会注意到一个 observable 的实例永远不会发射超过一个 errorcomplete emission,但是任何 observable 都可以是 运行 (或者重新启动/重试) 任意次数。每次您订阅或某些操作员为您订阅时,您都会创建一个新的 observable 实例。

创建发出错误但之后仍继续的流

由于 observables 永远不会在错误后发出,因此在继续时发出错误的唯一方法是捕获 error 发射并将其转换为 next 发射。

一个例子:

在此示例中,任何时候源可观察到的错误,我都会告诉 catchError 重新订阅源。

  • 我在具有 {value: } 属性
  • 的物体内部定期排放
  • 我在带有 {error: } 属性.
  • 的对象内部发出错误

我通过将它们打印到控制台来处理这些捕获的错误,但您可能想要做一些比这更好的事情:)


timer(3000, 2000).pipe(
  mergeMap(_ => this._http.get(environment.pingUrl)),
  map(value => ({value})),
  timeout({each: 5000}),
  catchError((error, src) => {
    // Only catch/handle TimeoutError
    if(error instanceof TimeoutError){
      return src.pipe(startWith({error}));
    } else {
      return throwError(() => error);
    }
  })
).subscribe({
  next: emitted => {
    if("error" in emitted){
      // Found an error! In this case, we only handled
      // TimeoutErrors, so that'll appear here as a next
      // emission. The observable is still running.
      console.log("Caught an error: ", emitted.error);
    }else{
      // got a result!
      console.log(emitted.value);
    }
  },
  // All other errors appear here and are error emissions, 
  // so they're terminal. This observable is closed
  error: err => console.log("Uncaught error: ", err),
  // The complete emission is also terminal. This 
  // observable is now closed.
  complete: () => console.log("Complete")
});