Rx.Observable 验证流中的项目(通过或抛出)
Rx.Observable validate items in stream (pass or throw)
给定 Observable 中的流,我想 validate/check 每个项目。万一一个坏了我想通过Observable.throw抛出一个错误,因此中断所有进一步的处理。
我笨拙的解决方案是
import * as Rx from 'rxjs'
inputStream.mergeMap(item => (isValid(item))
? Rx.Observable.of(item)
: Rx.Observable.throw(new Error("not valid"))
)
这看起来很丑陋,因为它为正向流构建了一堆不必要的 Observables。
是否有更好的方法来检查 Observable 中的项目?
您可以只使用普通 map
并在其中抛出异常:
inputStream.map(item => {
if (isValid(item)) {
return item;
}
throw new Error("not valid");
})
如果我很好地理解你的问题,你可以使用 takeWhile
运算符来做到这一点。例如;
yourObservable.takeWhile(item => {
//your condition
}).subscribe(i => console.log(i));
当您的表达式为真时,它只取值。当它变为假时,它停止。
给定 Observable 中的流,我想 validate/check 每个项目。万一一个坏了我想通过Observable.throw抛出一个错误,因此中断所有进一步的处理。
我笨拙的解决方案是
import * as Rx from 'rxjs'
inputStream.mergeMap(item => (isValid(item))
? Rx.Observable.of(item)
: Rx.Observable.throw(new Error("not valid"))
)
这看起来很丑陋,因为它为正向流构建了一堆不必要的 Observables。
是否有更好的方法来检查 Observable 中的项目?
您可以只使用普通 map
并在其中抛出异常:
inputStream.map(item => {
if (isValid(item)) {
return item;
}
throw new Error("not valid");
})
如果我很好地理解你的问题,你可以使用 takeWhile
运算符来做到这一点。例如;
yourObservable.takeWhile(item => {
//your condition
}).subscribe(i => console.log(i));
当您的表达式为真时,它只取值。当它变为假时,它停止。