concatMap 可变数量的承诺和管道结果作为下一个承诺的参数
concatMap variable amount of promises and pipe results as parameter for the next promise
我有数量可变的承诺,我想按顺序执行它们,将前一个承诺的结果作为下一个承诺的参数。目前我设法连接它们以便按顺序执行它们:
const promises = [p1, p2, p3, p4, ....];
const source$ = promises.map(p => Rx.Observable.defer(p));
const combination$ = Rx.Observable.concat(...source$);
combination.subscribe((x) => console.log(x));
但是我现在如何设法将参数通过管道传递到每个 promise 中?我读到我可以使用 concatMap。类似的东西:
Observable.from(p1).concatMap(res => p2(res)).concatMap(res => p3(res))
我想我总是需要 return 在 concatMap 中创建一个新的 Observable。还有如果许多 promise 应该是可变的,如何链接 concatMap?
有人能给我指出正确的方向吗?我是整个反应式编程的新手,但对于我已经了解的部分,它非常棒!
谢谢
看起来您可以使用 expand()
运算符递归地投影从内部 Observable 发出的值。
例如,这里我有一个 Observable 数组,我想将累加值与当前值相加:
const arr = [1, 2, 3, 4, 5].map(v => Observable.of(v));
Observable.of(arr)
.concatMap(arr => {
return Observable
.concat(arr[0]) // get the first item
.expand((prev, i) => {
i = i + 1; // We skipped the first Observable in `arr`
if (i === arr.length) {
return Observable.empty();
} else {
return arr[i].map(result => prev + result)
}
});
})
.subscribe(console.log);
这会打印:
1
3
6
10
15
或者如果使用 takeLast(1)
:
则只能得到最后一个值
return Observable
.concat(arr[0])
.expand((prev, i) => {
...
})
.takeLast(1);
或者使用 mergeScan
有更简单的解决方案,但我认为这不太容易理解:
Observable.of(arr)
.concatAll()
.mergeScan((acc, observable) => {
return observable.map(result => result + acc);
}, 0)
.subscribe(console.log);
结果与前面的例子相同。
好的,我终于得到了我想要的结果
const promises = [
(i=0) => Promise.resolve(i+2),
(i=2) => Promise.resolve(i+4),
(i=2) => Promise.resolve(i+8)
];
const initial$ = Rx.Observable.from(promises[0]());
promises.reduce((o, p) => o.concatMap(p), initial$)
.subscribe(console.log);
我有数量可变的承诺,我想按顺序执行它们,将前一个承诺的结果作为下一个承诺的参数。目前我设法连接它们以便按顺序执行它们:
const promises = [p1, p2, p3, p4, ....];
const source$ = promises.map(p => Rx.Observable.defer(p));
const combination$ = Rx.Observable.concat(...source$);
combination.subscribe((x) => console.log(x));
但是我现在如何设法将参数通过管道传递到每个 promise 中?我读到我可以使用 concatMap。类似的东西:
Observable.from(p1).concatMap(res => p2(res)).concatMap(res => p3(res))
我想我总是需要 return 在 concatMap 中创建一个新的 Observable。还有如果许多 promise 应该是可变的,如何链接 concatMap?
有人能给我指出正确的方向吗?我是整个反应式编程的新手,但对于我已经了解的部分,它非常棒!
谢谢
看起来您可以使用 expand()
运算符递归地投影从内部 Observable 发出的值。
例如,这里我有一个 Observable 数组,我想将累加值与当前值相加:
const arr = [1, 2, 3, 4, 5].map(v => Observable.of(v));
Observable.of(arr)
.concatMap(arr => {
return Observable
.concat(arr[0]) // get the first item
.expand((prev, i) => {
i = i + 1; // We skipped the first Observable in `arr`
if (i === arr.length) {
return Observable.empty();
} else {
return arr[i].map(result => prev + result)
}
});
})
.subscribe(console.log);
这会打印:
1
3
6
10
15
或者如果使用 takeLast(1)
:
return Observable
.concat(arr[0])
.expand((prev, i) => {
...
})
.takeLast(1);
或者使用 mergeScan
有更简单的解决方案,但我认为这不太容易理解:
Observable.of(arr)
.concatAll()
.mergeScan((acc, observable) => {
return observable.map(result => result + acc);
}, 0)
.subscribe(console.log);
结果与前面的例子相同。
好的,我终于得到了我想要的结果
const promises = [
(i=0) => Promise.resolve(i+2),
(i=2) => Promise.resolve(i+4),
(i=2) => Promise.resolve(i+8)
];
const initial$ = Rx.Observable.from(promises[0]());
promises.reduce((o, p) => o.concatMap(p), initial$)
.subscribe(console.log);