具有一些顺序订阅的 Angular2 自定义可观察对象

Angular2 custom observables with some sequential subscribes

你知道我的问题的解决方案吗? 我需要一个灵活的订阅序列,封装在一个可观察的对象中,如下所示:

saveData() {
    return new Observable((observer) => {
      let success = true;

      if(example === true) {
        this.saveCallbarrings().subscribe((response) => {
          // this response was ignored from angular
          success = (response === true);
        });
      }

      if(example2 === true) {
        this.saveCallbarrings().subscribe((response) => {
          // this response was ignored from angular too
           success = (response === true);
        });
      }

      // and so on ... in the end I need a result of all responses
      observer.next(success);
    });
  }

最后我在提交方法中调用了这个"response-collection"的结果:

onSubmit() {
// Validations and others
...
if(this.isNew) {
        observable = this.create();
      } else {
        observable = this.update();
      }

      return observable.subscribe(success => {
        if(success == true) {
          let subscription = this.saveData().subscribe(successFinished => {
            // And here is the problem, because this var doesnt have the correct response
            if(successFinished === true) {
              this.alertService.success('ALERT.success_saved', {value: 'ALERT.success_edit_user', param: {user: this.user.username}});
            }
          });

          subscription.unsubscribe();
        }
      });

主要问题是 angular 不会等到 "success" var 在第一个代码块中被订阅。 为什么以及对我来说更好的解决方案是什么?

第一个问题:为什么它不起作用?

因为每个订阅都是异步的。当您执行 this.saveCallbarrings().subscribe(...) 时,订阅中的事情可能随时发生(也许永远不会!),因此程序继续执行下一条指令,即 observer.next(success);,其初始值为 success .

第二个问题:什么是最适合我的解决方案?

Rx.Observables 有 so many operators to deal with this asynchronous stuff. In your case, the operator you need is forkJoin。该运算符允许您向他传递一组流,它会订阅所有这些流,当所有流都完成时,它会给您一个数组,其中包含每个流的每个结果。所以你的代码会变成:

saveData() {
    return Rx.Observable.defer(() => {
        let streams = [];
        if(example === true) {
            streams.push(this.saveCallbarrings());
        }
        if(example2 === true) {
            streams.push(this.saveCallbarrings());
        }

        // And so on

        return Rx.Observable.forkJoin(streams);
    });
}

话虽如此,但我不确定您为什么对同一个 this.saveCallbarrings() 进行多次订阅,我想这只是为了让问题更简单,举个例子。

此外,这里我使用了 .defer() 而不是创建。有了这个,你可以 return 另一个流,它会订阅它并将它传递给观察者。做 defer 和什么都不做(即设置流和只是 returning forkJoin)之间的区别是 defer 不会执行任何代码,直到有人订阅它,这样您的副作用就会减少。