继续使用 mergeMap 在 RxJs pipeable 中出错
Continue on error in RxJs pipeable with mergeMap
我正在使用 RxJs 管道和 mergeMap 运算符进行一些并行 HTTP 获取。
在第一个请求失败时(假设 /urlnotexists 抛出 404 错误)它会停止所有其他请求。
我希望它继续查询所有剩余的 url,而不是为这个失败的请求调用所有剩余的 mergeMap。
我尝试使用 RxJs 中的 throwError 和 catchError 但没有成功。
index.js
const { from } = require('rxjs');
const { mergeMap, scan } = require('rxjs/operators');
const request = {
get: url => {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (url === '/urlnotexists') { return reject(new Error(url)); }
return resolve(url);
}, 1000);
});
}
};
(async function() {
await from([
'/urlexists',
'/urlnotexists',
'/urlexists2',
'/urlexists3',
])
.pipe(
mergeMap(async url => {
try {
console.log('mergeMap 1:', url);
const val = await request.get(url);
return val;
} catch(err) {
console.log('err:', err.message);
// a throw here prevent all remaining request.get() to be tried
}
}),
mergeMap(async val => {
// should not pass here if previous request.get() failed
console.log('mergeMap 2:', val);
return val;
}),
scan((acc, val) => {
// should not pass here if previous request.get() failed
acc.push(val);
return acc;
}, []),
)
.toPromise()
.then(merged => {
// should have merged /urlexists, /urlexists2 and /urlexists3
// even if /urlnotexists failed
console.log('merged:', merged);
})
.catch(err => {
console.log('catched err:', err);
});
})();
$ node index.js
mergeMap 1: /urlexists
mergeMap 1: /urlnotexists
mergeMap 1: /urlexists2
mergeMap 1: /urlexists3
err: /urlnotexists
mergeMap 2: /urlexists
mergeMap 2: undefined <- I didn't wanted this mergeMap to have been called
mergeMap 2: /urlexists2
mergeMap 2: /urlexists3
merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]
我希望发出并发 GET 请求并在最后减少一个对象中它们各自的值。
但如果出现错误,我希望他们不要打断我的管道,而是记录它们。
有什么建议吗?
如果你愿意放弃 RXJS 而只用 async/await 解决,那就非常简单了:
const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3']
const promises = urls.map(url => request(url)
const resolved = await Promise.allSettled(promises)
// print out errors
resolved.forEach((r, i) => {
if (r.status === "rejected') {
console.log(`${urls[i]} failed: ${r.reason})
}
})
// get the success results
const merged = resolved.filter(r => r.status === "resolved").map(r => r.value)
console.log('merged', merged)
这利用了Promise.allSettled proposed helper method. If your environment does not have this method, you can implement it as shown in .
如果你想使用 RxJS,你应该在使用 forkJoin
.
并发执行所有请求之前,使用 catchError
添加错误处理和任何其他任务到单个请求
const { of, from, forkJoin } = rxjs;
const { catchError, tap } = rxjs.operators;
// your promise factory, unchanged (just shorter)
const request = {
get: url => {
return new Promise((resolve, reject) => setTimeout(
() => url === '/urlnotexists' ? reject(new Error(url)) : resolve(url), 1000
));
}
};
// a single rxjs request with error handling
const fetch$ = url => {
console.log('before:', url);
return from(request.get(url)).pipe(
// add any additional operator that should be executed for each request here
tap(val => console.log('after:', val)),
catchError(error => {
console.log('err:', error.message);
return of(undefined);
})
);
};
// concurrently executed rxjs requests
forkJoin(["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"].map(fetch$))
.subscribe(merged => console.log("merged:", merged));
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>
我正在使用 RxJs 管道和 mergeMap 运算符进行一些并行 HTTP 获取。
在第一个请求失败时(假设 /urlnotexists 抛出 404 错误)它会停止所有其他请求。
我希望它继续查询所有剩余的 url,而不是为这个失败的请求调用所有剩余的 mergeMap。
我尝试使用 RxJs 中的 throwError 和 catchError 但没有成功。
index.js
const { from } = require('rxjs');
const { mergeMap, scan } = require('rxjs/operators');
const request = {
get: url => {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (url === '/urlnotexists') { return reject(new Error(url)); }
return resolve(url);
}, 1000);
});
}
};
(async function() {
await from([
'/urlexists',
'/urlnotexists',
'/urlexists2',
'/urlexists3',
])
.pipe(
mergeMap(async url => {
try {
console.log('mergeMap 1:', url);
const val = await request.get(url);
return val;
} catch(err) {
console.log('err:', err.message);
// a throw here prevent all remaining request.get() to be tried
}
}),
mergeMap(async val => {
// should not pass here if previous request.get() failed
console.log('mergeMap 2:', val);
return val;
}),
scan((acc, val) => {
// should not pass here if previous request.get() failed
acc.push(val);
return acc;
}, []),
)
.toPromise()
.then(merged => {
// should have merged /urlexists, /urlexists2 and /urlexists3
// even if /urlnotexists failed
console.log('merged:', merged);
})
.catch(err => {
console.log('catched err:', err);
});
})();
$ node index.js
mergeMap 1: /urlexists
mergeMap 1: /urlnotexists
mergeMap 1: /urlexists2
mergeMap 1: /urlexists3
err: /urlnotexists
mergeMap 2: /urlexists
mergeMap 2: undefined <- I didn't wanted this mergeMap to have been called
mergeMap 2: /urlexists2
mergeMap 2: /urlexists3
merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]
我希望发出并发 GET 请求并在最后减少一个对象中它们各自的值。
但如果出现错误,我希望他们不要打断我的管道,而是记录它们。
有什么建议吗?
如果你愿意放弃 RXJS 而只用 async/await 解决,那就非常简单了:
const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3']
const promises = urls.map(url => request(url)
const resolved = await Promise.allSettled(promises)
// print out errors
resolved.forEach((r, i) => {
if (r.status === "rejected') {
console.log(`${urls[i]} failed: ${r.reason})
}
})
// get the success results
const merged = resolved.filter(r => r.status === "resolved").map(r => r.value)
console.log('merged', merged)
这利用了Promise.allSettled proposed helper method. If your environment does not have this method, you can implement it as shown in
如果你想使用 RxJS,你应该在使用 forkJoin
.
catchError
添加错误处理和任何其他任务到单个请求
const { of, from, forkJoin } = rxjs;
const { catchError, tap } = rxjs.operators;
// your promise factory, unchanged (just shorter)
const request = {
get: url => {
return new Promise((resolve, reject) => setTimeout(
() => url === '/urlnotexists' ? reject(new Error(url)) : resolve(url), 1000
));
}
};
// a single rxjs request with error handling
const fetch$ = url => {
console.log('before:', url);
return from(request.get(url)).pipe(
// add any additional operator that should be executed for each request here
tap(val => console.log('after:', val)),
catchError(error => {
console.log('err:', error.message);
return of(undefined);
})
);
};
// concurrently executed rxjs requests
forkJoin(["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"].map(fetch$))
.subscribe(merged => console.log("merged:", merged));
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>