Rxjs - 将可观察对象重新映射为值的对象

Rxjs - remap object with observables as values

我得到了一个这样的观察结果:

  const original = Observable.of({
    a: this.http.get('https://jsonplaceholder.typicode.com/todos/11'),
    b: this.http.get('https://jsonplaceholder.typicode.com/todos/22'),
    c: this.http.get('https://jsonplaceholder.typicode.com/todos/33')
  });

我需要解析内部可观察对象并在订阅处理程序中得到类似这样的东西:

  {
    a: ResponseFromServer,
    b: ResponseFromServer,
    c: ResponseFromServer,
  }

我该如何解决这个问题?

谢谢。

编辑:我已经弄明白了,请阅读下文。

似乎没有多少人知道 *Map 运算符曾经有一个叫做 resultSelector 的东西作为他们的第二个参数。现在在 rxjs v6 中,你可以用 inner map 做同样的事情,让我告诉你。

const original = Observable.of({
    a: this.http.get('https://jsonplaceholder.typicode.com/todos/11'),
    b: this.http.get('https://jsonplaceholder.typicode.com/todos/22'),
    c: this.http.get('https://jsonplaceholder.typicode.com/todos/33')
});

const transformed = original.pipe(
    mergeMap(sourceValue => 
        forkJoin(_.values(sourceValue)).pipe(map(resolvedHttpRequests => {
            // here you have access to sourceValue as well as resolvedHttpRequests so you can do manual join to match the data.
        }))
    )
)

你在上面做的方式(Observable.of)本质上是创建一个由低级 Observable 组成的高级 Observable。

我认为更好的运算符是 forkJoin,因为这些 Observable 中的每一个都是冷的和有限的并且 forkJoin 捕获每个 Observable 的第一次发射并在所有值完成时发射所有值:

  const original = forkJoin([
    this.http.get('https://jsonplaceholder.typicode.com/todos/11'),
    this.http.get('https://jsonplaceholder.typicode.com/todos/22'),
    this.http.get('https://jsonplaceholder.typicode.com/todos/33')
  ]);

  let result = {a: null, b: null, c: null};

  original.subscribe([a,b,c] => result = {a,b,c});

请注意,forkJoin 发出的项将是一个数组,其索引与传入的 Observable 的索引相匹配。

如果源对象已经包含 Observables 你可以像下面这样(显然有多种方法可以做到这一点):

const mockXHRCall = (url: string) => {
  return of('response');
};

const original = of({
  a: mockXHRCall('https://jsonplaceholder.typicode.com/todos/11'),
  b: mockXHRCall('https://jsonplaceholder.typicode.com/todos/22'),
  c: mockXHRCall('https://jsonplaceholder.typicode.com/todos/33')
}).pipe(
  mergeMap(obj => {
    const observables$ = Object.keys(obj).map(key => obj[key].pipe(
      map(response => (
        { [key]: response } // wrap the response with eg. { c: ResponseFromC }
      )),
    ));

    return merge(...observables$);
  }),
  scan((acc, wrapped) => (
    { ...acc, ...wrapped }
  ), {}),
  takeLast(1),
).subscribe(console.log);

scan 收集所有响应并将它们合并到一个对象中。

现场演示:https://stackblitz.com/edit/rxjs-ffza8b

2020 年更新

forkJoin(
  // as of RxJS 6.5+ we can use a dictionary of sources
  {
    google: ajax.getJSON('https://api.github.com/users/google'),
    microsoft: ajax.getJSON('https://api.github.com/users/microsoft'),
    users: ajax.getJSON('https://api.github.com/users')
  }
)
  // { google: object, microsoft: object, users: array }
  .subscribe(console.log);

https://www.learnrxjs.io/learn-rxjs/operators/combination/forkjoin