RxJs mergeAll(), scan() 继续累加

RxJs mergeAll(), scan() coninutes to accumulate

我有这段代码,旨在在将特征添加到可观察的特征 $$ 时积累特征。

  private features$$: BehaviorSubject<Observable<Feature<FeatureConfig>>> = new BehaviorSubject<
    Observable<Feature<FeatureConfig>>
  >(of());
  
  private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
    mergeAll(),
    scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
      acc.push(filter);
      return acc;
    }, [] as Feature<FeatureConfig>[])
  );

对象本身不重要,但是用add函数添加的:

  add(feature: FeatureConfig): void {
    this.features$$.next(
      of({
        key: feature.friendlyId,
        value: feature
      })
    );
  }

然后通过旨在通过值过滤当前特征的可观察对象公开公开:

  feature$ = (feature: string): Observable<FeatureConfig | null> => {
    return this.features$.pipe(
      map((features: FeatureConfig[]) => {
        return (
          features.find((featureConfig: FeatureConfig) => {
            return featureConfig.key === feature;
          })?.value ?? null
        );
      })
    );
  };

这似乎工作正常,除了每次调用 feature$(feature: string) 它都会命中私有 features$ 可观察扫描运算符中的 acc.push(filter) 行。也许我误解了模式或 mergeAll/scan 的行为方式,或者我只是遗漏了一些明显的东西?我的想法是它应该只是 return 在任何给定时间推送到 features$$ 的任何内容的当前值?

在定义了features$scan运算符后添加shareReplay(1)运算符:

private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
  mergeAll(),
  scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
    acc.push(filter);
    return acc;
  }, [] as Feature<FeatureConfig>[]),
  shareReplay(1)
);

这将防止 Observable 链在 shareReplay 运算符之前到 re-run 每次有新订阅时。