如何订阅包含 Promise 的 Observables 链

How to subscribe to a chain of Observables that contains a Promise

我有一个嵌套的 Observable 情况,经过一些研究我发现最好使用 RxJS switchMap

对于我的情况,第一个 observable 包含一个 Promise,因此我必须等待第一个订阅结果,然后才能将其传递给第二个 observable。现在第二个 Observable 没有被订阅并且返回一个 Observable 而不是订阅它的结果。

firstObservable(x) {
  return this.productService.getProduct(x).pipe(
    map(async result => {
       this.productObject = {...result};
       let timeVariable = await 
       this.distanceMatrixService.getTime(this.productObject, this.currentLocation);
       return this.productObject;
    })
}

async secondObservable(passedProductObject) {    
  const productObject = await passedProductObject;
  return this.productService.getProductsByShop(productObject.store_id).pipe(
    map(result => {
      ////STUFF that depends on the timeVariable 
    }));
} 

chainingObservables(x) {
  return this.firstObservable(x).pipe(
  switchMap(async x => this.secondObservable(x)));
}

this.chainingObservables(this.passedData).subscribe(res => {
 console.log("result", res);
})

result Observable {_isScalar: false, source: Observable, operator: MapOperator}

如有任何建议,我们将不胜感激。

您可以像这样使用 switchMap 运算符在管道链中执行此操作。

你不应该使用 async with observables

假定 this.distanceMatrixService.getTime returns 一个有效的承诺。

    observable(x) {
        return this.productService.getProduct(x).pipe(
            tap(result => {
                this.productObject = {...result};
            }),
            switchMap(() => {
                return from(this.distanceMatrixService.getTime(this.productObject, this.currentLocation))
                    .pipe(map((time) => [time, this.productObject]));
            }),
            switchMap(([time, passedProductObject]) => {
                return this.productService.getProductsByShop(passedProductObject.store_id).pipe(
                    map(result => {
                        ////STUFF that depends on the timeVariable
                        return time;
                    }));
            }));
    }

    subMethod(x) {
        this.observable(this.passedData).subscribe(res => {
            console.log('result', res);
        });
    }

您可以使用 RxJS from 函数将 promise 转换为 observable。之后,switchMapmap 的几个 switchMap 应该可以满足您的要求。

尝试以下方法

this.productService.getProduct(x).pipe(
  map(result => ({...result})),
  switchMap(productObject => 
    from(this.distanceMatrixService.getTime(productObject, this.currentLocation)).pipe(
      map(timeVariable => ({
        productObject: productObject,
        timeVariable: timeVariable
      }))
    )
  ),
  switchMap(({productObject, timeVariable}) =>
    this.productService.getProductsByShop(productObject.store_id).pipe(
      map(result => {
        // stuff that depends on `timeVariable`
      })
    )
  )
).subscribe(
  res => {
    console.log("result", res);
  },
  error => {
    // handle error
  }
);