处理重放的错误,长期存在的 Observables
Handling errors for replaying, long-living Observables
StackBlitz: https://stackblitz.com/edit/angular-ivy-s8mzka
我有一个 Angular 模板,它使用 async
管道订阅 Observable。此 Observable 由以下人员创建:
- 订阅 NgRx Store Selector(为我们提供选定的用户过滤器)
- 切换映射到 API 具有选定用户过滤器的请求
- 在
switchMap
之后执行map
、filter
等多个操作
- 步骤 2 和 3 可以重复任意次数
switchMap
中的 API 请求可能由于多种原因而失败,这可能会产生错误通知。
我们想通过显示错误警报来处理该错误。这个错误警报也应该通过使用 async
管道订阅一个 Observable 来显示。这个 angry-path Observable 应该直接派生自 happy-path Observable,不使用中间 Subjects 或任何其他副作用。
我们在提出的解决方案中遇到的问题:
catchError
或 materialize
inside switchMap
:将错误包装在另一个对象中会更改通知的数据结构,并使用以下过滤器,如 map
、filter
等很辛苦。 debounceTime
或 delay
等运算符无法立即转发错误,因为它被视为 next
通知。
retry
:鉴于源 Observable 是一个重放的 Subject,这将在每次重新订阅时触发一个新的 API 调用,如果服务器不断响应错误,则会导致无限循环.另外,我们不能真正使用 retry
. 转发错误通知
- 调度操作,下一个主题或在
tap
中设置 this.error
:为像这样的微不足道的用例创建这样的副作用似乎是不必要的,并且违反了功能设计原则。
product.service.ts
getProducts$() {
return this.store.select(selectProductFilters).pipe(
switchMap(filters => this.http.get("/api/products?" + encodeFilters(filters))
// ... map, filter, delay
);
}
products.component.ts
products$: Observable<Product[]>
error$: Observable<string>
ngOnInit() {
this.products$ = this.productService.getProducts$();
this.error$ = this.products$.pipe(
// what can we do here to get errors as notifications
);
}
products.component.html
<div *ngIf="(products$ | async) as products">{{products | json}}</div>
<div class="error" *ngIf="(error$ | async) as error">{{error}}</div>
所以问题是:
我们如何为我们的模板创建两个 Observable:一个发出 next
通知,另一个发出 error
通知,同时在错误发生后保持源 Observable 活动?
编辑
我们正在为此寻找一个非常通用的解决方案 - 上面的示例仅使用了一个 switchMap
,但该解决方案应该适用于任何 Observable 管道。例如,假设服务看起来像这样:
product.service.ts
getProducts$() {
return this.store.select(selectProductFilters).pipe(
switchMap(filters => this.http.get("/api/products?" + encodeFilters(filters))
// ... map, filter, delay
switchMap(...)
// ... map, filter, delay
switchMap(...)
);
}
该解决方案应该能够处理任何这些 switchMap
语句中的错误并将它们传播到模板。
这是我的方法:
product.service.ts
getProducts$(): Observable<Product[] | { hasError: boolean }> {
return this.store$.pipe(
switchMap(
filters => this.httpClient.get("/api/products?" + encodeFilters(filters)).pipe(
catchError(err => of({ hasError: true, msg: 'an error' })),
)
),
share(),
)
}
products.component.ts
ngOnInit() {
const [products$, errors$] = partition(this.productService.getProducts$(), (v: any) => !v.hasError);
this.products$ = products$.pipe(
filter(products => products.length > 0),
map(products => products.map(product => ({name: product.name.toUpperCase(), price: product.price + ",00 €"}))),
delay(300)
);
this.products$ = merge(this.products$, errors$.pipe(mapTo(null)));
this.error$ = merge(this.products$.pipe(mapTo(null)), errors$);
}
我们在 getProducts()
中使用 share()
很重要。它所做的是在数据消费者和数据生产者之间放置一个 Subject
实例。这是必需的,因为在组件的 ngOnInit
中,将有多个订阅到同一来源(share()
以上的所有内容)。
partition
将订阅提供的 source
twice,每个部分一次。
然后,
this.products$ = merge(this.products$, errors$.pipe(mapTo(null)));
this.error$ = merge(this.products$.pipe(mapTo(null)), errors$);
确保如果我们显示了一些产品并且随后发生错误,则只会显示错误,反之亦然。
您现在可能想知道,订阅者不是太多了吗?我会说不。我上面提到的 Subject
将有 5 个订阅者。让我们看看为什么。
<div *ngIf="(products$ | async) as products">{{products | json}}</div>
将订阅 merge(this.products$, errors$.pipe(mapTo(null)));
,因此我们有 2 个订阅者。
然后,
<div class="error" *ngIf="(error$ | async) as error">{{error}}</div>
会订阅 merge(this.products$.pipe(mapTo(null)), errors$).pipe(debounceTime(0));
,但是第一个参数是第一个merge(...)
,所以它意味着另外2个订阅者,加上errors$
,所以总共有5个订阅者。
编辑
另一种方法:
product.service.ts
getProducts$(): Observable<Product[] | { hasError: boolean }> {
return this.store$.pipe(
switchMap(filters => this.httpClient.get("/api/products?" + encodeFilters(filters))),
filter(products => products.length > 0),
map(products => products.map(product => ({name: product.name.toUpperCase(), price: product.price + ",00 €"}))),
delay(300),
tap({ next: () => console.log('next notif'), error: () => console.log('error notif') }),
// we want to catch the error and pass it along with the `hasError` flag
// so that it can be intercepted by the `error$` stream
// at this point, the source (`store$`) is unsubscribed
// and that's why we use `throwError`, so that `retryWhen` can intercept it and eventually
// **re-subscribe** to the source, but only when it has something new to emit, otherwise
// we might run into an infinite loop
catchError(err => concat(of({ hasError: true, msg: err }), throwError(err))),
// `skip(1)` - we want to skip the error from which an error resulted
retryWhen(errors => errors.pipe(switchMapTo(this.store$.pipe(skip(1))))),
// using it because `partition` subscribes **twice** to the source
share(),
)
}
products.component.ts
ngOnInit() {
const [products$, errors$] = partition(this.productService.getProducts$(), (n) => !(n as any).hasError);
this.products$ = products$ as Observable<Product[]>;;
this.error$ = errors$;
}
StackBlitz: https://stackblitz.com/edit/angular-ivy-s8mzka
我有一个 Angular 模板,它使用 async
管道订阅 Observable。此 Observable 由以下人员创建:
- 订阅 NgRx Store Selector(为我们提供选定的用户过滤器)
- 切换映射到 API 具有选定用户过滤器的请求
- 在
switchMap
之后执行 - 步骤 2 和 3 可以重复任意次数
map
、filter
等多个操作
switchMap
中的 API 请求可能由于多种原因而失败,这可能会产生错误通知。
我们想通过显示错误警报来处理该错误。这个错误警报也应该通过使用 async
管道订阅一个 Observable 来显示。这个 angry-path Observable 应该直接派生自 happy-path Observable,不使用中间 Subjects 或任何其他副作用。
我们在提出的解决方案中遇到的问题:
catchError
或materialize
insideswitchMap
:将错误包装在另一个对象中会更改通知的数据结构,并使用以下过滤器,如map
、filter
等很辛苦。debounceTime
或delay
等运算符无法立即转发错误,因为它被视为next
通知。retry
:鉴于源 Observable 是一个重放的 Subject,这将在每次重新订阅时触发一个新的 API 调用,如果服务器不断响应错误,则会导致无限循环.另外,我们不能真正使用retry
. 转发错误通知
- 调度操作,下一个主题或在
tap
中设置this.error
:为像这样的微不足道的用例创建这样的副作用似乎是不必要的,并且违反了功能设计原则。
product.service.ts
getProducts$() {
return this.store.select(selectProductFilters).pipe(
switchMap(filters => this.http.get("/api/products?" + encodeFilters(filters))
// ... map, filter, delay
);
}
products.component.ts
products$: Observable<Product[]>
error$: Observable<string>
ngOnInit() {
this.products$ = this.productService.getProducts$();
this.error$ = this.products$.pipe(
// what can we do here to get errors as notifications
);
}
products.component.html
<div *ngIf="(products$ | async) as products">{{products | json}}</div>
<div class="error" *ngIf="(error$ | async) as error">{{error}}</div>
所以问题是:
我们如何为我们的模板创建两个 Observable:一个发出 next
通知,另一个发出 error
通知,同时在错误发生后保持源 Observable 活动?
编辑
我们正在为此寻找一个非常通用的解决方案 - 上面的示例仅使用了一个 switchMap
,但该解决方案应该适用于任何 Observable 管道。例如,假设服务看起来像这样:
product.service.ts
getProducts$() {
return this.store.select(selectProductFilters).pipe(
switchMap(filters => this.http.get("/api/products?" + encodeFilters(filters))
// ... map, filter, delay
switchMap(...)
// ... map, filter, delay
switchMap(...)
);
}
该解决方案应该能够处理任何这些 switchMap
语句中的错误并将它们传播到模板。
这是我的方法:
product.service.ts
getProducts$(): Observable<Product[] | { hasError: boolean }> {
return this.store$.pipe(
switchMap(
filters => this.httpClient.get("/api/products?" + encodeFilters(filters)).pipe(
catchError(err => of({ hasError: true, msg: 'an error' })),
)
),
share(),
)
}
products.component.ts
ngOnInit() {
const [products$, errors$] = partition(this.productService.getProducts$(), (v: any) => !v.hasError);
this.products$ = products$.pipe(
filter(products => products.length > 0),
map(products => products.map(product => ({name: product.name.toUpperCase(), price: product.price + ",00 €"}))),
delay(300)
);
this.products$ = merge(this.products$, errors$.pipe(mapTo(null)));
this.error$ = merge(this.products$.pipe(mapTo(null)), errors$);
}
我们在 getProducts()
中使用 share()
很重要。它所做的是在数据消费者和数据生产者之间放置一个 Subject
实例。这是必需的,因为在组件的 ngOnInit
中,将有多个订阅到同一来源(share()
以上的所有内容)。
partition
将订阅提供的 source
twice,每个部分一次。
然后,
this.products$ = merge(this.products$, errors$.pipe(mapTo(null)));
this.error$ = merge(this.products$.pipe(mapTo(null)), errors$);
确保如果我们显示了一些产品并且随后发生错误,则只会显示错误,反之亦然。
您现在可能想知道,订阅者不是太多了吗?我会说不。我上面提到的 Subject
将有 5 个订阅者。让我们看看为什么。
<div *ngIf="(products$ | async) as products">{{products | json}}</div>
将订阅 merge(this.products$, errors$.pipe(mapTo(null)));
,因此我们有 2 个订阅者。
然后,
<div class="error" *ngIf="(error$ | async) as error">{{error}}</div>
会订阅 merge(this.products$.pipe(mapTo(null)), errors$).pipe(debounceTime(0));
,但是第一个参数是第一个merge(...)
,所以它意味着另外2个订阅者,加上errors$
,所以总共有5个订阅者。
编辑
另一种方法:
product.service.ts
getProducts$(): Observable<Product[] | { hasError: boolean }> {
return this.store$.pipe(
switchMap(filters => this.httpClient.get("/api/products?" + encodeFilters(filters))),
filter(products => products.length > 0),
map(products => products.map(product => ({name: product.name.toUpperCase(), price: product.price + ",00 €"}))),
delay(300),
tap({ next: () => console.log('next notif'), error: () => console.log('error notif') }),
// we want to catch the error and pass it along with the `hasError` flag
// so that it can be intercepted by the `error$` stream
// at this point, the source (`store$`) is unsubscribed
// and that's why we use `throwError`, so that `retryWhen` can intercept it and eventually
// **re-subscribe** to the source, but only when it has something new to emit, otherwise
// we might run into an infinite loop
catchError(err => concat(of({ hasError: true, msg: err }), throwError(err))),
// `skip(1)` - we want to skip the error from which an error resulted
retryWhen(errors => errors.pipe(switchMapTo(this.store$.pipe(skip(1))))),
// using it because `partition` subscribes **twice** to the source
share(),
)
}
products.component.ts
ngOnInit() {
const [products$, errors$] = partition(this.productService.getProducts$(), (n) => !(n as any).hasError);
this.products$ = products$ as Observable<Product[]>;;
this.error$ = errors$;
}