如何组合多个 rxjs BehaviourSubjects
How to combine multiple rxjs BehaviourSubjects
我正在构建一个 Angular2 应用程序,并且有两个 BehaviourSubjects
我想在逻辑上合并为一个订阅。我正在发出两个 http 请求,并希望在它们都返回时触发一个事件。我正在查看 forkJoin
与 combineLatest
。似乎 combineLatest 会在 behvaviorSubjects 更新时触发,而 forkJoin 只会在所有 behavoirSubjects 更新后触发。这个对吗?必须有一个普遍接受的模式,不是吗?
编辑
这是我的一个 behaviorSubjects 的示例,我的 angular2 组件正在订阅:
export class CpmService {
public cpmSubject: BehaviorSubject<Cpm[]>;
constructor(private _http: Http) {
this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
}
getCpm(id: number): void {
let params: URLSearchParams = new URLSearchParams();
params.set('Id', id.toString());
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.subscribe(_cpm => {
this.cpmSubject.subscribe(cpmList => {
//double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
})
});
}
}
这是我的组件订阅的片段:
this._cpmService.cpmSubject.subscribe(cpmList => {
doSomeWork();
});
但我不想在单个订阅上触发 doSomeWork(),而是希望仅在 cpmSubject 和 fooSubject 触发时触发 doSomeWork()。
您可以使用 zip
-operator,其工作方式类似于 combineLatest 或 forkJoin,但仅在两个流都已发出时触发:http://reactivex.io/documentation/operators/zip.html
zip
和combineLatest
的区别是:
Zip 只会触发 "in parallel",而 combineLatest
会触发任何更新并发出每个流的最新值。
因此,假设有以下 2 个流:
streamA => 1--2--3
streamB => 10-20-30
与 zip
:
- "1, 10"
- "2, 20"
- "3, 30"
与 combineLatest
:
- "1, 10"
- "2, 10"
- "2, 20"
- "3, 20"
- "3, 30"
这里还有一个活生生的例子:
const a = new Rx.Subject();
const b = new Rx.Subject();
Rx.Observable.zip(a,b)
.subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
.subscribe(x => console.log("combineLatest: " + x.join(", ")));
a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
还有另一个旁注:永远不要在订阅中订阅。
改为做这样的事情:
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.withLatestFrom(this.cpmSubject)
.subscribe([_cpm, cpmList] => {
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
});
我正在构建一个 Angular2 应用程序,并且有两个 BehaviourSubjects
我想在逻辑上合并为一个订阅。我正在发出两个 http 请求,并希望在它们都返回时触发一个事件。我正在查看 forkJoin
与 combineLatest
。似乎 combineLatest 会在 behvaviorSubjects 更新时触发,而 forkJoin 只会在所有 behavoirSubjects 更新后触发。这个对吗?必须有一个普遍接受的模式,不是吗?
编辑
这是我的一个 behaviorSubjects 的示例,我的 angular2 组件正在订阅:
export class CpmService {
public cpmSubject: BehaviorSubject<Cpm[]>;
constructor(private _http: Http) {
this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
}
getCpm(id: number): void {
let params: URLSearchParams = new URLSearchParams();
params.set('Id', id.toString());
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.subscribe(_cpm => {
this.cpmSubject.subscribe(cpmList => {
//double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
})
});
}
}
这是我的组件订阅的片段:
this._cpmService.cpmSubject.subscribe(cpmList => {
doSomeWork();
});
但我不想在单个订阅上触发 doSomeWork(),而是希望仅在 cpmSubject 和 fooSubject 触发时触发 doSomeWork()。
您可以使用 zip
-operator,其工作方式类似于 combineLatest 或 forkJoin,但仅在两个流都已发出时触发:http://reactivex.io/documentation/operators/zip.html
zip
和combineLatest
的区别是:
Zip 只会触发 "in parallel",而 combineLatest
会触发任何更新并发出每个流的最新值。
因此,假设有以下 2 个流:
streamA => 1--2--3
streamB => 10-20-30
与 zip
:
- "1, 10"
- "2, 20"
- "3, 30"
与 combineLatest
:
- "1, 10"
- "2, 10"
- "2, 20"
- "3, 20"
- "3, 30"
这里还有一个活生生的例子:
const a = new Rx.Subject();
const b = new Rx.Subject();
Rx.Observable.zip(a,b)
.subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
.subscribe(x => console.log("combineLatest: " + x.join(", ")));
a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
还有另一个旁注:永远不要在订阅中订阅。 改为做这样的事情:
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.withLatestFrom(this.cpmSubject)
.subscribe([_cpm, cpmList] => {
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
});