如何使用 RxJS 正确链接请求
How to chain requests correctly with RxJS
我只是想学习 Angular 和 RxJS (Observables),但我在以正确的方式链接请求时遇到了一些问题。
场景如下:
- 我运行请求A得到一个JSON对象,其中包含一个数组
对象。
- 对于这些对象中的每一个,我都需要执行请求 B
依次 returns 一个带有对象数组的 JSON 对象。
- 对于请求 B 的响应中的每个对象,我需要执行请求 C
returns 一个简单的 JSON 对象。
- 现在我必须存储从请求 C 收到的所有对象并使用信息扩展它们(信息来自请求 D、E 和 F 的答案)。
- 只有当所有请求都完成后,我才想显示我的应用程序的 UI,在此之前只有一个加载微调器应该是可见的。
我当前的解决方案为我提供了所有必要的信息,但我找不到等待所有请求完成的方法。
它看起来像这样:
private fetchData(): void {
this.backendServie.getObjectA().pipe(
flatMap((objects: IObjectA[]) => {
// return objects the get them one by one
return objects
}),
flatMap((object: IObjectA) => {
return this.backendService.getObjectB(object.id)
}),
flatMap((objects: IObjectB[]) => {
// return objects the get them one by one
return objects
}),
flatMap((object: IObjectB) => {
return this.backendService.getObjectC(object.id)
}),
flatMap((object: IObjectC) => {
// convert object to make additional fields available
const extendedObject: IObjectCExtended = object as IObjectCExtended;
this.backendService.getAdditionalInformationD(object.id).subscribe((info: string) >= {
extendedObject.additionalInfoD = info;
});
this.backendService.getAdditionalInformationE(object.id).subscribe((info: string) >= {
extendedObject.additionalInfoE = info;
});
this.backendService.getAdditionalInformationF(object.id).subscribe((info: string) >= {
extendedObject.additionalInfoF = info;
})
return extendedObject;
})
).subscribe((extendedObject: IObejectCExtended) => {
this.objects.push(extendedObject);
})
}
不幸的是,这种链接请求的方式感觉不对,正如我所说,我找不到一种方法来检查是否所有请求都已完成并且 UI 只是一点一点地填满。
我希望我的问题表达得可以理解,在此先感谢您的帮助。
此致
如前所述,这种设计效率很低。因此,最好更改您的后端 - 正如评论中已经提到的那样。
如果您无法更改后端,您可以使用 switchMap
和 forkJoin
的组合对您的请求进行分组。
一次精简所有条目
您可以针对每个要调用的 API 一次性简化所有条目。所以基本上:
- 获取所有 objectAs
- 获取每个对象A的所有对象B
- 获取每个对象B的所有对象C
- 获取每个对象的所有添加 C
- 每次添加都扩展所有 objectC
private fetchData(): void {
// get your initial object
this.backendServie.getObjectA().pipe(
// switch to forkJoin, which waits until all inner observables complete and returns them as an array
switchMap((objects: IObjectA[]) => forkJoin(
// map all your objects to an array of observables which will then be waited on by forkJoin
objects.map(obj => this.backendService.getObjectB(obj.id))
)),
// Again, switch to another forkJoin
switchMap((objects: IObjectB[]) => forkJoin(
// Again, map all your objects to observables that forkJoin will collect and wait on
objects.map(obj => this.backendService.getObjectC(obj.id))
)),
// switch to another forkJoin that retrieves all your extensions for every object
switchMap((objects: IObjectC[]) => forkJoin(
// map each object to a forkJoin that retrieves the extensions for that object and map it to the extended object
objects.map(obj => forkJoin([
this.backendService.getAdditionalInformationD(obj.id),
this.backendService.getAdditionalInformationE(obj.id),
this.backendService.getAdditionalInformationF(obj.id)
]).pipe(
map(([infoD, infoE, infoF]) => ({
...obj,
additionalInfoD: infoD,
additionalInfoE: infoE,
additionalInfoF: infoF
}))
)
/* Alternatively, you can use the dictionary syntax to shorten this
objects.map(obj => forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
*/
))
// You're now getting a list of the extended objects
).subscribe((extendedObjects: IObejectCExtended[]) => {
this.objects = extendedObjects;
});
}
为了使代码更清晰一些,您可以将不同的部分分组到不同的函数中。这可能看起来像这样。
private fetchData(): void {
this.fetchExtendedObjects()
.subscribe(extendedObjects => this.objects = extendedObjects);
}
private fetchExtendedObjects():Observable<IObjectCExtended[]> {
return this.backendServie.getObjectA().pipe(
switchMap(objectsA => this.getAllObjectsB(objectsA)),
switchMap(objectsB => this.getAllObjectsC(objectsB)),
switchMap(objectsC => this.extendAllObjectsC(objectsC))
)
}
private getAllObjectsB(objects: IObjectA[]):Observable<IObjectB[]> {
return forkJoin(objects.map(obj => this.backendService.getObjectB(obj.id)));
}
private getAllObjectsC(objects: IObjectB[]):Observable<IObjectC[]> {
return forkJoin(objects.map(obj => this.backendService.getObjectC(obj.id)));
}
private extendAllObjectsC(objects: IObjectC[]):Observable<IObjectCExtended[]> {
return forkJoin(objects.map(obj => this.extendObjectC(obj)));
}
private extendObjectC(object: IObjectC):Observable<IObejectCExtended> {
objects.map(obj => forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
}
分别精简每个条目
作为对上述的优化,您可以单独简化每个对象,这将给您带来小幅性能提升。总的来说,这应该不会有太大影响,因为您仍然需要等待所有请求完成,但如果您的某些 API 对于某些对象来说速度较慢,这可能会有所帮助。
这基本上意味着:
- 获取所有objectAs
- 对每个objectA并行获取objectB
- 对于每个对象B,并行得到对象C
- 为每个 objectC 获取所有扩展(为每个 objectC 并行)
- 对于每个 objectC 扩展 objectC 的所有扩展(对于每个 objectC 并行)
- 将所有扩展的 objectC 组合成一个数组
private fetchData(): void {
// get your initial object
this.backendServie.getObjectA().pipe(
switchMap((objects: IObjectA[]) => forkJoin(
// Switch each objectA to an observable that retrieves objectB
// In contrast to the first version, this is done for each objectA separately
objects.map(obj => this.backendService.getObjectB(obj.id).pipe(
// Switch each objectB to an observable that retrieves objectC
switchMap((object: IObjectB) => this.backendService.getObjectC(obj.id)),
// Switch each objectC to an observable that retrieves all of the
// extensions an combines them with objectC
switchMap((object: IObjectC) =>
// Retrieves all extensions and provides them as a dictionary
forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(
// combine the original objectC with all the extensions
map(additionalInfo => ({...object, ...additionalInfo}))
)
)
))
))
// You're now getting a list of the extended objects
).subscribe((extendedObjects: IObejectCExtended[]) => {
this.objects = extendedObjects;
});
}
同样,为了使其更具可读性,您可以将各个部分分成函数:
private fetchData(): void {
this.fetchExtendedObjects().subscribe(objects => this.objects = objects);
}
private fetchExtendedObjects():Observable<IObjectCExtended[]> {
return this.backendServie.getObjectA().pipe(
switchMap((objects: IObjectA[]) => this.getExtendedObjectsC(objects))
);
}
private getExtendedObjectsC(objects: IObjectA[]):Observable<IObjectCExtended[]> {
return forkJoin(objects.map(obj => this.getExtendedObjectC(obj)));
}
private getExtendedObjectC(objectA: IObjectA):Observable<IObjectCExtended> {
return this.backendService.getObjectB(objectA.id).pipe(
switchMap((object: IObjectB) => this.backendService.getObjectC(obj.id)),
switchMap((object: IObjectC) => this.extendObjectC(object))
);
}
private extendObjectC(object: IObjectC):Observable<IObejectCExtended> {
objects.map(obj => forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
}
我只是想学习 Angular 和 RxJS (Observables),但我在以正确的方式链接请求时遇到了一些问题。
场景如下:
- 我运行请求A得到一个JSON对象,其中包含一个数组 对象。
- 对于这些对象中的每一个,我都需要执行请求 B 依次 returns 一个带有对象数组的 JSON 对象。
- 对于请求 B 的响应中的每个对象,我需要执行请求 C returns 一个简单的 JSON 对象。
- 现在我必须存储从请求 C 收到的所有对象并使用信息扩展它们(信息来自请求 D、E 和 F 的答案)。
- 只有当所有请求都完成后,我才想显示我的应用程序的 UI,在此之前只有一个加载微调器应该是可见的。
我当前的解决方案为我提供了所有必要的信息,但我找不到等待所有请求完成的方法。 它看起来像这样:
private fetchData(): void {
this.backendServie.getObjectA().pipe(
flatMap((objects: IObjectA[]) => {
// return objects the get them one by one
return objects
}),
flatMap((object: IObjectA) => {
return this.backendService.getObjectB(object.id)
}),
flatMap((objects: IObjectB[]) => {
// return objects the get them one by one
return objects
}),
flatMap((object: IObjectB) => {
return this.backendService.getObjectC(object.id)
}),
flatMap((object: IObjectC) => {
// convert object to make additional fields available
const extendedObject: IObjectCExtended = object as IObjectCExtended;
this.backendService.getAdditionalInformationD(object.id).subscribe((info: string) >= {
extendedObject.additionalInfoD = info;
});
this.backendService.getAdditionalInformationE(object.id).subscribe((info: string) >= {
extendedObject.additionalInfoE = info;
});
this.backendService.getAdditionalInformationF(object.id).subscribe((info: string) >= {
extendedObject.additionalInfoF = info;
})
return extendedObject;
})
).subscribe((extendedObject: IObejectCExtended) => {
this.objects.push(extendedObject);
})
}
不幸的是,这种链接请求的方式感觉不对,正如我所说,我找不到一种方法来检查是否所有请求都已完成并且 UI 只是一点一点地填满。
我希望我的问题表达得可以理解,在此先感谢您的帮助。
此致
如前所述,这种设计效率很低。因此,最好更改您的后端 - 正如评论中已经提到的那样。
如果您无法更改后端,您可以使用 switchMap
和 forkJoin
的组合对您的请求进行分组。
一次精简所有条目
您可以针对每个要调用的 API 一次性简化所有条目。所以基本上:
- 获取所有 objectAs
- 获取每个对象A的所有对象B
- 获取每个对象B的所有对象C
- 获取每个对象的所有添加 C
- 每次添加都扩展所有 objectC
private fetchData(): void {
// get your initial object
this.backendServie.getObjectA().pipe(
// switch to forkJoin, which waits until all inner observables complete and returns them as an array
switchMap((objects: IObjectA[]) => forkJoin(
// map all your objects to an array of observables which will then be waited on by forkJoin
objects.map(obj => this.backendService.getObjectB(obj.id))
)),
// Again, switch to another forkJoin
switchMap((objects: IObjectB[]) => forkJoin(
// Again, map all your objects to observables that forkJoin will collect and wait on
objects.map(obj => this.backendService.getObjectC(obj.id))
)),
// switch to another forkJoin that retrieves all your extensions for every object
switchMap((objects: IObjectC[]) => forkJoin(
// map each object to a forkJoin that retrieves the extensions for that object and map it to the extended object
objects.map(obj => forkJoin([
this.backendService.getAdditionalInformationD(obj.id),
this.backendService.getAdditionalInformationE(obj.id),
this.backendService.getAdditionalInformationF(obj.id)
]).pipe(
map(([infoD, infoE, infoF]) => ({
...obj,
additionalInfoD: infoD,
additionalInfoE: infoE,
additionalInfoF: infoF
}))
)
/* Alternatively, you can use the dictionary syntax to shorten this
objects.map(obj => forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
*/
))
// You're now getting a list of the extended objects
).subscribe((extendedObjects: IObejectCExtended[]) => {
this.objects = extendedObjects;
});
}
为了使代码更清晰一些,您可以将不同的部分分组到不同的函数中。这可能看起来像这样。
private fetchData(): void {
this.fetchExtendedObjects()
.subscribe(extendedObjects => this.objects = extendedObjects);
}
private fetchExtendedObjects():Observable<IObjectCExtended[]> {
return this.backendServie.getObjectA().pipe(
switchMap(objectsA => this.getAllObjectsB(objectsA)),
switchMap(objectsB => this.getAllObjectsC(objectsB)),
switchMap(objectsC => this.extendAllObjectsC(objectsC))
)
}
private getAllObjectsB(objects: IObjectA[]):Observable<IObjectB[]> {
return forkJoin(objects.map(obj => this.backendService.getObjectB(obj.id)));
}
private getAllObjectsC(objects: IObjectB[]):Observable<IObjectC[]> {
return forkJoin(objects.map(obj => this.backendService.getObjectC(obj.id)));
}
private extendAllObjectsC(objects: IObjectC[]):Observable<IObjectCExtended[]> {
return forkJoin(objects.map(obj => this.extendObjectC(obj)));
}
private extendObjectC(object: IObjectC):Observable<IObejectCExtended> {
objects.map(obj => forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
}
分别精简每个条目
作为对上述的优化,您可以单独简化每个对象,这将给您带来小幅性能提升。总的来说,这应该不会有太大影响,因为您仍然需要等待所有请求完成,但如果您的某些 API 对于某些对象来说速度较慢,这可能会有所帮助。
这基本上意味着:
- 获取所有objectAs
- 对每个objectA并行获取objectB
- 对于每个对象B,并行得到对象C
- 为每个 objectC 获取所有扩展(为每个 objectC 并行)
- 对于每个 objectC 扩展 objectC 的所有扩展(对于每个 objectC 并行)
- 将所有扩展的 objectC 组合成一个数组
private fetchData(): void {
// get your initial object
this.backendServie.getObjectA().pipe(
switchMap((objects: IObjectA[]) => forkJoin(
// Switch each objectA to an observable that retrieves objectB
// In contrast to the first version, this is done for each objectA separately
objects.map(obj => this.backendService.getObjectB(obj.id).pipe(
// Switch each objectB to an observable that retrieves objectC
switchMap((object: IObjectB) => this.backendService.getObjectC(obj.id)),
// Switch each objectC to an observable that retrieves all of the
// extensions an combines them with objectC
switchMap((object: IObjectC) =>
// Retrieves all extensions and provides them as a dictionary
forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(
// combine the original objectC with all the extensions
map(additionalInfo => ({...object, ...additionalInfo}))
)
)
))
))
// You're now getting a list of the extended objects
).subscribe((extendedObjects: IObejectCExtended[]) => {
this.objects = extendedObjects;
});
}
同样,为了使其更具可读性,您可以将各个部分分成函数:
private fetchData(): void {
this.fetchExtendedObjects().subscribe(objects => this.objects = objects);
}
private fetchExtendedObjects():Observable<IObjectCExtended[]> {
return this.backendServie.getObjectA().pipe(
switchMap((objects: IObjectA[]) => this.getExtendedObjectsC(objects))
);
}
private getExtendedObjectsC(objects: IObjectA[]):Observable<IObjectCExtended[]> {
return forkJoin(objects.map(obj => this.getExtendedObjectC(obj)));
}
private getExtendedObjectC(objectA: IObjectA):Observable<IObjectCExtended> {
return this.backendService.getObjectB(objectA.id).pipe(
switchMap((object: IObjectB) => this.backendService.getObjectC(obj.id)),
switchMap((object: IObjectC) => this.extendObjectC(object))
);
}
private extendObjectC(object: IObjectC):Observable<IObejectCExtended> {
objects.map(obj => forkJoin({
additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
}).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
}