如何为 RXJS 中的每个可观察对象执行函数

How to execute function for each observable in RXJS

我遇到这样的情况:

//pseudo code
var agreement:Observable<agreement[]> = Observable.ajax({...})
    .flatMap(agreements:[] => {
        agreements.forEach(agreement =>{
           //server request for each agreement
           //If request fail throw exception
        });
        return agreements;
    })
    .map(agreements => agreements);

在这里,我需要检查每个协议是否存在于其他服务器中,如果不存在则抛出异常。主要问题是如何等待检查所有协议,然后才得到结果。

这个问题的答案取决于 forEach 循环中发生的事情——如果 agreements(进入 flatMap 的那个)是一个你正在使用的 Observable forEach 发射,策略完全不同,因为您不一定知道将发射多少 agreement 个对象。

假设 forEach 只是 运行 一些其他基本的 http 调用,比如说,returns 承诺的...然后答案将涉及收集承诺和允许他们全部完成 Promise-way,这样完成就可以转换为链中下一个运算符的 Observable。

一些伪代码:

var agreement: Observable<agreement[]> = Observable.ajax({...})
    .flatMap(agreements:[] => {
        const promises = agreements.map(agreement => doHttpThing()
            .then(...)
            .catch(err => Observable.throw(err))
        );
        return Observable.fromPromise(Promise.all(promises));
    })
    .map(agreements => agreements);