使用 RxJS pipe() 将数组转换为 Angular 中的异步值流
Use RxJS pipe() to turn array into stream of asynchronous values in Angular
type Movie = {id: string};
type FullMovie = {id: string, picture: string};
我有一个 url returns 类型数组 Movie
:
http.get(url).subscribe(res: Movie[])
我将 http.get(movie.id)
用于数组中的 每个 电影返回 FullMovie
:
http.get(movie.id).subscribe(res: FullMovie)
所以本质上我想创建一个方法来 returns FullMovie 对象流,作为请求解析:getAll = (url): Observable<FullMovie>
getAll = (url): Observable<FullMovie> => {
return http.get(url)
//must pipe the array into a stream of FullMovies but not a stream of FullMovie Observables. I don't want to subscribe to each of the returned FullMovies
//something like
.pipe(//map(array => array.forEach(movie => return http.get(movie.id))))
}
目前我有以下有效的解决方案,但我想要一个更简洁的解决方案:
private getFull = (queryGroup: string): Observable<TMDBMovie> =>
new Observable<TMDBMovie>((observer) => {
//get movie array
this.httpGet(queryGroup).subscribe((movies) => {
var j = 0;
if (movies.length === 0) return observer.complete();
//loop through elements
movies.forEach(movie => {
this.getById(movie.id).subscribe(
(res) => complete(observer.next(res)),
(error) => complete()
);
});
}
const complete = (arg: any = 0) => {
if (++j === len) observer.complete();
};
});
});
编辑:
这有效
newGetFull = (queryGroup: string) =>
this.httpGet(queryGroup)
.pipe(concatMap((arr) => from(arr)))
.pipe(
mergeMap((movie) => this.getById(movie.id).pipe(catchError(() => of())))
);
您可能想按照这些思路尝试一些东西
getAll = (url): Observable<FullMovie> => {
return http.get(url)
.pipe(
// turn the array Movie[] into a stream of Movie, i.e. an Obsevable<Movie>
concatMap(arrayOfMovies => from(arrayOfMovies)),
// then use mergeMap to "flatten" the various Obaservable<FullMovie> that you get calling http.get(movie.id)
// in other words, with mergeMap, you turn a stream of Observables into a stream of the results returned when each Observable is resolved
mergeMap(movie => http.get(movie.id))
)
}
请考虑使用上述 mergeMap
,您无法保证最终流的顺序与您从第一次调用中获得的 Movie
数组的顺序相同。这是因为每个 http.get(movie.id)
到 return 可能需要不同的时间,因此不能保证顺序。
如果需要保证顺序,使用concatMap
而不是mergeMap
(实际上concatMap
是mergeMap
,并发设置为1)。
如果您希望所有 http.get(movie.id)
在 return 结果之前完成,请使用 forkJoin
而不是像这样 mergeMap
getAll = (url): Observable<FullMovie> => {
return http.get(url)
.pipe(
// turn the array Movie[] into an array of Observable<Movie>
map(arrayOfMovies => arrayOfMovies.map(movie => http.get(movie.id))),
// then use forkJoin to resolve all the Observables in parallel
concatMap(arrayOfObservables => forkJoin(arrayOfObservables))
).subscribe(
arrayOfFullMovies => {
// the result notified by forkJoin is an array of FullMovie objects
}
)
}
type Movie = {id: string};
type FullMovie = {id: string, picture: string};
我有一个 url returns 类型数组 Movie
:
http.get(url).subscribe(res: Movie[])
我将 http.get(movie.id)
用于数组中的 每个 电影返回 FullMovie
:
http.get(movie.id).subscribe(res: FullMovie)
所以本质上我想创建一个方法来 returns FullMovie 对象流,作为请求解析:getAll = (url): Observable<FullMovie>
getAll = (url): Observable<FullMovie> => {
return http.get(url)
//must pipe the array into a stream of FullMovies but not a stream of FullMovie Observables. I don't want to subscribe to each of the returned FullMovies
//something like
.pipe(//map(array => array.forEach(movie => return http.get(movie.id))))
}
目前我有以下有效的解决方案,但我想要一个更简洁的解决方案:
private getFull = (queryGroup: string): Observable<TMDBMovie> =>
new Observable<TMDBMovie>((observer) => {
//get movie array
this.httpGet(queryGroup).subscribe((movies) => {
var j = 0;
if (movies.length === 0) return observer.complete();
//loop through elements
movies.forEach(movie => {
this.getById(movie.id).subscribe(
(res) => complete(observer.next(res)),
(error) => complete()
);
});
}
const complete = (arg: any = 0) => {
if (++j === len) observer.complete();
};
});
});
编辑:
这有效
newGetFull = (queryGroup: string) =>
this.httpGet(queryGroup)
.pipe(concatMap((arr) => from(arr)))
.pipe(
mergeMap((movie) => this.getById(movie.id).pipe(catchError(() => of())))
);
您可能想按照这些思路尝试一些东西
getAll = (url): Observable<FullMovie> => {
return http.get(url)
.pipe(
// turn the array Movie[] into a stream of Movie, i.e. an Obsevable<Movie>
concatMap(arrayOfMovies => from(arrayOfMovies)),
// then use mergeMap to "flatten" the various Obaservable<FullMovie> that you get calling http.get(movie.id)
// in other words, with mergeMap, you turn a stream of Observables into a stream of the results returned when each Observable is resolved
mergeMap(movie => http.get(movie.id))
)
}
请考虑使用上述 mergeMap
,您无法保证最终流的顺序与您从第一次调用中获得的 Movie
数组的顺序相同。这是因为每个 http.get(movie.id)
到 return 可能需要不同的时间,因此不能保证顺序。
如果需要保证顺序,使用concatMap
而不是mergeMap
(实际上concatMap
是mergeMap
,并发设置为1)。
如果您希望所有 http.get(movie.id)
在 return 结果之前完成,请使用 forkJoin
而不是像这样 mergeMap
getAll = (url): Observable<FullMovie> => {
return http.get(url)
.pipe(
// turn the array Movie[] into an array of Observable<Movie>
map(arrayOfMovies => arrayOfMovies.map(movie => http.get(movie.id))),
// then use forkJoin to resolve all the Observables in parallel
concatMap(arrayOfObservables => forkJoin(arrayOfObservables))
).subscribe(
arrayOfFullMovies => {
// the result notified by forkJoin is an array of FullMovie objects
}
)
}