从一组数据创建可观察对象并将所有这些链接在一起
create observables from an array of data and chain all these together
我有以下需求:
- 我有一组数据
- 对于数组中的每一项,我需要执行一个调用,从而产生一个 Observable
- 我需要将所有这些 observables 链接在一起
- 如果其中一个可观察结果导致错误响应,我需要 return false
- 不必进行进一步的观察
上下文是 Angular 中的守卫,它使用 canActivate() returning 一个 Observable
我想避免在守卫中进行以下构造(伪代码):
// the actions to check
var actions = ['x', 'y', 'z'];
canActivate() : Observable<boolean> {
return this.performAction(actions[0]).subscribe(result => {
if(result){
this.performAction(actions[1]).subscribe(result2 => {
if(result2){
this.performAction(actions[2]).subscribe(result3 => {
...
};
}
});
}
//result = false;
};
}
你可以这样做:
import { Observable, Subject, ReplaySubject } from 'rxjs';
const actions = ['x', 'y', 'z'];
const performAction = (action): Observable<boolean> => {
if (action === 'y') {
return Observable.of(false);
}
return Observable.of(true);
}
const observable = Observable.from(actions)
.concatMap(a => performAction(a))
.multicast(new ReplaySubject(1),
s => s.takeWhile(result => result !== false).concat(s.take(1))
);
observable.subscribe(console.log);
查看现场演示(打开控制台):https://stackblitz.com/edit/rxjs5-hnzwtt?file=index.ts
最重要的部分是 multicast
运算符,它遍历所有内容,直到接收到 false
。然后这个 false
值是完成链之前的最后一个值(感谢 concat
)。
输出为:
true
false
rxjs 的更新答案7.x (7.5.4)
这个答案也确实满足了 OP 的要求:
- further observables should not have to proceed
为了让它工作,我必须使用添加到 takeWhile 运算符 (rxjs 6+) 的新参数,该参数 returns 导致 takeWhile 运算符完成 observable 的值。
工作代码示例:https://stackblitz.com/edit/rxjs5-85mw81?file=index.ts
const arrayOfValues = ['x', 'y', 'z'];
const performAction = (action): Observable<boolean> => {
if (action === 'y') {
return of(false);
}
return of(true);
}
const observable = of(...arrayOfValues).pipe(
concatMap(a => performAction(a)),
takeWhile(result => result !== false, true),
takeLast(1));
observable.subscribe(console.log);
我有以下需求:
- 我有一组数据
- 对于数组中的每一项,我需要执行一个调用,从而产生一个 Observable
- 我需要将所有这些 observables 链接在一起
- 如果其中一个可观察结果导致错误响应,我需要 return false
- 不必进行进一步的观察
上下文是 Angular 中的守卫,它使用 canActivate() returning 一个 Observable
我想避免在守卫中进行以下构造(伪代码):
// the actions to check
var actions = ['x', 'y', 'z'];
canActivate() : Observable<boolean> {
return this.performAction(actions[0]).subscribe(result => {
if(result){
this.performAction(actions[1]).subscribe(result2 => {
if(result2){
this.performAction(actions[2]).subscribe(result3 => {
...
};
}
});
}
//result = false;
};
}
你可以这样做:
import { Observable, Subject, ReplaySubject } from 'rxjs';
const actions = ['x', 'y', 'z'];
const performAction = (action): Observable<boolean> => {
if (action === 'y') {
return Observable.of(false);
}
return Observable.of(true);
}
const observable = Observable.from(actions)
.concatMap(a => performAction(a))
.multicast(new ReplaySubject(1),
s => s.takeWhile(result => result !== false).concat(s.take(1))
);
observable.subscribe(console.log);
查看现场演示(打开控制台):https://stackblitz.com/edit/rxjs5-hnzwtt?file=index.ts
最重要的部分是 multicast
运算符,它遍历所有内容,直到接收到 false
。然后这个 false
值是完成链之前的最后一个值(感谢 concat
)。
输出为:
true
false
rxjs 的更新答案7.x (7.5.4)
这个答案也确实满足了 OP 的要求:
- further observables should not have to proceed
为了让它工作,我必须使用添加到 takeWhile 运算符 (rxjs 6+) 的新参数,该参数 returns 导致 takeWhile 运算符完成 observable 的值。
工作代码示例:https://stackblitz.com/edit/rxjs5-85mw81?file=index.ts
const arrayOfValues = ['x', 'y', 'z'];
const performAction = (action): Observable<boolean> => {
if (action === 'y') {
return of(false);
}
return of(true);
}
const observable = of(...arrayOfValues).pipe(
concatMap(a => performAction(a)),
takeWhile(result => result !== false, true),
takeLast(1));
observable.subscribe(console.log);