如何组合多个 rxjs BehaviourSubjects

How to combine multiple rxjs BehaviourSubjects

我正在构建一个 Angular2 应用程序,并且有两个 BehaviourSubjects 我想在逻辑上合并为一个订阅。我正在发出两个 http 请求,并希望在它们都返回时触发一个事件。我正在查看 forkJoincombineLatest。似乎 combineLatest 会在 behvaviorSubjects 更新时触发,而 forkJoin 只会在所有 behavoirSubjects 更新后触发。这个对吗?必须有一个普遍接受的模式,不是吗?

编辑
这是我的一个 behaviorSubjects 的示例,我的 angular2 组件正在订阅:

export class CpmService {

    public cpmSubject: BehaviorSubject<Cpm[]>;

    constructor(private _http: Http) {
        this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
    }

    getCpm(id: number): void {
        let params: URLSearchParams = new URLSearchParams();
        params.set('Id', id.toString());

        this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .subscribe(_cpm => {
                this.cpmSubject.subscribe(cpmList => {
                    //double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
                    if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                        cpmList.push(_cpm);
                        this.cpmSubject.next(cpmList);
                    }
                })
            });
    }
}

这是我的组件订阅的片段:

  this._cpmService.cpmSubject.subscribe(cpmList => {
      doSomeWork();
  });

但我不想在单个订阅上触发 doSomeWork(),而是希望仅在 cpmSubject 和 fooSubject 触发时触发 doSomeWork()。

您可以使用 zip-operator,其工作方式类似于 combineLatest 或 forkJoin,但仅在两个流都已发出时触发:http://reactivex.io/documentation/operators/zip.html

zipcombineLatest的区别是: Zip 只会触发 "in parallel",而 combineLatest 会触发任何更新并发出每个流的最新值。 因此,假设有以下 2 个流:

streamA => 1--2--3
streamB => 10-20-30

zip:

  • "1, 10"
  • "2, 20"
  • "3, 30"

combineLatest:

  • "1, 10"
  • "2, 10"
  • "2, 20"
  • "3, 20"
  • "3, 30"

这里还有一个活生生的例子:

const a = new Rx.Subject();
const b = new Rx.Subject();

Rx.Observable.zip(a,b)
  .subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
  .subscribe(x => console.log("combineLatest: " + x.join(", ")));

a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>


还有另一个旁注:永远不要在订阅中订阅。 改为做这样的事情:

this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .withLatestFrom(this.cpmSubject)
            .subscribe([_cpm, cpmList] => {
                if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                    cpmList.push(_cpm);
                    this.cpmSubject.next(cpmList);
                }
            });