如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的更好方法编写以下代码?

How to write below code using MergeMap or FlatMap or some better way with rxJs-operators?

我有两个可观察的管道。我需要一个接一个 运行 并比较两个值是否相等。我尝试了下面的 code.This 应该可以工作,当发出第一个可观察值时,它应该去获取第二个 obserbla 值并且应该首先比较它 return value.I 需要一些专家帮助,以 refator这段代码更好。

   this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
          (res: UnitDetail) =>{
              if(res.unitTwo){
                this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub)).subscribe(
                  (unitId: string) => {
                    if(unitId ===  res.unitTwo){
                      this.sameUnit = true;
                    }else{
                      this.sameUnit = false;
                    }
                  });
              }
          }
       );
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
          (res: UnitDetail) =>{
              if(res.unitTwo){
               return this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub),map(
                  (unitId: string) =>  unitId ===  res.unitTwo);
              }
          }
       ).subscribe({
        next: (sameUnit: boolean) => {
           //do something 
        }
       });

您不需要高阶运算符,因为可观察对象 this.selectedUnitDetailModel$this.appStore.select(selectUnit) 彼此独立。相反,您可以使用 forkJoincombineLatestzip 等函数来并行获取它们的通知。

你可以找到不同之处 b/n 这些函数

尝试以下方法

forkJoin(
  this.selectedUnitDetailModel$.pipe(take(1)),      // <-- complete on first emission
  this.appStore.select(selectUnit).pipe(take(1))    // <-- complete on first emission
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

forkJoin 仅在源可观察对象完成时发出,因此我将 take(1) 通过管道传输到每个可观察对象。 forkJoin 现在将在每个可观察和完整的第一次发射时发射。因此,您对 shareReplayUntil(this.destroySub) 的需求得到了缓解。

但是,如果您需要保持来自 observable 的发射流打开,您可以使用 combineLatestzip 代替。在这种情况下,您可以将 take(1) 替换为您的 ``shareReplayUntil(this.destroySub)`。

更新: this.selectedUnitDetailModel$ 可观察

连续流

正如我之前所说,您可以使用 combineLatest 而不是 forkJoin 来启用连续的数据流。

尝试以下方法

import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

combineLatest(
  this.selectedUnitDetailModel$,
  this.appStore.select(selectUnit)
).pipe(
  takeUntil(this.destroySub)         // <-- replaced with `takeUntil` operator
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);