为什么我的 http 请求 cancel/stop observables 流?

Why does my http request cancel/stop observables stream?

我有一个 applications 数组,我希望每个应用程序都有一个名为 Blobs 的新 属性。我必须发出一个新的 HTTP 请求来为每个应用程序获取 Blob,为此我使用 switchMap。但是,出于某种原因,它只发出最后一个应用程序,而不是每个应用程序。

(我知道解释很糟糕,但是当你阅读代码时你会明白的)

this.serviceOne.getAllApplications(visitId).pipe(
    switchMap((applications) => from(applications)),
    switchMap((application) => {
        console.log("successfully logs every application in applications array");
        return forkJoin([
            of(application),
            this.blobsService.getBlobBs({
                visitId,
                applicationId: application.id,
            }),
        ]);
    }),
    map(([application, blobs]) => {
        console.log("only logs on last emitted application");
        return { ...application, blobs };
    }),
    toArray()
)

它 returns 一个只有最后一个元素的数组(blob 属性 已正确添加到最后一个对象中。)

每当我将 forkJoin 更改为:

return forkJoin([
    of(application) ,
    of([])
]);

它开始正常工作。

获取 Blob:

getBlobBs(identifier: IIdentifier): Observable<BlobItem[]> {
    return this.http.get<BlobItem[]>(
        'service url goes here'
    )
}

使用 mergeMap 而不是 switchMap。

switchMap 正在取消所有请求,只处理最后一个请求。

switchMap : 当新请求到达时取消当前的 subscription/request。用于可取消的请求,例如搜索

mergeMap :并行运行 subscriptions/requests。不要取消当前请求。当顺序不重要时使用

concatMap :按顺序运行 subscriptions/requests 并且性能较低。当顺序很重要时使用。它处理所有 requests/subscriptions,但按顺序处理。

我认为在你的情况下 mergeMap 是合适的,因为你想提供对所有请求的响应。

这是 RxJS 中很常见的模式。您可以在没有 forkJoin 的情况下执行此操作,以避免创建和破坏中间数组。

this.serviceOne.getAllApplications(visitId).pipe(
  mergeMap(applications => applications),

  mergeMap(application => 
    this.blobsService.getBlobBs({
      visitId,
      applicationId: application.id,
    }).pipe(
      takeLast(1),
      map(blobs => ({...application, blobs}))
    )
  ),
  
  toArray()
)