将一个 Observable 映射到另一个 Observable 的数组项中,将结果展平

Mapping an Observable in array items of another Observable, flattening the result

这个标题可能需要更多解释。

基本上我从后端得到的是一个带有赛车手数组的 Observable,我想为每个数组项映射另一个 属性 isOnTrack,它由另一个 Observable(简单布尔值)组成我从后端检索。我想压平最终结果,这样我就没有 Observable 中的 Observable 了。我已经尝试了很多 rxjs 运算符,但我无法让它工作。

无效的代码:

this.drivers$ = this.db.list('users').valueChanges().pipe(
  map(arr => arr.map( (driver:any) => {
    driver.isOnTrack = this.db.object(`telemetry/${driver.uid}/values/IsOnTrack`).valueChanges();
     return driver
  })),
  mergeAll()
);

这成功地将 isOnTrack 可观察对象映射到数组项,但我无法将其展平。

项目在 RxJS 6 上

更新 1

在 Jonathan 的回答之后我相信我应该使用解压这个词而不是展平

我要寻找的转换后的 Observable 应该提供类似于

的东西
of([
  {id: 1, name: 'foo', isOnTrack: true},
  {id: 2, name: 'bar', isOnTrack: true},
  {id: 3, name: 'baz', isOnTrack: false},
])

在后端更改一个 IsOnTrack 后,它应该再次发出完整的数组。

of([
  {id: 1, name: 'foo', isOnTrack: false},
  {id: 2, name: 'bar', isOnTrack: true},
  {id: 3, name: 'baz', isOnTrack: false},
])

模拟数据库函数

// this.db.list('users').valueChanges()
const requestIsOnTrack$ = (id: number): Observable<boolean> => interval(1000).pipe(
  take(3),
  map(() => Math.random() >= 0.5)
)

// this.db.object(`telemetry/${driver.uid}/values/IsOnTrack`).valueChanges()
const requestDrivers$ = () => of([
  {id: 1, name: 'foo'},
  {id: 2, name: 'bar'},
  {id: 3, name: 'baz'},
])

实施

const drivers$ = requestDrivers$().pipe(
  map(drivers => drivers.map(driver => requestIsOnTrack$(driver.id).pipe(
    take(1),
    map(isOnTrack => ({
      ...driver,
      isOnTrack
    }))
  ))),
  mergeAll(),
  combineAll()
)

说明

observables中的对象类型只是<T>为了方便不使用接口

  • 从您的数据库请求所有驱动程序:requestDrivers$() => Observable
  • 将请求 isOnTrack 映射到每个驱动程序 requestIsOnTrack$(id) => Observable<Observable<T>[]>
  • 使用 take(1) => Observable<Observable<T>[]>
  • 将您的 requestOnTrack 更新限制为 1
  • 将以前的值映射到新对象中({...driver, isOnTrack}) => Observable<Observable<T>[]>
  • mergeAll 将您的数组拆分为多个发射器 mergeAll() => Observable<Observable<T>>
  • combineAll 拆箱 observables 并将所有映射值绑定到数组 combineAll() => Observable<T[]>

这里是运行stackblitz