解决可观察对象内的多个承诺不起作用

Resolving multiple promises inside an observable not working

我正在使用 Firebase 存储,并且我正在尝试通过函数调用加载所有资产。获得资产 url 的唯一方法是调用 getDownloadURL 其中 returns 一个承诺。我需要为每个资产调用它,但由于某种原因我不能让它等待所有承诺完成后再继续。

我认为从 mergeMap 返回一个承诺会让它等待所有的人,但事实似乎并非如此。

我查看了很多关于 promises 和 RXJS 的问题,但我似乎无法弄清楚代码有什么问题。

getAssets() {

    return this.authService.user$.pipe(
      first(),
      switchMap(user => defer(() => from(this.afs.storage.ref(`${user.uid}/assets`).listAll()))),
      switchMap(assets => from(assets.items).pipe(
        mergeMap(async (asset) => {
        
          return new Promise((res, rej) => {
          
            asset.getDownloadURL().then(url => {
              
              const _asset = {
                name: asset.name,
                url,
              };
  
              this.assets.push(_asset);

              res(_asset);
            })
            .catch((e) => rej(e));
          });
        }),
      )),
      map(() => this.assets),
    );
  }

  ...

  this.getAssets().subscribe(assets => console.log(assets)); // this runs before all asset's url has been resolved

const { from } = rxjs
const { mergeMap } = rxjs.operators

const assets = [1,2,3,4,5]

function getUrl (index) {
  return new Promise((res) => {
    setTimeout(() => res(`http://example.com/${index}`), Math.random() * 3 + 1000)
  })
}

// add param2 1 for mergeMap === concatMap
from(assets).pipe(
  mergeMap(asset => {
    return getUrl(asset)
  }, 1)
).subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>

使用concatMap到运行一个接一个。

概览

mergeMap 不等待所有内部可观察值。它并行启动 n 个 运行 的内部可观察管道,并将所有值吐出管道底部的相同耦合(在本例中为您的订阅语句)作为单独的排放。因此,为什么 this.getAssets().subscribe(assets => console.log(assets)) 运行s 在所有并行内部 mergeMap 管道完成它们各自的计算之前,因为 mergeMap 不会在发射之前等待所有它们(它将在它们发出时一个一个地发射结束)。如果你想等待 n 个可观察的管道完成,那么你需要使用 forkJoin.


分叉加入

forkJoin 最适合当您有一组可观察对象并且只关心每个对象的最终发射值时。一个常见的用例是,如果您希望在页面加载(或其他事件)时发出多个请求,并且只想在收到所有响应后才采取行动。这样,它类似于您可能使用 Promise.all.

的方式

解决方案

getAssets(): Observable<Asset[]> {
  return this.authService.user$.pipe(
    // first() will deliver an EmptyError to the observer's error callback if the
    // observable completes before any next notification was sent. If you don't
    // want this behavior, use take(1) instead.
    first(),
    // Switch to users firebase asset stream.
    switchMap(user => {
      // You might have to tweak this part. I'm not exactly sure what 
      // listAll() returns. I guessed that it returns a promise with
      // firebase asset metadata.
      return from(this.afs.storage.ref(`${user.uid}/assets`).listAll());
    }),
    // Map to objects that contain method to get image url.
    map(firebaseAssetMetadata => firebaseAssetMetadata?.items ?? []),
    // Switch to parallel getDownloadUrl streams.
    switchMap(assets => {
      // Not an rxjs map, a regular list map. Returns a list of getAssetUrlPipes.
      const parallelGetAssetUrlPipes = assets.map(asset => {
        return from(asset.getDownloadUrl()).pipe(
          map(url => { name: asset.name, url })
        );
      });
      // 1) Listen to all parallel pipes.
      // 2) Wait until they've all completed.
      // 3) Merge all parallel data into a list.
      // 4) Then move list down the pipe.
      return forkJoin(parallelGetAssetUrlPipes);
    }),
    // Outputs all parallel pipe data as a single emission in list form.
    // Set local variable to users asset data.
    tap(assetObjects => this.assets = assetObjects)
  );
}

// Outputs the list of user asset data.
this.getAssets().subscribe(console.log);

祝你好运,享受你的瑞典肉丸!