Angular + RxJS:在 flatMap 中进行多个服务器调用并在订阅中检索它们

Angular + RxJS: Make multiple Server calls within flatMap and retrieve them in subscribe

下面是我的问题的代码片段。

this.locationService.getUserLocation()
    .flatMap(location => {
        // Some code
        return this.searchService.getResults(location); // returns an observable
    })
    .flatMap(searchResponse => {
        // some code  
        // NEED ANOTHER CALL with searchResponse.  --- (A)
        return this.resultsService.getCount(searchResponse.results); --- (B)
    })
    .subscribe(count => {
        console.log(count);
    });

现在我需要在第二个 flatMap 块中再调用一次。我该如何处理?

如果我进行多次调用,我如何订阅多个结果并检索订阅中的数据?

P.S: A 和 B 调用可以并行进行。他们只需要 searchResponse 作为输入。

您可以使用 Observable.forkJoin,这将并行调用并且 return 一旦所有内部可观察对象完成:

this.locationService.getUserLocation()
    .flatMap(location => {
        // Some code
        return this.searchService.getResults(location); // returns an observable
    })
    .flatMap(searchResponse => {
        return Observable.forkJoin(
          this.resultsService.getCount(searchResponse.results),
          Observable.of(searchResponse) // replace with second call here
        );
    })
    .subscribe(([count, results]) => { // forkJoin returns array of results
        console.log(count, results);
    });

这里有关于 forkJoin 运算符的更多信息:Docs


更新

正如@ChauTran 在评论中注意到的那样,如果您的一个或多个可观察对象没有完成(或者您需要在完成之前获得结果),那么您可以使用 Observable.combineLatestDocs).

一旦每个内部 observable 至少发出 一次,此运算符就会 return。这是 combineLatest:

的示例
this.locationService.getUserLocation()
    //...
    .flatMap(searchResponse => {
        return Observable.combineLatest(
          this.resultsService.getCount(searchResponse.results),
          Observable.of(searchResponse) // replace with second call here
        );
    })
    .subscribe(([count, results]) => { // forkJoin returns array of results
        console.log(count, results);
    });