RxJs mergeAll(), scan() 继续累加
RxJs mergeAll(), scan() coninutes to accumulate
我有这段代码,旨在在将特征添加到可观察的特征 $$ 时积累特征。
private features$$: BehaviorSubject<Observable<Feature<FeatureConfig>>> = new BehaviorSubject<
Observable<Feature<FeatureConfig>>
>(of());
private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
mergeAll(),
scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
acc.push(filter);
return acc;
}, [] as Feature<FeatureConfig>[])
);
对象本身不重要,但是用add函数添加的:
add(feature: FeatureConfig): void {
this.features$$.next(
of({
key: feature.friendlyId,
value: feature
})
);
}
然后通过旨在通过值过滤当前特征的可观察对象公开公开:
feature$ = (feature: string): Observable<FeatureConfig | null> => {
return this.features$.pipe(
map((features: FeatureConfig[]) => {
return (
features.find((featureConfig: FeatureConfig) => {
return featureConfig.key === feature;
})?.value ?? null
);
})
);
};
这似乎工作正常,除了每次调用 feature$(feature: string) 它都会命中私有 features$ 可观察扫描运算符中的 acc.push(filter) 行。也许我误解了模式或 mergeAll/scan 的行为方式,或者我只是遗漏了一些明显的东西?我的想法是它应该只是 return 在任何给定时间推送到 features$$ 的任何内容的当前值?
在定义了features$
的scan
运算符后添加shareReplay(1)运算符:
private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
mergeAll(),
scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
acc.push(filter);
return acc;
}, [] as Feature<FeatureConfig>[]),
shareReplay(1)
);
这将防止 Observable 链在 shareReplay
运算符之前到 re-run 每次有新订阅时。
我有这段代码,旨在在将特征添加到可观察的特征 $$ 时积累特征。
private features$$: BehaviorSubject<Observable<Feature<FeatureConfig>>> = new BehaviorSubject<
Observable<Feature<FeatureConfig>>
>(of());
private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
mergeAll(),
scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
acc.push(filter);
return acc;
}, [] as Feature<FeatureConfig>[])
);
对象本身不重要,但是用add函数添加的:
add(feature: FeatureConfig): void {
this.features$$.next(
of({
key: feature.friendlyId,
value: feature
})
);
}
然后通过旨在通过值过滤当前特征的可观察对象公开公开:
feature$ = (feature: string): Observable<FeatureConfig | null> => {
return this.features$.pipe(
map((features: FeatureConfig[]) => {
return (
features.find((featureConfig: FeatureConfig) => {
return featureConfig.key === feature;
})?.value ?? null
);
})
);
};
这似乎工作正常,除了每次调用 feature$(feature: string) 它都会命中私有 features$ 可观察扫描运算符中的 acc.push(filter) 行。也许我误解了模式或 mergeAll/scan 的行为方式,或者我只是遗漏了一些明显的东西?我的想法是它应该只是 return 在任何给定时间推送到 features$$ 的任何内容的当前值?
在定义了features$
的scan
运算符后添加shareReplay(1)运算符:
private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
mergeAll(),
scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
acc.push(filter);
return acc;
}, [] as Feature<FeatureConfig>[]),
shareReplay(1)
);
这将防止 Observable 链在 shareReplay
运算符之前到 re-run 每次有新订阅时。