Angular/ RxJS:带有可观察对象的嵌套服务调用

Angular/ RxJS: nested service calls with observables

我是 RxJS 可观察对象的新手,我正在尝试解决一个相当简单的用例。

在一项服务中,我首先进行了一个 http 调用,return是一个项目(作为可观察对象)。该项目包含一个 id 数组,其中一些重复。对于每个不同的 id,我需要调用另一个 http 服务(同样 returns 一个可观察的),并将其 return 值添加到原始项目以代替相应的 id。这些调用应该并行发生。最后,每次调用完成后,我的服务应该 return 原始项目的可观察对象,现在其子项目就位。

为了给出一个更好的主意,这就是 promises 而不是 observables 的样子:

MyService() {
    return HttpGetMainItem()
        .then(item => {
            var promises = _.uniq(item.subItems)
                 .map(sid => HttpGetSubItem(sid)
                            .then(subItem => {
                                // add to matching item.subItems
                             }));
            // wait for all promises to complete and return main item
            return Promise.all(promises).then(() => item);
        });
}

使用可观察对象完成这项工作的最佳方法是什么?

编辑:从答案看来我不是很清楚。 promises 的例子只是为了清楚起见,在我的例子中,http 调用实际上是 Angular 的 HttpClient.get,所以它们是 return observables——我希望用 observables 做所有事情。

也许你可以使用像 async.each -> https://caolan.github.io/async/docs.html#each 这样的库(可能是每个系列)。

会是这样的:

async.each(array, (obj, cb) => {
  observable with obj in parameter and with subscriber result : 
  cb(err, subscriberRes);
}, (err, res) => {
  console.log(res);
}

我相信你需要这样的东西:

import { from, forkJoin } from 'rxjs';
import { switchMap } from 'rxjs/operators';

itemWithSubItems$ = from(HttpGetMainItem()).pipe(
    switchMap(item => {
        const promises = _.uniq(item.subItems).map(item => HttpGetSubItem(item.sid));
        return forkJoin(promises)
            .map(resultArray => item.subItems = resultArray)
            .map(_ => item);
    })
);

首先获取主要项目。然后使用 forkJoin 解决所有子查询并丰富主项。之后就是 return 主要项目。

这是您可以使用 rxjs 完成上述任务的一种方法。

  1. 使用 from.
  2. 将承诺转换为可观察对象
  3. 在外部管道调用 switchMap 以便您可以调用另一个 observable 和 return 作为外部 observable 的结果。您正在切换执行上下文。
  4. 在 switchMap 中像以前一样做子项的映射,然后使用 forkJoin 创建所有元素承诺的 Observable。一旦所有承诺完成,ForkJoin 将发出所有结果的数组。就像 promise.all.
  5. 像您之前计划的那样添加项目,然后 return 原始项目。

代码

from(HttpGetMainItem()).pipe(
    switchMap(item => 
        forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid)))
            .pipe(
                map(results => { 
                    /* add results to matching items.subItems and */
                    return item;
                })
            )
     )
);

我觉得这看起来有点笨拙,因为需要保留根项及其所需的嵌套。您可以使用 switchMap 的选择器参数来组合外部和内部可观察对象。您可以使用该参数代替您在 map() 中的逻辑,并且由于两个 observables 的结果都已传递,因此您不需要任何进一步的嵌套。

from(HttpGetMainItem()).pipe(
    switchMap(
        item => forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid))),
        (item, results) => { 
             /* add results to matching items.subItems and */
             return item;
        }
    )
);