rxjs: how to make foreach loop wait for inner observable
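I have a forEach loop in which I may need to make an HTTP request to fetch some information. I tried using forkJoin inside the forEach loop to 'make the foreach loop wait for the observables' (in this example there is only one observable, but in reality I will have more...). But with this code, each forEach iteration keeps running without waiting for the observables to complete, and I can't find any solution to prevent this.
Thanks a lot for your help!!!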
...
let wishes: Wish[] = [];
response.wishes.forEach((wish) => {
  let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
  let personObservables: Observable<any>[] = [];
  if (wish.reservation){
    personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy));
  } else {
    personObservables.push(of(null));
  }
  forkJoin(personObservables).subscribe( ([reservedBy]) => {
    if (reservedBy) {
      newWish.reservation = {
        ...wish.reservation,
        reservedBy
      };
    }
    wishes.push(newWish);
  } );
});
...
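The behaviour described in the question can be reproduced in isolation. The following is a minimal sketch rather than the asker's code: lookupPerson is a hypothetical stand-in for peopleService.getPersonById, made asynchronous with delay. Because subscribe returns immediately, forEach finishes before any inner observable has emitted.

```typescript
import { Observable, of } from 'rxjs';
import { delay } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP lookup: emits asynchronously after 10 ms.
function lookupPerson(id: number): Observable<string> {
  return of(`person-${id}`).pipe(delay(10));
}

const collected: string[] = [];

[1, 2, 3].forEach((id) => {
  // subscribe() only registers the callback and returns at once;
  // forEach has no way to wait for the emission.
  lookupPerson(id).subscribe((person) => collected.push(person));
});

// Still empty here: none of the callbacks has run yet.
const lengthRightAfterLoop = collected.length;
console.log(lengthRightAfterLoop); // 0
```

The array is only filled later, once the timers fire, which is why any code placed directly after the loop sees an empty list.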
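EDIT: complete working solution without the forEach loop. It is much easier to use the map operator in the pipe together with the map function on the array. I learned that splitting this kind of logic across multiple operators is easier than trying to fix everything inside one map operator...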
public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(
    // Create wish instances from each wish in API response and save reservation for later use
    map( wishlistResponse =>
      wishlistResponse[0].wishes.map(wish => ({
        wish: this.createWishInstanceFromResponse(wish, receiver),
        reservation: wish.reservation
      }))),
    // For each wish with reservation: get person info for 'reservedBy' id
    map( wishesAndReservationObjects => wishesAndReservationObjects.map( ({wish, reservation}) =>
      !reservation ?
        of(wish) :
        this.peopleService.getPersonById(reservation.reservedBy)
          .pipe(
            map ( reservedBy => {
              if (reservedBy) wish.reservation = {
                ...reservation,
                reservedBy: new Person(reservedBy.id, reservedBy.firstName, reservedBy.lastName)
              }
              return wish;
            })
          )
    )),
    // forkJoin all observables, so the result is an array of all the wishes
    switchMap(reservedByObservables => reservedByObservables.length !== 0 ? forkJoin(reservedByObservables) : of(<Wish[]>[])),
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map ( wishes => wishes.map( wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    // For each wish: get state via API call
    map ( wishesWithoutState => wishesWithoutState.map( wishWithoutState =>
      this.http$.get<wishStatus>(environment.apiUrl + 'wish/' + wishWithoutState.id + '/state')
        .pipe(
          catchError(() => of(null)),
          map( state => {
            wishWithoutState.status = state;
            return wishWithoutState;
          })
        )
    )),
    // Combine all stateObservables into 1 array
    switchMap(stateObservables => stateObservables.length !== 0 ? forkJoin(stateObservables) : of(<Wish[]>[]))
  )
}
private createWishInstanceFromResponse ( wishResponse : IWishResponse, receiver: Person ) : Wish {
  let wish : Wish = new Wish (
    wishResponse._id,
    wishResponse.title,
    wishResponse.price,
    ...
  );
  return wish;
}
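You can use from and concatMap to accomplish this. from emits the array elements one at a time, and concatMap waits for each inner observable to complete before it starts the next one.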
// Needs from, forkJoin, of, Observable from 'rxjs' and concatMap, tap from 'rxjs/operators'
let wishes: Wish[] = [];
from(response.wishes).pipe(
  // concatMap handles one wish at a time, in order
  concatMap((wish) => {
    let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
    let personObservables: Observable<any>[] = [];
    if (wish.reservation){
      personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy));
    } else {
      personObservables.push(of(null));
    }
    return forkJoin(personObservables).pipe(
      // tap runs the side effect once the lookups for this wish have finished
      tap(([reservedBy]) => {
        if (reservedBy) {
          newWish.reservation = {
            ...wish.reservation,
            reservedBy
          };
        }
        wishes.push(newWish);
      })
    );
  })
).subscribe();
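As a variation on the same idea, the results can be collected inside the pipe with toArray instead of being pushed into an outer array. This is only a sketch under assumptions: lookupPerson is a hypothetical stand-in for peopleService.getPersonById, and RawWish and EnrichedWish are simplified shapes invented for the example.

```typescript
import { Observable, from, of } from 'rxjs';
import { concatMap, map, toArray } from 'rxjs/operators';

// Simplified, hypothetical shapes used only for this example.
interface RawWish { id: string; reservedBy?: number; }
interface EnrichedWish { id: string; reservedByName: string | null; }

// Hypothetical stand-in for peopleService.getPersonById.
function lookupPerson(id: number): Observable<string> {
  return of(`person-${id}`);
}

function enrichSequentially(wishes: RawWish[]): Observable<EnrichedWish[]> {
  return from(wishes).pipe(
    // concatMap subscribes to the next inner observable only after the
    // previous one has completed, so the lookups run one at a time, in order.
    concatMap((wish) => {
      const name$: Observable<string | null> =
        wish.reservedBy === undefined ? of(null) : lookupPerson(wish.reservedBy);
      return name$.pipe(
        map((name): EnrichedWish => ({ id: wish.id, reservedByName: name }))
      );
    }),
    // Gather every enriched wish into one array, emitted on completion.
    toArray()
  );
}

let sequentialResult: EnrichedWish[] = [];
enrichSequentially([{ id: 'a', reservedBy: 7 }, { id: 'b' }])
  .subscribe((wishes) => (sequentialResult = wishes));
```

The caller then receives a single array and decides where to subscribe, which keeps the side effect out of the pipe.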
You can create a function like this to handle your flow:
completeData(items): Observable<any> {
  let $apiCallItems: Observable<any>[] = [];
  const itemsNeedServiceCall: Array<any> = [];
  const itemsCompleted: Array<any> = [];
  // Split the items into those that need a lookup and those that are already complete
  items.forEach((item) => {
    if (item.reservation) {
      $apiCallItems.push(this.peopleService.getPersonById(item.reservation.reservedBy));
      itemsNeedServiceCall.push(item);
    } else {
      itemsCompleted.push(item);
    }
  });
  // forkJoin([]) completes without emitting, so return early when no lookup is needed
  if ($apiCallItems.length === 0) {
    return of(itemsCompleted);
  }
  return forkJoin($apiCallItems).pipe(
    map((r) => {
      // r[i] is the lookup result for itemsNeedServiceCall[i]
      for (let i = 0; i < r.length; i++) {
        itemsNeedServiceCall[i] = {
          ...itemsNeedServiceCall[i],
          ...r[i],
        };
      }
      return [...itemsCompleted, ...itemsNeedServiceCall];
    })
  );
}
Then you can use it wherever you need it:
this.completeData(response.wishes).subscribe(r => {
  //some code
})
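Note that completeData returns the untouched items first and the looked-up items after them, so the original order is not kept. If the order matters, one possible alternative is to map every item to its own observable and let forkJoin assemble the array, since forkJoin emits results in the order of its inputs. A rough sketch, where lookupPerson, the Item shape and the reservedByName field are assumptions made for the example:

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical item shape for illustration only.
interface Item {
  title: string;
  reservation?: { reservedBy: number };
  reservedByName?: string;
}

// Hypothetical stand-in for peopleService.getPersonById.
function lookupPerson(id: number): Observable<string> {
  return of(`person-${id}`);
}

function completeInOrder(items: Item[]): Observable<Item[]> {
  const perItem: Observable<Item>[] = items.map((item) => {
    const reservation = item.reservation;
    return reservation === undefined
      ? of(item) // nothing to look up, pass the item through
      : lookupPerson(reservation.reservedBy).pipe(
          map((name): Item => ({ ...item, reservedByName: name }))
        );
  });
  // forkJoin emits one array whose positions match the positions in perItem.
  return forkJoin(perItem);
}

let ordered: Item[] = [];
completeInOrder([
  { title: 'a' },
  { title: 'b', reservation: { reservedBy: 4 } },
  { title: 'c' },
]).subscribe((result) => (ordered = result));
```

Like the function above, this emits nothing for an empty input array, so the same empty-array guard would still be needed.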
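As a general rule, never call subscribe on an observable unless you are the final consumer and you are ready to throw the observable away (no more transformations of the data are needed). This generally only happens when you are displaying data to the user or writing it to a database.
I'm not really clear on what your code is doing (what your various calls return), so this is an approximation of how I would do what you seem to be attempting.
Something along these lines should work.
Notice how I never subscribe until I'm ready to log the final list of wishes to the console? That is by design, so that the information arrives as expected.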
return this.http$.get<IWishlistResponse[]>(
  environment.apiUrl +
  'wishlist/' +
  receiver.id
).pipe(
  // The response is typed as an array, so read the wishes from its first element
  map(wishlist => wishlist[0].wishes.map(wish => ({
    oldWish: wish,
    newWish: new Wish( wish._id, wish.title, wish.price)
  }))),
  // Turn each wish into an observable: of(newWish) without a reservation, a person lookup otherwise
  map(wishes => wishes.map(({oldWish, newWish}) =>
    !oldWish.reservation ?
      of(newWish) :
      this.peopleService.getPersonById(oldWish.reservation.reservedBy).pipe(
        // getPersonById emits a single person, so no array destructuring is needed
        map(reservedBy => reservedBy ?
          {
            ...newWish,
            reservation: {
              ...oldWish.reservation,
              reservedBy
            }
          } :
          {
            ...newWish,
            reservation: oldWish.reservation
          }
        )
      )
  )),
  // Wait for every inner observable and emit the results as one array
  switchMap(newWishObservables => forkJoin(newWishObservables))
).subscribe(newWishes => console.log(newWishes));
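The "subscribe only as the final consumer" rule can be shown on a smaller scale. In this sketch fetchIds and fetchSummary are hypothetical stand-ins for HTTP calls, and defer is used so that the fake request starts on subscription, roughly the way a cold HTTP observable would. The function that builds the pipe returns it without subscribing, and nothing runs until the consumer subscribes.

```typescript
import { Observable, defer, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

let requestsStarted = 0;

// Hypothetical stand-in for an HTTP call; defer delays the work until subscribe.
function fetchIds(): Observable<number[]> {
  return defer(() => {
    requestsStarted++;
    return of([1, 2, 3]);
  });
}

// Hypothetical second call that depends on the first result.
function fetchSummary(ids: number[]): Observable<string> {
  return of(`ids: ${ids.join(',')}`);
}

// Builds the pipeline and hands it back without subscribing.
function buildSummary(): Observable<string> {
  return fetchIds().pipe(
    map((ids) => ids.filter((id) => id % 2 === 1)),
    switchMap((oddIds) => fetchSummary(oddIds))
  );
}

const summary$ = buildSummary();
const startedBeforeSubscribe = requestsStarted; // nothing has run yet

// The final consumer is the only place that subscribes.
let summary = '';
summary$.subscribe((text) => (summary = text));
console.log(startedBeforeSubscribe, requestsStarted, summary); // 0 1 ids: 1,3
```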
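Update #1: Cleaning up the code
One way to make the code easier to maintain, error-check, and so on is to outsource the creation of observables to separate functions and keep the pipeline of transformation logic "clean(er)".
Once you do that, this pattern: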
map(things => things.map(thing => observableThing)),
mergeMap(observableThings => forkJoin(observableThings))
can be combined without creating too much clutter:
mergeMap(things => forkJoin(
  things.map(thing => observableThing)
))
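To check that the two forms behave the same, here is a small runnable sketch. The double function is a hypothetical placeholder for whatever creates the per-item observable.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical per-item observable factory.
function double(n: number): Observable<number> {
  return of(n * 2);
}

const source$: Observable<number[]> = of([1, 2, 3]);

// Two steps: first build the array of observables, then join them.
const twoStep$ = source$.pipe(
  map((numbers) => numbers.map((n) => double(n))),
  mergeMap((doubled$) => forkJoin(doubled$))
);

// One step: build and join inside the same operator.
const combined$ = source$.pipe(
  mergeMap((numbers) => forkJoin(numbers.map((n) => double(n))))
);

let twoStepResult: number[] = [];
let combinedResult: number[] = [];
twoStep$.subscribe((values) => (twoStepResult = values));
combined$.subscribe((values) => (combinedResult = values));
console.log(twoStepResult, combinedResult); // [ 2, 4, 6 ] [ 2, 4, 6 ]
```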
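Here is how I might tidy up the pipes in your updated code (this was done quickly on the fly and is untested).
You'll notice that I removed your code for handling an empty forkJoin. I don't think that's really an issue: forkJoin of an empty array simply completes without emitting, and you can address that later in the pipe, after the observable has completed.
You'll also notice that I did some quick error handling in the two functions that create observables. Both times I just substituted a default value (though in different parts of the pipe, to slightly different effect). In practice, you will probably want to check the name or type of the error and respond accordingly.
I'm just ignoring errors, but some errors shouldn't be silently ignored. That's up to you.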
public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(
    // Create wish instances from each wish in API response and save reservation for later use
    map( wishlistResponse =>
      wishlistResponse[0].wishes.map(wish => ({
        wish: this.createWishInstanceFromResponse(wish, receiver),
        reservation: wish.reservation
      }))
    ),
    // For each wish with reservation: get person info for 'reservedBy' id
    map(wishesAndReservationObjects =>
      wishesAndReservationObjects.map(({wish, reservation}) =>
        this.addReservationToWish(wish, reservation)
      )
    ),
    // forkJoin all observables, so the result is an array of all the wishes
    switchMap(reservedByObservables =>
      forkJoin(reservedByObservables)
    ),
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    // For each wish: get state via API call
    map(wishesWithoutState => wishesWithoutState.map(wishWithoutState =>
      this.addStatusToWish(wishWithoutState)
    )),
    // Combine all stateObservables into 1 array
    switchMap(stateObservables =>
      forkJoin(stateObservables)
    ),
    // If the wishlist has no wishes (forkJoin of an empty array) or the
    // request emits nothing, this will all complete without emitting
    // anything. So we can just give it a default thing to emit in that case.
    defaultIfEmpty(<Wish[]>[])
  );
}
private createWishInstanceFromResponse(
  wishResponse : IWishResponse,
  receiver: Person
): Wish {
  let wish : Wish = new Wish (
    wishResponse._id,
    wishResponse.title,
    wishResponse.price
  );
  return wish;
}
/***
 * Create an observable that tries to add reservation and
 * reservedBy to the wish
 ***/
private addReservationToWish(wish: Wish, reservation): Observable<Wish>{
  return this.peopleService.getPersonById(reservation?.reservedBy).pipe(
    // If peopleService.getPersonById fails (say, because reservation was
    // null), convert the error into a null response. The filter below then
    // drops it, and defaultIfEmpty emits the plain wish.
    catchError(_ => of(null)),
    // Since errors become null, this filters errors. Furthermore, if peopleService
    // doesn't error and just emits a null/undefined value instead, this will
    // filter that situation as well
    filter(reservedBy => reservedBy != null),
    // Now we know reservedBy isn't null, so map doesn't need to check
    map(reservedBy => {
      wish.reservation = reservation;
      wish.reservation.reservedBy = new Person(
        reservedBy.id,
        reservedBy.firstName,
        reservedBy.lastName,
        ...
      );
      return wish;
    }),
    // If reservedBy was null (due to error or otherwise), then just emit
    // the wish without a reservation
    defaultIfEmpty(wish)
  );
}
/***
 * Create an observable that tries to add status to the wish
 ***/
private addStatusToWish(wish: Wish): Observable<Wish>{
  return this.http$.get<WishStatus>(
    environment.apiUrl +
    'wish/' +
    wish.id +
    '/state'
  ).pipe(
    // Set the status on the same instance (spreading a class instance
    // would drop its methods) and pass the wish along
    map(state => {
      wish.status = state;
      return wish;
    }),
    // If http$.get<WishStatus> failed, then just return the wish
    // without a status
    catchError(_ => of(wish)),
  );
}
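As a sketch of what "check the type of the error and respond accordingly" could look like, the following swallows only a "not found" failure and lets everything else through. It rests on an assumption: that failures carry a numeric status field, as Angular's HttpErrorResponse does. failingRequest, isNotFound and orFallbackWhenMissing are hypothetical helpers written for this example.

```typescript
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Assumption: errors carry a numeric `status`, like Angular's HttpErrorResponse.
function isNotFound(err: unknown): boolean {
  return typeof err === 'object' && err !== null &&
    (err as { status?: unknown }).status === 404;
}

// Hypothetical request that always fails with the given status.
function failingRequest(status: number): Observable<string> {
  return new Observable<string>((subscriber) => subscriber.error({ status }));
}

// Swallow only "not found"; let every other error reach the subscriber.
function orFallbackWhenMissing(source$: Observable<string>, fallback: string): Observable<string> {
  return source$.pipe(
    catchError((err: unknown) => {
      if (isNotFound(err)) {
        return of(fallback);
      }
      throw err; // anything unexpected is forwarded to the error callback
    })
  );
}

const outcomes: string[] = [];
orFallbackWhenMissing(failingRequest(404), 'no status').subscribe({
  next: (value) => outcomes.push(`value: ${value}`),
  error: () => outcomes.push('error'),
});
orFallbackWhenMissing(failingRequest(500), 'no status').subscribe({
  next: (value) => outcomes.push(`value: ${value}`),
  error: () => outcomes.push('error'),
});
console.log(outcomes); // [ 'value: no status', 'error' ]
```

Keep in mind that inside a forkJoin a single inner error fails the whole join, so which errors get a fallback and which propagate is a decision about the entire wishlist request.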
Now that the transition from map to forkJoin(mapped) looks clean enough, I would probably combine them, but that's up to you.
Here is how that would look:
public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(
    // Create wish instances from each wish in API response
    // For each wish with reservation: get person info for 'reservedBy' id
    switchMap(wishlistResponse => forkJoin(
      wishlistResponse[0].wishes.map(wish =>
        this.addReservationToWish(
          this.createWishInstanceFromResponse(wish, receiver),
          wish.reservation
        )
      )
    )),
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    // For each wish: get state via API call
    switchMap(wishesWithoutState => forkJoin(
      wishesWithoutState.map(wishWithoutState => this.addStatusToWish(wishWithoutState))
    )),
    // If the wishlist has no wishes (forkJoin of an empty array) or the
    // request emits nothing, this will all complete without emitting
    // anything. So we can just give it a default thing to emit in that case.
    defaultIfEmpty(<Wish[]>[])
  );
}
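The role of the trailing defaultIfEmpty can be seen in isolation. In this minimal sketch the empty noRequests array stands in for a wishlist without wishes: forkJoin over it completes without emitting, and defaultIfEmpty turns that into a single emission of an empty array.

```typescript
import { Observable, forkJoin } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

// Stand-in for "a wishlist with no wishes": nothing to join.
const noRequests: Observable<string>[] = [];

// Without a default: the observable completes, but next is never called.
let plainEmissions = 0;
let plainCompletions = 0;
forkJoin(noRequests).subscribe({
  next: () => plainEmissions++,
  complete: () => plainCompletions++,
});

// With a default: the empty completion is replaced by one emission of [].
const guardedEmissions: string[][] = [];
forkJoin(noRequests)
  .pipe(defaultIfEmpty([] as string[]))
  .subscribe((value) => guardedEmissions.push(value));

console.log(plainEmissions, plainCompletions, guardedEmissions.length); // 0 1 1
```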