如何防止 Observable 错误传播?

How to prevent Observable error propagation?

我有一项服务可以定期自动获取数据:

export class ApiService {
    interval$: BehaviorSubject<number> = new BehaviorSubject<number>(0); // Feed a 0 initially so it makes HTTP call even if auto-refresh is 0
                                                                         // variable ending in $ is common convention to mean it is an Observable
    constructor(private http: HttpClient) { }

    getApi(url: string, auto_refresh=false) {
        if (!auto_refresh)
            return this.http.get(url);

        return this.interval$.pipe(
            switchMap(duration => {
                if (duration == 0)
                    return this.http.get(url);

                return interval(duration * 1000).pipe(
                    startWith(0),
                    switchMap(() => this.http.get(url))
                )
            })
        );
    }

    updateInterval(i: number) {
        this.interval$.next(i);
    }
}

如果我在一个组件中做一些事情,这会很好用:

this.subscription = this.apiService.getApi('/api/foo/bar', true).subscribe(tempjson => {
    this.foo = tempjson;
});

如果我将自动刷新间隔设置为 1,它将每秒获取 /api/foo/bar

问题是如果 API return 是一个非 200 return 代码。在这种情况下,它似乎破坏了 Observable 并且再也不会尝试做 GET

我无法弄清楚这个问题的根本原因。我猜测某种异常正在从 Observable 传播出去并使 Observable 被销毁。但我不知道如何防止它。我尝试向订阅添加错误处理程序,但这没有任何区别:

this.subscriptions.push(this.apiService.getApi('/api/modem/lte_signal_info', true).subscribe(tempjson => {
  this.lte_signal_info = tempjson;
},
error => {
  console.log(error)
}));

我也试过在服务本身捕获错误,但似乎你不能吞下异常,你必须在完成后重新抛出它,如下所示:https://angular.io/guide/http#getting-error-details

根据 Observable 的设计,如果在 observable 管道中发生错误(异常),那么 observable 处于错误状态并且它不能发出新值(https://blog.angular-university.io/rxjs-error-handling/)并且它可以被认为是完成的[即它不能发出新的价值]。由于这个原因,如果 API returns 一个非 200 代码,您的 observable 处于错误状态并且它不会发出新值。

现在要在发生错误的情况下保持源可观察性(在您的情况下 interval 可观察性保持 运行 以防发生错误),处理可观察性中的错误,它通过使用抛出错误catchError 运算符。像这样更改您的代码:

getApi(url: string, auto_refresh=false) {
      if (!auto_refresh)
          return this.http.get(url);

      return this.interval$.pipe(
          switchMap(duration => {
              if (duration == 0)
                  return this.http.get(url)
                             .pipe(
                               catchError(err => {

                                 //return an observable as per your logic
                                 //for now I am returning error wrapped in an observable
                                 //as per your logic you may process the error
                                 return of(err);
                               })
                             );

              return interval(duration * 1000).pipe(
                  startWith(0),
                  switchMap(() => {
                    return this.http.get(url)
                             .pipe(
                               catchError(err => {
                                 //return an observable as per your logic
                                 //for now I am returning error wrapped in an observable
                                 //as per your logic you may process the error
                                 return of(err);
                               })
                             );
                  })
              )
          })
      );

当然,你可以写一个函数把重复的代码[正如你在上面的代码中看到的那样]并使用那个方法。

希望它能给你一个思路,解决你的问题。