Avoid nested subscribe if there is a forkJoin inside

This is my code in Angular:
this.service.save(body).subscribe(
    resp => {
        this.dialog.confirmation({
            message: 'save object successfully!'
        })
        .subscribe((ok) => {
            if (ok) {
                this.pro.status = resp.status;
                this.loadingData(resp);
                const s1 = this.service.getSummary(this.id);
                const s2 = this.service.getCost(this.id);
                forkJoin([s1, s2]).subscribe(([r1, r2]) => {
                    this.view = r1;
                    this.list = r2;
                });
            }
        });
    }
);

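So there are many levels of subscribe. Not only is it ugly, the result is also wrong, and I can't find the cause by debugging. How can I rewrite it with RxJS operators?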

You can simplify it using RxJS operators, like the following:

// import { EMPTY, forkJoin } from 'rxjs';
// import { map, mergeMap } from 'rxjs/operators';

this.service
  .save(body)
  .pipe(
    mergeMap((result) =>
      // Merge the main observable with the dialog confirmation one..
      // and map it to an object that contains the result from both observables.
      this.dialog
        .confirmation({ message: 'save object successfully!' })
        .pipe(map((confirmed) => ({ result, confirmed })))
    ),
    mergeMap(({ result, confirmed }) => {
      if (confirmed) {
        this.pro.status = result.status;
        this.loadingData(result);
        const s1 = this.service.getSummary(this.id);
        const s2 = this.service.getCost(this.id);
        return forkJoin([s1, s2]);
      }
      // Don't emit any value, if the dialog is not confirmed:
      return EMPTY;
    })
  )
  .subscribe(([r1, r2]) => {
    this.view = r1;
    this.list = r2;
  });
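To try the flattened pipeline outside the component, here is a minimal sketch that stubs each call with `of(...)`. The names `saveThenReload`, `collect`, `SaveResult` and `Reloaded` are hypothetical and only stand in for the service and dialog from the question; the operators are the same ones used above.

```typescript
import { EMPTY, Observable, forkJoin, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical shapes standing in for the real responses.
interface SaveResult { status: string; }
interface Reloaded { status: string; summary: string; cost: number; }

// save -> confirm -> (summary and cost in parallel), with no nested subscribe.
function saveThenReload(
  save$: Observable<SaveResult>,
  confirm$: Observable<boolean>,
  summary$: Observable<string>,
  cost$: Observable<number>
): Observable<Reloaded> {
  return save$.pipe(
    // Carry the save result along with the dialog answer.
    mergeMap((result) =>
      confirm$.pipe(map((confirmed) => ({ result, confirmed })))
    ),
    // Only load the extra data when the dialog was confirmed.
    mergeMap(({ result, confirmed }) =>
      confirmed
        ? forkJoin([summary$, cost$]).pipe(
            map(([summary, cost]) => ({ status: result.status, summary, cost }))
          )
        : EMPTY
    )
  );
}

// Helper for the demo: gather everything a synchronous observable emits.
function collect<T>(source$: Observable<T>): T[] {
  const values: T[] = [];
  source$.subscribe((value) => values.push(value));
  return values;
}

// Confirmed dialog: one combined emission.
const confirmedRun = collect(
  saveThenReload(of({ status: 'SAVED' }), of(true), of('summary'), of(42))
);
// Declined dialog: EMPTY completes without emitting anything.
const declinedRun = collect(
  saveThenReload(of({ status: 'SAVED' }), of(false), of('summary'), of(42))
);
console.log(confirmedRun.length, declinedRun.length);
```

Because the declined branch returns `EMPTY`, the subscriber's `next` callback simply never runs in that case, which matches the missing `else` in the original nested code.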

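Note: To avoid memory leaks, it's highly recommended to unsubscribe from the observable once you no longer need it. How you do that depends on your use case: for example, assign the result of subscribe to a Subscription variable and call unsubscribe in the ngOnDestroy lifecycle hook, or use a Subject with the takeUntil operator and call the Subject's next/complete functions in ngOnDestroy.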

Here is an example of how to use the unsubscribe method:

// import { Component, OnDestroy, OnInit } from '@angular/core';
// import { Subscription } from 'rxjs';

@Component({...})
export class AppComponent implements OnInit, OnDestroy {
    // Optional, because it is only assigned once ngOnInit runs.
    subscription?: Subscription;
    ngOnInit(): void {
      this.subscription = this.service.save(body)
        // >>> pipe and other RxJS operators <<<
        .subscribe(([r1, r2]) => {
          this.view = r1;
          this.list = r2;
        });
    }
    ngOnDestroy(): void {
        this.subscription?.unsubscribe();
    }
}
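The other option mentioned in the note, a Subject combined with takeUntil, could look roughly like this. It is only a sketch: `ViewHolder` is a hypothetical plain class standing in for the component (the @Component decorator is left out so it runs without Angular), and `updates$` stands in for the piped service call.

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical stand-in for an Angular component.
class ViewHolder {
  view: string | undefined;
  private readonly source$: Observable<string>;
  // Emits once when the component is destroyed.
  private readonly destroy$ = new Subject<void>();

  constructor(source$: Observable<string>) {
    this.source$ = source$;
  }

  ngOnInit(): void {
    this.source$
      // Placed last in the pipe so it also tears down everything before it.
      .pipe(takeUntil(this.destroy$))
      .subscribe((value) => {
        this.view = value;
      });
  }

  ngOnDestroy(): void {
    // Signals takeUntil to complete the subscription, then closes the Subject.
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// Demo with a Subject as a manually driven source.
const updates$ = new Subject<string>();
const holder = new ViewHolder(updates$);
holder.ngOnInit();
updates$.next('first');   // received
holder.ngOnDestroy();
updates$.next('second');  // ignored: the subscription has already ended
console.log(holder.view);
```

Compared with storing a Subscription, this variant scales to several subscriptions in one component, since they can all share the same `destroy$`.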

You can read more about it here: https://blog.bitsrc.io/6-ways-to-unsubscribe-from-observables-in-angular-ab912819a78f

This should be roughly equivalent:

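// import { forkJoin } from 'rxjs';
// import { filter, mapTo, mergeMap, tap } from 'rxjs/operators';

this.service.save(body).pipe(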

  mergeMap(resp => 
    this.dialog.confirmation({
      message: 'save object successfully!'
    }).pipe(
      // This filter acts like your `if(ok)` statement. There's no 
      // else block, so if it's not okay, then nothing happens. The 
      // view isn't updated etc.
      filter(ok => !!ok),
      mapTo(resp)
    )
  ),

  tap(resp => {
    this.pro.status = resp.status;
    // If the following line mutates service/global state,
    // it probably won't work as expected
    this.loadingData(resp); 
  }),

  mergeMap(_ => forkJoin([
    this.service.getSummary(this.id),
    this.service.getCost(this.id)
  ]))

).subscribe(([view, list]) => {
  this.view = view;
  this.list = list;
});
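The gating step (filter followed by mapTo) can be tried on its own. The sketch below uses a hypothetical helper, `passIfConfirmed`, and swaps `mapTo(resp)` for `map(() => resp)`, which behaves the same here; as far as I know, `mapTo` is marked deprecated in recent RxJS 7 releases, so treat that substitution as optional.

```typescript
import { Observable, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical helper: re-emit `resp` only if the dialog answer is truthy.
function passIfConfirmed<T>(resp: T, confirm$: Observable<boolean>): Observable<T> {
  return confirm$.pipe(
    filter((ok) => !!ok), // same role as `if (ok)` with no else branch
    map(() => resp)       // stands in for mapTo(resp)
  );
}

const passed: string[] = [];
passIfConfirmed('resp', of(true)).subscribe((value) => passed.push(value));

const blocked: string[] = [];
passIfConfirmed('resp', of(false)).subscribe((value) => blocked.push(value));

console.log(passed.length, blocked.length);
```

When the dialog is declined, nothing reaches the later `tap` and `mergeMap` steps, so neither the status update nor the forkJoin requests run.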