为 Observable<sourceitem[]> 中的每个项目调用 api 并 "merge" 返回 Observable<targetitem[]>

Call api for every item in Observable<sourceitem[]> and "merge" it back to an Observable<targetitem[]>

我有一个 Observable,其中包含用户选择的项目。每个选定的项目都用作 api 调用的来源。 api 调用的结果应该是 "merged" 到 Observable。

我似乎不明白的是我如何将结果合并回来。我尝试使用扫描运算符,但 "accumulator" 仅 "grows".

我有这个:

import {
  Observable,
  Subject,
  of ,
  from
} from 'rxjs';
import {
  map,
  merge,
  flatMap
} from 'rxjs/operators';

function apiUri(albumId: String) {
  return `https://jsonplaceholder.typicode.com/albums/${albumId}`
}


const albums$ = new Subject < String[] > ();

const responseStream = albums$.pipe(
  flatMap(albumIds => albumIds),
  flatMap(id => from(fetch(apiUri(id)))),
  flatMap(resp => resp.json()),
  /* I'm in the dark on how to "merge" it back into an Observable<String[]> */
);

responseStream.subscribe(resp => console.log(resp))
const randomCodes1 = ['5', '1', '3'];
albums$.next(randomCodes1);

Stackblitz link 与 运行 示例:https://stackblitz.com/edit/rxjs-flatmap-galore?&file=index.ts

这是允许您创建您正在寻找的 responseStream 的代码。

const responseStream = albums$.pipe(
  map(albumIds => albumIds.map(id => from(fetch(apiUrl(id))).pipe(mergeMap(resp => resp.json())))),
  mergeMap(obsOfIds => forkJoin(obsOfIds)),
);

首先,让我们关注forkJoin。您可以将一个 Observable 数组传递给此运算符,并且此运算符将 return 一个 Observable,当输入数组中的所有 Observable 都已发出时发出,并且它将发出一个数组,其中包含输入中每个 Observable 发出的值数组。

这可能是您正在寻找的。如何创建您需要的 Observable 数组?这是第一个 map 运算符执行的操作。

当您查看该运算符时请考虑一件事。

map(albumIds => albumIds.map(id => from(fetch(apiUrl(id))).pipe(mergeMap(resp => resp.json()))))

您使用了 2 次 map,但它们并不相同 map。第一个 map,即最外部的,是 Observable 的 map 运算符。第二个map,也就是最内部的一个,是Array的map方法。

我已经更新了你的 stackblitz,它似乎工作正常。