RxJS 和 Angular HttpClient:使用 forkJoin 处理多个顺序请求

RxJS and Angular HttpClient: use of forkJoin for multiple sequential requests

在 RxJS (ES6) 中,我试图在单个 Observable 中的一系列连续操作中获得结果。

我不确定我是否必须使用 forkJoin(但我希望操作按顺序执行)或 concat 运算符(但我希望在所有操作结束时收到通知已执行)。

我试过了:

forkJoin

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return forkJoin(batch);
        })
    );
  }

连接

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return concat(batch);
        })
    );
  }

在这两种情况下,我都看不到来自 HttpClient 的可观察对象正在执行它们的工作(没有发送 http 请求),但我可以看到日志记录。

批处理中调用的方法如下:

  updateProduct(product: Product) {
      console.log('calling...');
      const options = this.getDefaultRequestOptions();
      return this.http.post('update_product', product, options);
  }

我调用 sync() 函数如下:

this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));

输出:

(8) calling...
sync done

但没有 HTTP 请求开始。

如果我在管道图中的 forkJoin/concat 之外执行相同的操作,我可以看到发送的 HTTP 请求。

sync(): Observable<any> {

    const product = new Product(null, 'Title', 'Desc.', 'CODE');
    return this.api.updateProduct(product);

}

我错过了什么?

---更新-解决方案---

sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        flatMap(products => {

            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            console.log(batch);

            return concat(...batch);
            // return forkJoin(batch);
        })
    );

您的问题不在于 forkJoin/concat,而是使用 map 到 return 另一个可观察值。

在您的代码中,map 会 return Observable<Observable<any>>。您的 IDE 应该突出显示这些东西(不要使用 any,可能这里有问题)。

您的解决方案是将 map 更改为 concatMap(至少用于测试),然后下一个 chain 应该可以工作。

此外,调试 subscribe 及其 return 值以查看您拥有的内容。

尝试

sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      const batch: Observable<any>[] = [];
      for (const product of products) {
        batch.push(this.api.updateProduct(product));
      }
     return batch;
   }),
   switchMap(batch => forkJoin(batch)),
  )
}

这是假设 this.db.getProducts() 是同步静态数据而不是 observable/asynchronous 数据。

那你可以试试

  this.productService.sync().subscribe(() => { console.log('sync done'); });

并查看是否有任何 API 调用。