在渲染模板之前等待两个可观察对象(包括失败的一个)

Wait for two observables (incl. failed one) before rendering template

我有两个 Observable。模板的渲染应该只在 BOTH 个 Observables 完成或失败时开始:

(忽略<any>类型,这里只是为了简化)

组件:

@Component({
  selector: 'app-page',
  templateUrl: './page.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class PageComponent implements OnInit {
  obs1$ = new Subject<any>();
  obs2$ = new Subject<any>();

  isLoading = true;
  isObs1Error: boolean;
  isObs2Error: boolean;


  ngOnInit() {
    this.initializeDataRetrieval();
  }

  initializeDataRetrieval() {
    this.obs1$ = this.obs1Method();
    this.obs1$.subscribe((response: any) => {
      this.isObs1Error = false;
      this.obs1 = response;

      this.obs2$ = this.obs2Method();
      this.obs2$.subscribe((response: any) => {
        this.isObs2Error = false;
        this.isLoading = false;
        this.obs2 = response;
        this.cdr.detectChanges();
      });
    });
  }

  private obs1Method(): any {
    return this.obs1Service
      .getStuff()
      .pipe(
        catchError(() => {
          this.isError = true;
          this.isLoading = false;
          this.cdr.detectChanges();
          return EMPTY;
        })
      );
  }

  private obs2Method(): any {
    return this.obs2Service
      .getStuff()
      .pipe(
        catchError(() => {
          this.isObs2Error = true;
          this.isLoading = false;
          this.cdr.detectChanges();
          return EMPTY;
        })
      );
  }

  canDisplayContent(): boolean {
    return !this.isLoading && !this.isObs1Error;
  }

模板:

<ng-container *ngIf="isLoading">
  <app-loading-indicator></app-loading-indicator>
</ng-container>

<ng-container *ngIf="isObs1Error">
  <div class="error">
    This Obs1 stuff could not be loaded currently
  </div>
</ng-container>

<ng-container *ngIf="canDisplayContent()">
  <div class="error" *ngIf="isObs2Error">
    Technical error
  </div>
  More content here which is shown when at least Obs1 doesn't had an error
</div>

所以基本上:

我确定 TS 代码可以通过使用 ... 哪个 RxJS 运算符来简化?虽然通读 RxJS Operators for Dummies: forkJoin, zip, combineLatest, withLatestFrom 我不确定这些是否合适。据我了解,例如combineLatest 只有当两个流都成功完成时才会成功 ...

欢迎任何提示,谢谢。

这是我的做法:

const decorateObservable = (obs$, key) => obs$.pipe(
  mapTo(false), // `false` -> no error
  catchError(() => of(true)), // `true` -> error found
  map(value => ({ key, value })) // Identification
)

const base$ = merge(
  decorateObservable(obs1$, 'obs1'),
  decorateObservable(obs2$, 'obs2'),
).pipe(
  // When the source is unsubscribed(`error`/`complete`),
  finalize(() => this.isLoading = false),
  share(),
)

const obs1Displayed$ = base$.pipe(
  filter(o => o.key === 'obs1'),
  map(o => o.value),
)

const obs2Displayed$ = base$.pipe(
  filter(o => o.key === 'obs2'),
  map(o => o.value),
)

此处使用 share() 运算符,因为不需要多次订阅源。

在这种情况下,您将订阅 两次(在模板中),因为两个 displayed observables 都来自相同的基础. share 对数据生产者 多播 做了什么。

share 等同于 pipe(multicast(() => new Subject()), refCount())refCount 表示一旦第一个订阅者进来,生产者就会被调用

换句话说,数据消费者在某种程度上决定生产者何时开始其逻辑。

请注意,我假设 obs1$obs2$ 正在异步生成值。

本质上,它几乎和做的一样:

const s  = new Subject();

// The subscriptions happen inside the template
s.pipe(filter(o => o.key === 'obs1', ...).subscribe(observer1)
s.pipe(filter(o => o.key === 'obs2', ...).subscribe(observer2)

// And later on...
s.next({ key: 'obs1', value: false }) // `obs1$` emitted

这是模板:

<ng-container *ngIf="isLoading">
  <app-loading-indicator></app-loading-indicator>
</ng-container>

<ng-container *ngIf="obs1Displayed$ | async">
  <div class="error">
    This Obs1 stuff could not be loaded currently
  </div>
</ng-container>

<ng-container *ngIf="obs2Displayed$ | async">
  <div class="error">
    Technical error
  </div>
  More content here which is shown when at least Obs1 doesn't had an error
  </div>

我会考虑在这种情况下使用 forkJoin

代码如下所示

forkJoin(this.obs1Method(), this.obs2Method()).subscribe(
   ({resp1, resp2}) => {
        this.isLoading = false;
        this.obs2 = resp1;
        this.obs2 = resp2;
        this.cdr.detectChanges()
   }
)

您可能还需要稍微更改 obsxMethods 添加 tap 以在成功检索数据的情况下将错误属性设置为 false 并删除在subscribe,像这样

private obs1Method(): any {
    return this.obs1Service
      .getStuff()
      .pipe(
        tap(() => this.isError = false),
        catchError(() => {
          this.isError = true;
          return EMPTY;
        })
      );
  }

  private obs2Method(): any {
    return this.obs2Service
      .getStuff()
      .pipe(
        tap(() => this.isObs2Error = false),
        catchError(() => {
          this.isObs2Error = true;
          return EMPTY;
        })
      );
  }