concatMap 可变数量的承诺和管道结果作为下一个承诺的参数

concatMap variable amount of promises and pipe results as parameter for the next promise

我有数量可变的承诺,我想按顺序执行它们,将前一个承诺的结果作为下一个承诺的参数。目前我设法连接它们以便按顺序执行它们:

const promises = [p1, p2, p3, p4, ....];
const source$ = promises.map(p => Rx.Observable.defer(p));
const combination$ = Rx.Observable.concat(...source$);
combination.subscribe((x) => console.log(x));

但是我现在如何设法将参数通过管道传递到每个 promise 中?我读到我可以使用 concatMap。类似的东西:

Observable.from(p1).concatMap(res => p2(res)).concatMap(res => p3(res))

我想我总是需要 return 在 concatMap 中创建一个新的 Observable。还有如果许多 promise 应该是可变的,如何链接 concatMap?

有人能给我指出正确的方向吗?我是整个反应式编程的新手,但对于我已经了解的部分,它非常棒!

谢谢

看起来您可以使用 expand() 运算符递归地投影从内部 Observable 发出的值。

例如,这里我有一个 Observable 数组,我想将累加值与当前值相加:

const arr = [1, 2, 3, 4, 5].map(v => Observable.of(v));

Observable.of(arr)
    .concatMap(arr => {
        return Observable
            .concat(arr[0]) // get the first item
            .expand((prev, i) => {
                i = i + 1; // We skipped the first Observable in `arr`
                if (i ===  arr.length) {
                    return Observable.empty();
                } else {
                    return arr[i].map(result => prev + result)
                }
            });
    })
    .subscribe(console.log);

这会打印:

1
3
6
10
15

或者如果使用 takeLast(1):

则只能得到最后一个值
return Observable
    .concat(arr[0])
    .expand((prev, i) => {
        ...
    })
    .takeLast(1);

或者使用 mergeScan 有更简单的解决方案,但我认为这不太容易理解:

Observable.of(arr)
    .concatAll()
    .mergeScan((acc, observable) => {
        return observable.map(result => result + acc);
    }, 0)
    .subscribe(console.log);

结果与前面的例子相同。

好的,我终于得到了我想要的结果

const promises = [
  (i=0) => Promise.resolve(i+2), 
  (i=2) => Promise.resolve(i+4), 
  (i=2) => Promise.resolve(i+8)
];

const initial$ = Rx.Observable.from(promises[0]());

promises.reduce((o, p) => o.concatMap(p), initial$)
.subscribe(console.log);