在 RxJS 中链接 Observable

Chaining Observables in RxJS

我正在学习 RxJS 和 Angular 2. 假设我有一个包含多个异步函数调用的 promise 链,它取决于前一个函数的结果,如下所示:

var promiseChain = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve(1);
  }, 1000);
}).then((result) => {
  console.log(result);

  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(result + 2);
    }, 1000);
  });
}).then((result) => {
  console.log(result);

  return new Promise((resolve, reject) => {
      setTimeout(() => {
      resolve(result + 3);
        }, 1000);
  });
});

promiseChain.then((finalResult) => {
  console.log(finalResult);
});

我在不使用 promise 的情况下仅使用 RxJS 做同样的尝试产生了以下结果:

var observableChain = Observable.create((observer) => {
  setTimeout(() => {
    observer.next(1);
    observer.complete();
  }, 1000);
}).flatMap((result) => {
  console.log(result);

  return Observable.create((observer) => {
    setTimeout(() => {
      observer.next(result + 2);
      observer.complete()
    }, 1000);
  });
}).flatMap((result) => {
  console.log(result);

  return Observable.create((observer) => {
    setTimeout(() => {
      observer.next(result + 3);
      observer.complete()
    }, 1000);
  });
});

observableChain.subscribe((finalResult) => {
  console.log(finalResult);
});

它产生与承诺链相同的输出。我的问题是

  1. 我这样做对吗?我可以对上面的代码做任何与 RxJS 相关的改进吗

  2. 如何让这个可观察链重复执行?即在最后添加另一个订阅只会产生额外的 6,尽管我希望它打印 1、3 和 6。

    observableChain.subscribe((finalResult) => { console.log(最终结果); });

    observableChain.subscribe((finalResult) => { console.log(最终结果); });

    1 3个 6个 6

关于promise composition vs. Rxjs,因为这是一个经常被问到的问题,你可以参考一些以前在SO上问过的问题,其中:

  • How to do the chain sequence in rxjs

基本上,flatMap 相当于 Promise.then

对于你的第二个问题,你是想重播已经发出的值,还是想在新值到达时对其进行处理?在第一种情况下,检查 publishReplay 运算符。在第二种情况下,标准订阅就足够了。但是,您可能需要注意寒冷。与热二分法取决于你的来源(参见 对概念的解释说明)

说明示例:

管道顶部可以发出 n 个值(这回答了“我如何让这个可观察链重复执行”),但后续链接流发出一个值(因此模仿承诺)。

// Emit three values into the top of this pipe
const topOfPipe = of<string>('chaining', 'some', 'observables');

// If any of the chained observables emit more than 1 value
// then don't use this unless you understand what is going to happen.
const firstObservablePipe = of(1); 
const secondObservablePipe = of(2);
const thirdObservablePipe = of(3);
const fourthObservablePipe = of(4);

const addToPreviousStream = (previous) => map(current => previous + current);
const first = (one) => firstObservablePipe.pipe(addToPreviousStream(one));
const second = (two) => secondObservablePipe.pipe(addToPreviousStream(two));
const third = (three) => thirdObservablePipe.pipe(addToPreviousStream(three));
const fourth = (four) => fourthObservablePipe.pipe(addToPreviousStream(four));

topOfPipe.pipe(
  mergeMap(first),
  mergeMap(second),
  mergeMap(third),
  mergeMap(fourth),
).subscribe(console.log);

// Output: chaining1234 some1234 observables1234

您也可以使用 concatMap 或 switchMap。它们都有细微的差别。请参阅 rxjs 文档以了解。

合并地图: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap

连接图: https://www.learnrxjs.io/learn-rxjs/operators/transformation/concatmap

切换地图: https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap