跳过无限流中的错误
Skip errors in the infinite Stream
我有一个无限的fs2.Stream
,可能会遇到错误。我想什么都不做(可能记录)来跳过这些错误,并继续流式传输更多元素。示例:
//An example
val stream = fs2.Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
在这种特定情况下,我希望每秒发射 Left(new RuntimeException)
的无限 fs2.Stream
。
有一个 Stream.attempt
方法生成在遇到第一个错误后终止的流。有没有办法跳过错误并继续提取更多元素?
IO.raiseError(new RuntimeException).attempt
通常不起作用,因为它需要在流管道组合的所有位置尝试所有效果。
无法按照您描述的方式处理错误。
当流遇到第一个错误时,它会终止。请检查此 gitter question.
您可以通过两种方式处理:
尝试一下效果(但是你已经说过在你的情况下是不可能的)。
流终止后重新启动:
val stream: Stream[IO, Either[Throwable, Unit]] = Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
.handleErrorWith(t => Stream(Left(t)) ++ stream) //put Left to the stream and restart it
//since stream will infinetelly restart I take only 3 first values
println(stream.take(3).compile.toList.unsafeRunSync())
我有一个无限的fs2.Stream
,可能会遇到错误。我想什么都不做(可能记录)来跳过这些错误,并继续流式传输更多元素。示例:
//An example
val stream = fs2.Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
在这种特定情况下,我希望每秒发射 Left(new RuntimeException)
的无限 fs2.Stream
。
有一个 Stream.attempt
方法生成在遇到第一个错误后终止的流。有没有办法跳过错误并继续提取更多元素?
IO.raiseError(new RuntimeException).attempt
通常不起作用,因为它需要在流管道组合的所有位置尝试所有效果。
无法按照您描述的方式处理错误。 当流遇到第一个错误时,它会终止。请检查此 gitter question.
您可以通过两种方式处理:
尝试一下效果(但是你已经说过在你的情况下是不可能的)。
流终止后重新启动:
val stream: Stream[IO, Either[Throwable, Unit]] = Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
.handleErrorWith(t => Stream(Left(t)) ++ stream) //put Left to the stream and restart it
//since stream will infinetelly restart I take only 3 first values
println(stream.take(3).compile.toList.unsafeRunSync())