使用 RxJS 链接承诺

Chaining promises with RxJS

总的来说,我是 RxJS 和 FRP 的新手。我的想法是将我的 ExpressJS 应用程序中的现有承诺链转换为可观察的实践。我知道这可能不是最好的例子,但也许有人可以帮助阐明一些问题。

我想做什么:

  1. 我有两个承诺 - prom1 和 prom2
  2. 我希望 prom1 在 prom2
  3. 之前 运行
  4. 如果 prom1 发送 reject(err),我想在它开始之前取消 prom2。
  5. 我希望错误消息 prom1 returns 可用于观察者的 onError 方法。

var prom1 = new Promise(function(resolve, reject) {
    if (true) {
       reject('reason');
    }
    resolve(true);
});

var prom2 = new Promise(function(resolve, reject) {
    resolve(true);
});

// What do I do here? This is what I've tried so far...
var source1 = Rx.Observable.fromPromise(prom1);
var source2 = source1.flatMap(Rx.Observable.fromPromise(prom2));

var subscription = source2.subscribe(
    function (result) { console.log('Next: ' + result); },

    // I want my error 'reason' to be made available here
    function (err) { console.log('Error: ' + err); },

    function () { console.log('Completed'); });

flatMap 将一个 Observable of Observable 变成一个 Observable。它在许多 Promises 示例中使用,因为通常您有一个可观察对象,并且在 map 函数中您希望为每个 "item" 可观察对象创建一个承诺。因为每个 fromPromise 调用都会创建一个新的 Observable,这使它成为 "observable of observables"。 flatMap 将其减少为 "flat" 可观察。

在您的示例中,您做了一些不同的事情,您将一个单一的承诺变成了一个可观察的对象,并希望将它与另一个可观察的对象(也是从一个单一的承诺创建的)链接起来。 Concat 做你正在寻找的东西,它将两个可观察到的链接在一起。

错误案例将按您预期的方式工作。

如果我明白你想做什么 - 你需要从 return 承诺的函数创建两个延迟的可观察对象并连接它们:

var shouldFail = false;

function action1() {
    return new Promise(function (resolve, reject) {    
        console.log('start action1');
        if (shouldFail) {
            reject('reason');
        }
        resolve(true);
    });
}

function action2() {
    return new Promise(function (resolve, reject) {    
        console.log('start action2');
        resolve(true);
    });
}

var source1 = Rx.Observable.defer(action1);
var source2 = Rx.Observable.defer(action2);

var combination = Rx.Observable.concat(source1, source2);

var logObserver = Rx.Observer.create(

function (result) {
    console.log('Next: ' + result);
},

function (err) {
    console.log('Error: ' + err);
},

function () {
    console.log('Completed');
});

然后对于正常情况:

combination.subscribe(logObserver);
// start action1
// Next: true
// start action2
// Next: true
// Completed

第一个 promise 失败的情况:

shouldFail = true;
combination.subscribe(logObserver);
// start action1
// Error: reason

http://jsfiddle.net/cL37tgva/

Observable.forkJoin 在接收其他 Observable 数组时效果很好。

Rx.Observable.forkJoin([this.http.get('http://jsonplaceholder.typicode.com/posts'), this.http.get('http://jsonplaceholder.typicode.com/albums')]).subscribe((data) => {
      console.log(data);
    });