解决可观察对象内的多个承诺不起作用
Resolving multiple promises inside an observable not working
我正在使用 Firebase 存储,并且我正在尝试通过函数调用加载所有资产。获得资产 url 的唯一方法是调用 getDownloadURL
其中 returns 一个承诺。我需要为每个资产调用它,但由于某种原因我不能让它等待所有承诺完成后再继续。
我认为从 mergeMap
返回一个承诺会让它等待所有的人,但事实似乎并非如此。
我查看了很多关于 promises 和 RXJS 的问题,但我似乎无法弄清楚代码有什么问题。
getAssets() {
return this.authService.user$.pipe(
first(),
switchMap(user => defer(() => from(this.afs.storage.ref(`${user.uid}/assets`).listAll()))),
switchMap(assets => from(assets.items).pipe(
mergeMap(async (asset) => {
return new Promise((res, rej) => {
asset.getDownloadURL().then(url => {
const _asset = {
name: asset.name,
url,
};
this.assets.push(_asset);
res(_asset);
})
.catch((e) => rej(e));
});
}),
)),
map(() => this.assets),
);
}
...
this.getAssets().subscribe(assets => console.log(assets)); // this runs before all asset's url has been resolved
const { from } = rxjs
const { mergeMap } = rxjs.operators
const assets = [1,2,3,4,5]
function getUrl (index) {
return new Promise((res) => {
setTimeout(() => res(`http://example.com/${index}`), Math.random() * 3 + 1000)
})
}
// add param2 1 for mergeMap === concatMap
from(assets).pipe(
mergeMap(asset => {
return getUrl(asset)
}, 1)
).subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>
使用concatMap
到运行一个接一个。
概览
mergeMap
不等待所有内部可观察值。它并行启动 n 个 运行 的内部可观察管道,并将所有值吐出管道底部的相同耦合(在本例中为您的订阅语句)作为单独的排放。因此,为什么 this.getAssets().subscribe(assets => console.log(assets))
运行s 在所有并行内部 mergeMap
管道完成它们各自的计算之前,因为 mergeMap 不会在发射之前等待所有它们(它将在它们发出时一个一个地发射结束)。如果你想等待 n 个可观察的管道完成,那么你需要使用 forkJoin
.
分叉加入
forkJoin
最适合当您有一组可观察对象并且只关心每个对象的最终发射值时。一个常见的用例是,如果您希望在页面加载(或其他事件)时发出多个请求,并且只想在收到所有响应后才采取行动。这样,它类似于您可能使用 Promise.all.
的方式
解决方案
getAssets(): Observable<Asset[]> {
return this.authService.user$.pipe(
// first() will deliver an EmptyError to the observer's error callback if the
// observable completes before any next notification was sent. If you don't
// want this behavior, use take(1) instead.
first(),
// Switch to users firebase asset stream.
switchMap(user => {
// You might have to tweak this part. I'm not exactly sure what
// listAll() returns. I guessed that it returns a promise with
// firebase asset metadata.
return from(this.afs.storage.ref(`${user.uid}/assets`).listAll());
}),
// Map to objects that contain method to get image url.
map(firebaseAssetMetadata => firebaseAssetMetadata?.items ?? []),
// Switch to parallel getDownloadUrl streams.
switchMap(assets => {
// Not an rxjs map, a regular list map. Returns a list of getAssetUrlPipes.
const parallelGetAssetUrlPipes = assets.map(asset => {
return from(asset.getDownloadUrl()).pipe(
map(url => { name: asset.name, url })
);
});
// 1) Listen to all parallel pipes.
// 2) Wait until they've all completed.
// 3) Merge all parallel data into a list.
// 4) Then move list down the pipe.
return forkJoin(parallelGetAssetUrlPipes);
}),
// Outputs all parallel pipe data as a single emission in list form.
// Set local variable to users asset data.
tap(assetObjects => this.assets = assetObjects)
);
}
// Outputs the list of user asset data.
this.getAssets().subscribe(console.log);
祝你好运,享受你的瑞典肉丸!
我正在使用 Firebase 存储,并且我正在尝试通过函数调用加载所有资产。获得资产 url 的唯一方法是调用 getDownloadURL
其中 returns 一个承诺。我需要为每个资产调用它,但由于某种原因我不能让它等待所有承诺完成后再继续。
我认为从 mergeMap
返回一个承诺会让它等待所有的人,但事实似乎并非如此。
我查看了很多关于 promises 和 RXJS 的问题,但我似乎无法弄清楚代码有什么问题。
getAssets() {
return this.authService.user$.pipe(
first(),
switchMap(user => defer(() => from(this.afs.storage.ref(`${user.uid}/assets`).listAll()))),
switchMap(assets => from(assets.items).pipe(
mergeMap(async (asset) => {
return new Promise((res, rej) => {
asset.getDownloadURL().then(url => {
const _asset = {
name: asset.name,
url,
};
this.assets.push(_asset);
res(_asset);
})
.catch((e) => rej(e));
});
}),
)),
map(() => this.assets),
);
}
...
this.getAssets().subscribe(assets => console.log(assets)); // this runs before all asset's url has been resolved
const { from } = rxjs
const { mergeMap } = rxjs.operators
const assets = [1,2,3,4,5]
function getUrl (index) {
return new Promise((res) => {
setTimeout(() => res(`http://example.com/${index}`), Math.random() * 3 + 1000)
})
}
// add param2 1 for mergeMap === concatMap
from(assets).pipe(
mergeMap(asset => {
return getUrl(asset)
}, 1)
).subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>
使用concatMap
到运行一个接一个。
概览
mergeMap
不等待所有内部可观察值。它并行启动 n 个 运行 的内部可观察管道,并将所有值吐出管道底部的相同耦合(在本例中为您的订阅语句)作为单独的排放。因此,为什么 this.getAssets().subscribe(assets => console.log(assets))
运行s 在所有并行内部 mergeMap
管道完成它们各自的计算之前,因为 mergeMap 不会在发射之前等待所有它们(它将在它们发出时一个一个地发射结束)。如果你想等待 n 个可观察的管道完成,那么你需要使用 forkJoin
.
分叉加入
forkJoin
最适合当您有一组可观察对象并且只关心每个对象的最终发射值时。一个常见的用例是,如果您希望在页面加载(或其他事件)时发出多个请求,并且只想在收到所有响应后才采取行动。这样,它类似于您可能使用 Promise.all.
解决方案
getAssets(): Observable<Asset[]> {
return this.authService.user$.pipe(
// first() will deliver an EmptyError to the observer's error callback if the
// observable completes before any next notification was sent. If you don't
// want this behavior, use take(1) instead.
first(),
// Switch to users firebase asset stream.
switchMap(user => {
// You might have to tweak this part. I'm not exactly sure what
// listAll() returns. I guessed that it returns a promise with
// firebase asset metadata.
return from(this.afs.storage.ref(`${user.uid}/assets`).listAll());
}),
// Map to objects that contain method to get image url.
map(firebaseAssetMetadata => firebaseAssetMetadata?.items ?? []),
// Switch to parallel getDownloadUrl streams.
switchMap(assets => {
// Not an rxjs map, a regular list map. Returns a list of getAssetUrlPipes.
const parallelGetAssetUrlPipes = assets.map(asset => {
return from(asset.getDownloadUrl()).pipe(
map(url => { name: asset.name, url })
);
});
// 1) Listen to all parallel pipes.
// 2) Wait until they've all completed.
// 3) Merge all parallel data into a list.
// 4) Then move list down the pipe.
return forkJoin(parallelGetAssetUrlPipes);
}),
// Outputs all parallel pipe data as a single emission in list form.
// Set local variable to users asset data.
tap(assetObjects => this.assets = assetObjects)
);
}
// Outputs the list of user asset data.
this.getAssets().subscribe(console.log);
祝你好运,享受你的瑞典肉丸!