在 RxJS 中链接 Observable
Chaining Observables in RxJS
我正在学习 RxJS 和 Angular 2. 假设我有一个包含多个异步函数调用的 promise 链,它取决于前一个函数的结果,如下所示:
var promiseChain = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(1);
}, 1000);
}).then((result) => {
console.log(result);
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(result + 2);
}, 1000);
});
}).then((result) => {
console.log(result);
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(result + 3);
}, 1000);
});
});
promiseChain.then((finalResult) => {
console.log(finalResult);
});
我在不使用 promise 的情况下仅使用 RxJS 做同样的尝试产生了以下结果:
var observableChain = Observable.create((observer) => {
setTimeout(() => {
observer.next(1);
observer.complete();
}, 1000);
}).flatMap((result) => {
console.log(result);
return Observable.create((observer) => {
setTimeout(() => {
observer.next(result + 2);
observer.complete()
}, 1000);
});
}).flatMap((result) => {
console.log(result);
return Observable.create((observer) => {
setTimeout(() => {
observer.next(result + 3);
observer.complete()
}, 1000);
});
});
observableChain.subscribe((finalResult) => {
console.log(finalResult);
});
它产生与承诺链相同的输出。我的问题是
我这样做对吗?我可以对上面的代码做任何与 RxJS 相关的改进吗
如何让这个可观察链重复执行?即在最后添加另一个订阅只会产生额外的 6,尽管我希望它打印 1、3 和 6。
observableChain.subscribe((finalResult) => {
console.log(最终结果);
});
observableChain.subscribe((finalResult) => {
console.log(最终结果);
});
1
3个
6个
6
关于promise composition vs. Rxjs,因为这是一个经常被问到的问题,你可以参考一些以前在SO上问过的问题,其中:
- How to do the chain sequence in rxjs
基本上,flatMap
相当于 Promise.then
。
对于你的第二个问题,你是想重播已经发出的值,还是想在新值到达时对其进行处理?在第一种情况下,检查 publishReplay
运算符。在第二种情况下,标准订阅就足够了。但是,您可能需要注意寒冷。与热二分法取决于你的来源(参见 对概念的解释说明)
说明示例:
管道顶部可以发出 n 个值(这回答了“我如何让这个可观察链重复执行”),但后续链接流发出一个值(因此模仿承诺)。
// Emit three values into the top of this pipe
const topOfPipe = of<string>('chaining', 'some', 'observables');
// If any of the chained observables emit more than 1 value
// then don't use this unless you understand what is going to happen.
const firstObservablePipe = of(1);
const secondObservablePipe = of(2);
const thirdObservablePipe = of(3);
const fourthObservablePipe = of(4);
const addToPreviousStream = (previous) => map(current => previous + current);
const first = (one) => firstObservablePipe.pipe(addToPreviousStream(one));
const second = (two) => secondObservablePipe.pipe(addToPreviousStream(two));
const third = (three) => thirdObservablePipe.pipe(addToPreviousStream(three));
const fourth = (four) => fourthObservablePipe.pipe(addToPreviousStream(four));
topOfPipe.pipe(
mergeMap(first),
mergeMap(second),
mergeMap(third),
mergeMap(fourth),
).subscribe(console.log);
// Output: chaining1234 some1234 observables1234
您也可以使用 concatMap 或 switchMap。它们都有细微的差别。请参阅 rxjs 文档以了解。
合并地图:
https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap
连接图:
https://www.learnrxjs.io/learn-rxjs/operators/transformation/concatmap
切换地图:
https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap
我正在学习 RxJS 和 Angular 2. 假设我有一个包含多个异步函数调用的 promise 链,它取决于前一个函数的结果,如下所示:
var promiseChain = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(1);
}, 1000);
}).then((result) => {
console.log(result);
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(result + 2);
}, 1000);
});
}).then((result) => {
console.log(result);
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(result + 3);
}, 1000);
});
});
promiseChain.then((finalResult) => {
console.log(finalResult);
});
我在不使用 promise 的情况下仅使用 RxJS 做同样的尝试产生了以下结果:
var observableChain = Observable.create((observer) => {
setTimeout(() => {
observer.next(1);
observer.complete();
}, 1000);
}).flatMap((result) => {
console.log(result);
return Observable.create((observer) => {
setTimeout(() => {
observer.next(result + 2);
observer.complete()
}, 1000);
});
}).flatMap((result) => {
console.log(result);
return Observable.create((observer) => {
setTimeout(() => {
observer.next(result + 3);
observer.complete()
}, 1000);
});
});
observableChain.subscribe((finalResult) => {
console.log(finalResult);
});
它产生与承诺链相同的输出。我的问题是
我这样做对吗?我可以对上面的代码做任何与 RxJS 相关的改进吗
如何让这个可观察链重复执行?即在最后添加另一个订阅只会产生额外的 6,尽管我希望它打印 1、3 和 6。
observableChain.subscribe((finalResult) => { console.log(最终结果); });
observableChain.subscribe((finalResult) => { console.log(最终结果); });
1 3个 6个 6
关于promise composition vs. Rxjs,因为这是一个经常被问到的问题,你可以参考一些以前在SO上问过的问题,其中:
- How to do the chain sequence in rxjs
基本上,flatMap
相当于 Promise.then
。
对于你的第二个问题,你是想重播已经发出的值,还是想在新值到达时对其进行处理?在第一种情况下,检查 publishReplay
运算符。在第二种情况下,标准订阅就足够了。但是,您可能需要注意寒冷。与热二分法取决于你的来源(参见
说明示例:
管道顶部可以发出 n 个值(这回答了“我如何让这个可观察链重复执行”),但后续链接流发出一个值(因此模仿承诺)。
// Emit three values into the top of this pipe
const topOfPipe = of<string>('chaining', 'some', 'observables');
// If any of the chained observables emit more than 1 value
// then don't use this unless you understand what is going to happen.
const firstObservablePipe = of(1);
const secondObservablePipe = of(2);
const thirdObservablePipe = of(3);
const fourthObservablePipe = of(4);
const addToPreviousStream = (previous) => map(current => previous + current);
const first = (one) => firstObservablePipe.pipe(addToPreviousStream(one));
const second = (two) => secondObservablePipe.pipe(addToPreviousStream(two));
const third = (three) => thirdObservablePipe.pipe(addToPreviousStream(three));
const fourth = (four) => fourthObservablePipe.pipe(addToPreviousStream(four));
topOfPipe.pipe(
mergeMap(first),
mergeMap(second),
mergeMap(third),
mergeMap(fourth),
).subscribe(console.log);
// Output: chaining1234 some1234 observables1234
您也可以使用 concatMap 或 switchMap。它们都有细微的差别。请参阅 rxjs 文档以了解。
合并地图: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap
连接图: https://www.learnrxjs.io/learn-rxjs/operators/transformation/concatmap
切换地图: https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap