具有内部订阅的 Rxjs observable 需要很长时间,并且外部 observable 在后续请求中首先执行

Rxjs observable with Inner subscription taking long time and outer observable executed firstly in subsequent requests

我有一个启动 Observable 的按钮,然后我订阅结果并嵌套我有一个订阅取决于之前的 Observable 结果。

this.getData(params).subscribe(result => {
    // Make some first level process
    console.log('[Outter Execution]: ',result);

    this.getInnerData(result).subscribe(res => {
        // Make some inner level process
        console.log('[Inner Execution]: ',res);
    });
});

取决于用户的点击速度,顺序不一样:

//User click slowly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

//User start clicking quickly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Outer Execution]
[Outer Execution]

[Inner Execution]
[Inner Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]

如您所见,如果嵌套订阅耗时较长,用户在解决内部订阅之前再次点击,则在解决内部执行之前,第一个[外部执行]消息会被注销。一段时间后,解决了之前长时间的内部订阅,并返回了记录的消息。

我尝试使用 switchMapmergeMap 运算符但没有成功。

[已编辑]:我需要的功能是作为一个块执行,下一次外部执行需要在第一次执行完成后执行(外部和内部订阅)。

使用 RxJS 的方法有很多种,这里没有具体的示例,很难为您提供具体的答案。我能做的就是从我的代码中为您提供一个示例,并希望它接近您要完成的目标。

我有他们供应商的产品。当我得到一个产品时,它有一系列供应商 ID。所以我需要获得产品,然后在该操作中找到供应商。 (听起来与您的场景相似?)

这是我的代码:

  selectedProductSuppliers$ = this.selectedProduct$
    .pipe(
      filter(selectedProduct => Boolean(selectedProduct)),
      switchMap(selectedProduct =>
        forkJoin(selectedProduct.supplierIds.map(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)))
      ),
      tap(suppliers => console.log('product suppliers', JSON.stringify(suppliers)))
    );

此代码使用 switchMap 来确保如果用户多次单击它会切换到最新的选择。

然后它使用 forkJoin 来处理每个供应商 ID,发出一个 http get 请求来检索每个供应商数据并将它们加入一个发出的数组中。

return 值是 Observable<Supplier[]>

您可以在此处找到完整示例:https://github.com/DeborahK/Angular-RxJS

更新:

如果您将上面的代码更改为使用 concatMap 而不是 switchMap,它将处理每个请求,等待一个完成后再执行下一个。

为了确保处理等待,你需要在主Observable上使用concatMap,而不是在依赖的Observable上。否则不等待集合。

我在您的最新更改之前进行了分叉,但这是我想出的似乎有效的方法:

  private clickSubject = new Subject<number>();
  clickAction$ = this.clickSubject.asObservable();

  ngOnInit() {
    this.clickAction$
      .pipe(
        concatMap(value => this.mainProcess(value)
          .pipe(
            mergeMap(x => this.dependantProcess(x))
          )
        )
      )
      .subscribe();
  }

  onClick(value) {
    // Emits a value into the action stream
    this.clickSubject.next(value);
  }

  mainProcess(value) {
    console.log("[Emitted] Main", value);
    return of(value).pipe(delay(10));
  }

  dependantProcess(value) {
    console.log("[Emitted] Dependent", value);
    return of(value).pipe(delay(2000));
  }

注意 concatMap 用于等待 mainProcess。

  • 此代码通过使用 RxJS Subject.

  • 定义操作流来响应用户点击
  • 每次用户单击按钮时,都会发出操作流。

  • 动作流管道使用concatMap缓存请求并等待处理它,直到处理完先前的请求。

  • 当主进程发出时,从属进程执行。

  • 当管道完成(主进程及其依赖进程)时,下一个缓存请求将由 concatMap 处理。

有道理吗?

您可以在此处找到更新后的代码:

https://stackblitz.com/edit/angular-dependent-order-deborahk