RxJS 重试整个链
RxJS retry entire chain
我从实时流中读取图像并定期 select 批量读取图像。
然后我将它们发送到服务器进行验证。如果任何验证失败,将抛出 HTTP 错误。如果发生这种情况,我想获得一批新的图像。
this.input.getImages()
.throttleTime(500)
.switchMap(image =>
new Observable<{}>(observer => {
// Some operation
})
.map(i => ({ image, i }))
).filter(({ i }) => {
// some filtering
})
.map(({ image }) => image)
.take(6)
.bufferCount(6)
.map(images => // switch map??
Observable.fromPromise(this.server.validate(images))
)
.retry(2) // This only retrys the request, I want it to retry the whole chain (to get valid images)
.subscribe(images => {
console.log('All done')
},
err => {console.log(err)}
)
我遇到的问题是只有 HTTP 请求被重试,因为这是新的可观察对象。必须有某种方法将链的开头封装到单个 Observable 中吗?
参见learnrxjs - retry。该示例显示了在抛出错误时从源开始重新启动的所有内容。
该页面显示 pipe
语法,但如果您愿意,JSBin 会显示流畅的运算符语法。
基本模式是
const retryMe = this.input.getImages()
.flatMap(val => {
Observable.of(val)
// more operators
})
.retry(2);
简单的方法是将复杂的 Observable 包装在 defer 中,然后对生成的 Observable 使用重试。
我从实时流中读取图像并定期 select 批量读取图像。 然后我将它们发送到服务器进行验证。如果任何验证失败,将抛出 HTTP 错误。如果发生这种情况,我想获得一批新的图像。
this.input.getImages()
.throttleTime(500)
.switchMap(image =>
new Observable<{}>(observer => {
// Some operation
})
.map(i => ({ image, i }))
).filter(({ i }) => {
// some filtering
})
.map(({ image }) => image)
.take(6)
.bufferCount(6)
.map(images => // switch map??
Observable.fromPromise(this.server.validate(images))
)
.retry(2) // This only retrys the request, I want it to retry the whole chain (to get valid images)
.subscribe(images => {
console.log('All done')
},
err => {console.log(err)}
)
我遇到的问题是只有 HTTP 请求被重试,因为这是新的可观察对象。必须有某种方法将链的开头封装到单个 Observable 中吗?
参见learnrxjs - retry。该示例显示了在抛出错误时从源开始重新启动的所有内容。
该页面显示 pipe
语法,但如果您愿意,JSBin 会显示流畅的运算符语法。
基本模式是
const retryMe = this.input.getImages()
.flatMap(val => {
Observable.of(val)
// more operators
})
.retry(2);
简单的方法是将复杂的 Observable 包装在 defer 中,然后对生成的 Observable 使用重试。