处理重放的错误,长期存在的 Observables

Handling errors for replaying, long-living Observables

StackBlitz: https://stackblitz.com/edit/angular-ivy-s8mzka

我有一个 Angular 模板,它使用 async 管道订阅 Observable。此 Observable 由以下人员创建:

  1. 订阅 NgRx Store Selector(为我们提供选定的用户过滤器)
  2. 切换映射到 API 具有选定用户过滤器的请求
  3. switchMap
  4. 之后执行mapfilter等多个操作
  5. 步骤 2 和 3 可以重复任意次数

switchMap 中的 API 请求可能由于多种原因而失败,这可能会产生错误通知。

我们想通过显示错误警报来处理该错误。这个错误警报也应该通过使用 async 管道订阅一个 Observable 来显示。这个 angry-path Observable 应该直接派生自 happy-path Observable,不使用中间 Subjects 或任何其他副作用。

我们在提出的解决方案中遇到的问题:

product.service.ts

getProducts$() {
  return this.store.select(selectProductFilters).pipe(
    switchMap(filters => this.http.get("/api/products?" + encodeFilters(filters))
    // ... map, filter, delay
  );
}

products.component.ts

products$: Observable<Product[]>
error$: Observable<string>

ngOnInit() {
  this.products$ = this.productService.getProducts$();
  
  this.error$ = this.products$.pipe(
    // what can we do here to get errors as notifications
  );
}

products.component.html

<div *ngIf="(products$ | async) as products">{{products | json}}</div>
<div class="error" *ngIf="(error$ | async) as error">{{error}}</div>

所以问题是:

我们如何为我们的模板创建两个 Observable:一个发出 next 通知,另一个发出 error 通知,同时在错误发生后保持源 Observable 活动?

编辑

我们正在为此寻找一个非常通用的解决方案 - 上面的示例仅使用了一个 switchMap,但该解决方案应该适用于任何 Observable 管道。例如,假设服务看起来像这样:

product.service.ts

getProducts$() {
  return this.store.select(selectProductFilters).pipe(
    switchMap(filters => this.http.get("/api/products?" + encodeFilters(filters))
    // ... map, filter, delay
    switchMap(...)
    // ... map, filter, delay
    switchMap(...)
  );
}

该解决方案应该能够处理任何这些 switchMap 语句中的错误并将它们传播到模板。

这是我的方法:

product.service.ts

getProducts$(): Observable<Product[] | { hasError: boolean }> {
  return this.store$.pipe(
    switchMap(
      filters => this.httpClient.get("/api/products?" + encodeFilters(filters)).pipe(
        catchError(err => of({ hasError: true, msg: 'an error' })),
      )
    ),
    share(),
  )
}

products.component.ts

ngOnInit() {
  const [products$, errors$] = partition(this.productService.getProducts$(), (v: any) => !v.hasError);

  this.products$ = products$.pipe(
    filter(products => products.length > 0),
    map(products => products.map(product => ({name: product.name.toUpperCase(), price: product.price + ",00 €"}))),
    delay(300)
  );

  this.products$ = merge(this.products$, errors$.pipe(mapTo(null)));
  this.error$ = merge(this.products$.pipe(mapTo(null)), errors$);
}

我们在 getProducts() 中使用 share() 很重要。它所做的是在数据消费者和数据生产者之间放置一个 Subject 实例。这是必需的,因为在组件的 ngOnInit 中,将有多个订阅到同一来源(share() 以上的所有内容)。

partition 将订阅提供的 source twice,每个部分一次。

然后,

this.products$ = merge(this.products$, errors$.pipe(mapTo(null)));
this.error$ = merge(this.products$.pipe(mapTo(null)), errors$);

确保如果我们显示了一些产品并且随后发生错误,则只会显示错误,反之亦然。


您现在可能想知道,订阅者不是太多了吗?我会说不。我上面提到的 Subject 将有 5 个订阅者。让我们看看为什么。

<div *ngIf="(products$ | async) as products">{{products | json}}</div>

将订阅 merge(this.products$, errors$.pipe(mapTo(null)));,因此我们有 2 个订阅者。

然后,

<div class="error" *ngIf="(error$ | async) as error">{{error}}</div>

会订阅 merge(this.products$.pipe(mapTo(null)), errors$).pipe(debounceTime(0));,但是第一个参数是第一个merge(...),所以它意味着另外2个订阅者,加上errors$,所以总共有5个订阅者。

StackBlitz.


编辑

另一种方法:

product.service.ts

getProducts$(): Observable<Product[] | { hasError: boolean }> {
  return this.store$.pipe(
    switchMap(filters => this.httpClient.get("/api/products?" + encodeFilters(filters))),
    filter(products => products.length > 0),
    map(products => products.map(product => ({name: product.name.toUpperCase(), price: product.price + ",00 €"}))),
    delay(300),

    tap({ next: () => console.log('next notif'), error: () => console.log('error notif') }),

    // we want to catch the error and pass it along with the `hasError` flag
    // so that it can be intercepted by the `error$` stream
    // at this point, the source (`store$`) is unsubscribed
    // and that's why we use `throwError`, so that `retryWhen` can intercept it and eventually
    // **re-subscribe** to the source, but only when it has something new to emit, otherwise
    // we might run into an infinite loop 
    catchError(err => concat(of({ hasError: true, msg: err }), throwError(err))),
    
    // `skip(1)` - we want to skip the error from which an error resulted
    retryWhen(errors => errors.pipe(switchMapTo(this.store$.pipe(skip(1))))),
    
    // using it because `partition` subscribes **twice** to the source
    share(),
  )
}

products.component.ts

ngOnInit() {
  const [products$, errors$] = partition(this.productService.getProducts$(), (n) => !(n as any).hasError);

  this.products$ = products$ as  Observable<Product[]>;;
  this.error$ = errors$;
}

StackBlitz.