使用 RxJS pipe() 将数组转换为 Angular 中的异步值流

Use RxJS pipe() to turn array into stream of asynchronous values in Angular

type Movie = {id: string};
type FullMovie = {id: string, picture: string};

我有一个 url returns 类型数组 Movie:

http.get(url).subscribe(res: Movie[])

我将 http.get(movie.id) 用于数组中的 每个 电影返回 FullMovie:

http.get(movie.id).subscribe(res: FullMovie)

所以本质上我想创建一个方法来 returns FullMovie 对象流,作为请求解析:getAll = (url): Observable<FullMovie>

getAll = (url): Observable<FullMovie> => {
  return http.get(url)
    //must pipe the array into a stream of FullMovies but not a stream of FullMovie Observables. I don't want to subscribe to each of the returned FullMovies
    //something like
   .pipe(//map(array => array.forEach(movie => return http.get(movie.id))))
}

目前我有以下有效的解决方案,但我想要一个更简洁的解决方案:

 private getFull = (queryGroup: string): Observable<TMDBMovie> =>
    new Observable<TMDBMovie>((observer) => {
      //get movie array
      this.httpGet(queryGroup).subscribe((movies) => {
        var j = 0;

        if (movies.length === 0) return observer.complete();

        //loop through elements
        movies.forEach(movie => {
          this.getById(movie.id).subscribe(
            (res) => complete(observer.next(res)),
            (error) => complete()
          );
        });
          
        }

        const complete = (arg: any = 0) => {
          if (++j === len) observer.complete();
        };
      });
    });

编辑:

这有效

newGetFull = (queryGroup: string) =>
    this.httpGet(queryGroup)
      .pipe(concatMap((arr) => from(arr)))
      .pipe(
        mergeMap((movie) => this.getById(movie.id).pipe(catchError(() => of())))
      );

您可能想按照这些思路尝试一些东西

getAll = (url): Observable<FullMovie> => {
  return http.get(url)
   .pipe(
      // turn the array Movie[] into a stream of Movie, i.e. an Obsevable<Movie>
      concatMap(arrayOfMovies => from(arrayOfMovies)),
      // then use mergeMap to "flatten" the various Obaservable<FullMovie> that you get calling http.get(movie.id)
      // in other words, with mergeMap, you turn a stream of Observables into a stream of the results returned when each Observable is resolved
      mergeMap(movie => http.get(movie.id))
   )
}

请考虑使用上述 mergeMap,您无法保证最终流的顺序与您从第一次调用中获得的 Movie 数组的顺序相同。这是因为每个 http.get(movie.id) 到 return 可能需要不同的时间,因此不能保证顺序。

如果需要保证顺序,使用concatMap而不是mergeMap(实际上concatMapmergeMap,并发设置为1)。

如果您希望所有 http.get(movie.id) 在 return 结果之前完成,请使用 forkJoin 而不是像这样 mergeMap

getAll = (url): Observable<FullMovie> => {
  return http.get(url)
   .pipe(
      // turn the array Movie[] into an array of Observable<Movie>
      map(arrayOfMovies => arrayOfMovies.map(movie => http.get(movie.id))),
      // then use forkJoin to resolve all the Observables in parallel
      concatMap(arrayOfObservables => forkJoin(arrayOfObservables))
   ).subscribe(
      arrayOfFullMovies => {
        // the result notified by forkJoin is an array of FullMovie objects
      }
   )
}