Rx.Observable 验证流中的项目(通过或抛出)

Rx.Observable validate items in stream (pass or throw)

给定 Observable 中的流,我想 validate/check 每个项目。万一一个坏了我想通过Observable.throw抛出一个错误,因此中断所有进一步的处理。

我笨拙的解决方案是

import * as Rx from 'rxjs'

inputStream.mergeMap(item => (isValid(item))
    ? Rx.Observable.of(item)
    : Rx.Observable.throw(new Error("not valid"))
)

这看起来很丑陋,因为它为正向流构建了一堆不必要的 Observables。

是否有更好的方法来检查 Observable 中的项目?

您可以只使用普通 map 并在其中抛出异常:

inputStream.map(item => {
  if (isValid(item)) {
    return item;
  }
  throw new Error("not valid");
})

如果我很好地理解你的问题,你可以使用 takeWhile 运算符来做到这一点。例如;

yourObservable.takeWhile(item => {
     //your condition
}).subscribe(i => console.log(i));

当您的表达式为真时,它只取值。当它变为假时,它停止。

您可以从 here. Also you can check this page 中了解更多信息。希望对您有所帮助!