如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的更好方法编写以下代码?
How to write below code using MergeMap or FlatMap or some better way with rxJs-operators?
我有两个可观察的管道。我需要一个接一个 运行 并比较两个值是否相等。我尝试了下面的 code.This 应该可以工作,当发出第一个可观察值时,它应该去获取第二个 obserbla 值并且应该首先比较它 return value.I 需要一些专家帮助,以 refator这段代码更好。
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
(res: UnitDetail) =>{
if(res.unitTwo){
this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub)).subscribe(
(unitId: string) => {
if(unitId === res.unitTwo){
this.sameUnit = true;
}else{
this.sameUnit = false;
}
});
}
}
);
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
(res: UnitDetail) =>{
if(res.unitTwo){
return this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub),map(
(unitId: string) => unitId === res.unitTwo);
}
}
).subscribe({
next: (sameUnit: boolean) => {
//do something
}
});
您不需要高阶运算符,因为可观察对象 this.selectedUnitDetailModel$
和 this.appStore.select(selectUnit)
彼此独立。相反,您可以使用 forkJoin
、combineLatest
或 zip
等函数来并行获取它们的通知。
你可以找到不同之处 b/n 这些函数 。
尝试以下方法
forkJoin(
this.selectedUnitDetailModel$.pipe(take(1)), // <-- complete on first emission
this.appStore.select(selectUnit).pipe(take(1)) // <-- complete on first emission
).subscribe(
([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
(error) => console.log(error) // <-- handle error
);
forkJoin
仅在源可观察对象完成时发出,因此我将 take(1)
通过管道传输到每个可观察对象。 forkJoin
现在将在每个可观察和完整的第一次发射时发射。因此,您对 shareReplayUntil(this.destroySub)
的需求得到了缓解。
但是,如果您需要保持来自 observable 的发射流打开,您可以使用 combineLatest
或 zip
代替。在这种情况下,您可以将 take(1)
替换为您的 ``shareReplayUntil(this.destroySub)`。
更新: this.selectedUnitDetailModel$
可观察
连续流
正如我之前所说,您可以使用 combineLatest
而不是 forkJoin
来启用连续的数据流。
尝试以下方法
import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
combineLatest(
this.selectedUnitDetailModel$,
this.appStore.select(selectUnit)
).pipe(
takeUntil(this.destroySub) // <-- replaced with `takeUntil` operator
).subscribe(
([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
(error) => console.log(error) // <-- handle error
);
我有两个可观察的管道。我需要一个接一个 运行 并比较两个值是否相等。我尝试了下面的 code.This 应该可以工作,当发出第一个可观察值时,它应该去获取第二个 obserbla 值并且应该首先比较它 return value.I 需要一些专家帮助,以 refator这段代码更好。
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
(res: UnitDetail) =>{
if(res.unitTwo){
this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub)).subscribe(
(unitId: string) => {
if(unitId === res.unitTwo){
this.sameUnit = true;
}else{
this.sameUnit = false;
}
});
}
}
);
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
(res: UnitDetail) =>{
if(res.unitTwo){
return this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub),map(
(unitId: string) => unitId === res.unitTwo);
}
}
).subscribe({
next: (sameUnit: boolean) => {
//do something
}
});
您不需要高阶运算符,因为可观察对象 this.selectedUnitDetailModel$
和 this.appStore.select(selectUnit)
彼此独立。相反,您可以使用 forkJoin
、combineLatest
或 zip
等函数来并行获取它们的通知。
你可以找到不同之处 b/n 这些函数
尝试以下方法
forkJoin(
this.selectedUnitDetailModel$.pipe(take(1)), // <-- complete on first emission
this.appStore.select(selectUnit).pipe(take(1)) // <-- complete on first emission
).subscribe(
([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
(error) => console.log(error) // <-- handle error
);
forkJoin
仅在源可观察对象完成时发出,因此我将 take(1)
通过管道传输到每个可观察对象。 forkJoin
现在将在每个可观察和完整的第一次发射时发射。因此,您对 shareReplayUntil(this.destroySub)
的需求得到了缓解。
但是,如果您需要保持来自 observable 的发射流打开,您可以使用 combineLatest
或 zip
代替。在这种情况下,您可以将 take(1)
替换为您的 ``shareReplayUntil(this.destroySub)`。
更新: this.selectedUnitDetailModel$
可观察
连续流
正如我之前所说,您可以使用 combineLatest
而不是 forkJoin
来启用连续的数据流。
尝试以下方法
import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
combineLatest(
this.selectedUnitDetailModel$,
this.appStore.select(selectUnit)
).pipe(
takeUntil(this.destroySub) // <-- replaced with `takeUntil` operator
).subscribe(
([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
(error) => console.log(error) // <-- handle error
);