组合多个 FirebaseListObservables

Combining multiple FirebaseListObservables

const placeId = this.getPlaceId();
        this.af.database.list(`placeUsers/${placeId}`).subscribe((userKeys) => {
            for (let index = 0; index < userKeys.length; index++) {
                let userKey = userKeys[index];

                this.af.database.list(`userDevices/${userKey.$key}`).subscribe((deviceKeys) => {

                    for (let index = 0; index < deviceKeys.length; index++) {
                        let deviceKey = deviceKeys[index];

                        this.af.database.object(`devices/${deviceKey.$key}`).subscribe((device) => {

                            console.log(device);
                            // Device received.    

                        });
                    }
                });
            }
        });

我目前正在尝试向关注某个地点的所有用户发送通知。目前的流程是这样的:

我想知道是否有办法将所有这些调用合并为一个可观察的调用。

我目前的问题是,我无法知道所有这些请求何时完成。我研究了 RxJs,它可以让我结合所有这些可观察到的东西。但是我还没有找到一个好的解决方案来解决如何使用四个节点。

您可以使用 concatMapforkJoin 来组合发射设备的可观察对象。这个组合的可观察对象将发出单个设备数组,然后完成(因为 first 运算符仅用于获取第一个发出的列表或对象):

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/first';
import 'rxjs/add/operator/forkJoin';

this.af.database
  .list(`placeUsers/${placeId}`)
  .first()
  .concatMap(userKeys => {
    let observables = userKeys.map(userKey => this.af.database
      .list(`userDevices/${userKey.$key}`)
      .first()
    );
    return observables.length ?
      Observable.forkJoin(...observables, (...lists) => [].concat(...lists)) :
      Observable.of([])
  })
  .concatMap(deviceKeys => {
    let observables = deviceKeys.map(deviceKeys => this.af.database
      .object(`devices/${deviceKey.$key}`)
      .first()
    );
    return observables.length ?
      Observable.forkJoin(...observables) :
      Observable.of([])
  })
  .subscribe(devices => console.log(devices));

如果您想要一个不完成并在某个地点的用户或其设备发生变化时发出该地点的设备的可观察对象,请使用 switchMap 而不是 concatMapcombineLatest 而不是 forkJoin 并删除 first 运算符:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/combineLatest';
import 'rxjs/add/operator/switchMap';

this.af.database
  .list(`placeUsers/${placeId}`)
  .switchMap(userKeys => {
    let observables = userKeys.map(userKey => this.af.database
      .list(`userDevices/${userKey.$key}`)
    );
    return observables.length ?
      Observable.combineLatest(...observables, (...lists) => [].concat(...lists)) :
      Observable.of([])
  })
  .switchMap(deviceKeys => {
    let observables = deviceKeys.map(deviceKeys => this.af.database
      .object(`devices/${deviceKey.$key}`)
    );
    return observables.length ?
      Observable.combineLatest(...observables) :
      Observable.of([])
  })
  .subscribe(devices => console.log(devices));