RxJS:根据先前的值来恢复对象,然后将这些先前的值与恢复的对象连接并发出

RxJS: Depending on previous values to recover an object and then join and emit those previous values with the recovered object

我有一个案例,我有一堆我从 route.params 恢复的 id,像这样:

const ids$ = this.route.params.pipe(
  map(params => {
    const modelId = +params['modelId'];
    const deliverId = +params['deliverId'];
    const compositionId = +params['compositionId'];

    return { modelId, deliverId, compositionId };
  })
);

之后,我必须从服务器恢复 composition 对象,所以我这样做:

const getComposition$ = ids$.pipe(
  switchMap(ids =>
    this.compositionsService.getComposition(ids.compositionId)
  )
);

现在我想要一个聚合对象,其中包含前两个 ID,而不是最后一个 (compositionId),composition 对象,我的意思是:

entities: {
  modelId: number;
  deliverId: number;
  composition: Composition;
};

所以我这样做:

const aggregated$ = forkJoin(ids$, getComposition$).pipe(
  map(arr => ({
    modelId: arr[0].modelId,
    deliverId: arr[0].deliverId,
    aggregated: arr[1]
  }))
);

然后,我订阅了它:

const aggregated$.subscribe(aggregated => {
  console.log(aggregated);
});

但它从不在控制台上打印任何内容。有趣的是,如果我检查是否对服务器进行了调用,它实际上是,但最后一个可观察的 (aggregated$) 永远不会发出任何东西。

我做错了什么?有没有更好的方法来实现这个?

正如你所说,ids$getComposition$ 实际上都是 this.route.params,它来自 Angular。 forkJoin 运算符需要其所有源 Observables 至少发出一项,并且 所有这些都必须完成 。当您使用 this.route.params 时不会发生这种情况,因为它是一个永远不会完成的主题。

因此您可以使用 take(1) 来完成来源 ids$getComposition$:

forkJoin(ids$.pipe(take(1)), getComposition$.pipe(take(1))
  ...

来自 Observable 文档:

forkJoin: Wait for Observables to complete and then combine last values they emitted.

我认为 forkJoin 在你的情况下不会发出,因为 ids$ 没有完成。

您可以尝试以下方法

const getComposition$ = ids$.pipe(
  switchMap(ids =>
    this.compositionsService.getComposition(ids.compositionId).pipe(
       map(composition => ({composition, modelId: ids.modelId, deliverId: ids.deliverId))
    )
  )
);