具有一些顺序订阅的 Angular2 自定义可观察对象
Angular2 custom observables with some sequential subscribes
你知道我的问题的解决方案吗?
我需要一个灵活的订阅序列,封装在一个可观察的对象中,如下所示:
saveData() {
return new Observable((observer) => {
let success = true;
if(example === true) {
this.saveCallbarrings().subscribe((response) => {
// this response was ignored from angular
success = (response === true);
});
}
if(example2 === true) {
this.saveCallbarrings().subscribe((response) => {
// this response was ignored from angular too
success = (response === true);
});
}
// and so on ... in the end I need a result of all responses
observer.next(success);
});
}
最后我在提交方法中调用了这个"response-collection"的结果:
onSubmit() {
// Validations and others
...
if(this.isNew) {
observable = this.create();
} else {
observable = this.update();
}
return observable.subscribe(success => {
if(success == true) {
let subscription = this.saveData().subscribe(successFinished => {
// And here is the problem, because this var doesnt have the correct response
if(successFinished === true) {
this.alertService.success('ALERT.success_saved', {value: 'ALERT.success_edit_user', param: {user: this.user.username}});
}
});
subscription.unsubscribe();
}
});
主要问题是 angular 不会等到 "success" var 在第一个代码块中被订阅。
为什么以及对我来说更好的解决方案是什么?
第一个问题:为什么它不起作用?
因为每个订阅都是异步的。当您执行 this.saveCallbarrings().subscribe(...)
时,订阅中的事情可能随时发生(也许永远不会!),因此程序继续执行下一条指令,即 observer.next(success);
,其初始值为 success
.
第二个问题:什么是最适合我的解决方案?
Rx.Observables 有 so many operators to deal with this asynchronous stuff. In your case, the operator you need is forkJoin。该运算符允许您向他传递一组流,它会订阅所有这些流,当所有流都完成时,它会给您一个数组,其中包含每个流的每个结果。所以你的代码会变成:
saveData() {
return Rx.Observable.defer(() => {
let streams = [];
if(example === true) {
streams.push(this.saveCallbarrings());
}
if(example2 === true) {
streams.push(this.saveCallbarrings());
}
// And so on
return Rx.Observable.forkJoin(streams);
});
}
话虽如此,但我不确定您为什么对同一个 this.saveCallbarrings()
进行多次订阅,我想这只是为了让问题更简单,举个例子。
此外,这里我使用了 .defer()
而不是创建。有了这个,你可以 return 另一个流,它会订阅它并将它传递给观察者。做 defer
和什么都不做(即设置流和只是 returning forkJoin)之间的区别是 defer
不会执行任何代码,直到有人订阅它,这样您的副作用就会减少。
你知道我的问题的解决方案吗? 我需要一个灵活的订阅序列,封装在一个可观察的对象中,如下所示:
saveData() {
return new Observable((observer) => {
let success = true;
if(example === true) {
this.saveCallbarrings().subscribe((response) => {
// this response was ignored from angular
success = (response === true);
});
}
if(example2 === true) {
this.saveCallbarrings().subscribe((response) => {
// this response was ignored from angular too
success = (response === true);
});
}
// and so on ... in the end I need a result of all responses
observer.next(success);
});
}
最后我在提交方法中调用了这个"response-collection"的结果:
onSubmit() {
// Validations and others
...
if(this.isNew) {
observable = this.create();
} else {
observable = this.update();
}
return observable.subscribe(success => {
if(success == true) {
let subscription = this.saveData().subscribe(successFinished => {
// And here is the problem, because this var doesnt have the correct response
if(successFinished === true) {
this.alertService.success('ALERT.success_saved', {value: 'ALERT.success_edit_user', param: {user: this.user.username}});
}
});
subscription.unsubscribe();
}
});
主要问题是 angular 不会等到 "success" var 在第一个代码块中被订阅。 为什么以及对我来说更好的解决方案是什么?
第一个问题:为什么它不起作用?
因为每个订阅都是异步的。当您执行 this.saveCallbarrings().subscribe(...)
时,订阅中的事情可能随时发生(也许永远不会!),因此程序继续执行下一条指令,即 observer.next(success);
,其初始值为 success
.
第二个问题:什么是最适合我的解决方案?
Rx.Observables 有 so many operators to deal with this asynchronous stuff. In your case, the operator you need is forkJoin。该运算符允许您向他传递一组流,它会订阅所有这些流,当所有流都完成时,它会给您一个数组,其中包含每个流的每个结果。所以你的代码会变成:
saveData() {
return Rx.Observable.defer(() => {
let streams = [];
if(example === true) {
streams.push(this.saveCallbarrings());
}
if(example2 === true) {
streams.push(this.saveCallbarrings());
}
// And so on
return Rx.Observable.forkJoin(streams);
});
}
话虽如此,但我不确定您为什么对同一个 this.saveCallbarrings()
进行多次订阅,我想这只是为了让问题更简单,举个例子。
此外,这里我使用了 .defer()
而不是创建。有了这个,你可以 return 另一个流,它会订阅它并将它传递给观察者。做 defer
和什么都不做(即设置流和只是 returning forkJoin)之间的区别是 defer
不会执行任何代码,直到有人订阅它,这样您的副作用就会减少。