FS2 Stream 异常处理不起作用
FS2 Stream exception handling not working
我在 FS2 和异常处理方面遇到问题。我想要的是,给定一个 Stream[IO,A]
,当我使用可以抛出异常的 f: A => B
映射到它时,我得到一个 Stream[IO,Either[Throwable,B]]
。
我尝试了以下方法,它按预期工作:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
它打印:
Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)
但是,当我尝试用它做任何事情时,我的问题就开始了 Stream
。
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.map(identity)
x1.compile.toVector.unsafeRunSync().foreach(println)
炸毁异常并终止应用程序:
java.lang.RuntimeException: I don't like 9s
at swaps.fm.A$A32$A$A32.$anonfun$x1(tmp2.sc:7)
at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
...
更奇怪的是,使用 take
让 Stream
return 只有我知道没问题的元素,仍然以同样的方式爆炸:
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.take(2)
x1.compile.toVector.unsafeRunSync().foreach(println)
任何人都可以解释为什么会这样吗?这是错误还是(非)预期的行为?
N.B. 此行为存在于 FS2 0.10.0-M7 和 0.10 中。 0
看来问题出在这里:
self
.stage(depth.increment, defer, o => emit(f(o)), os => {
var i = 0; while (i < os.size) { emit(f(os(i))); i += 1; }
这是Segment.map
里面的代码。当您使用以下方式分配向量时:
Stream.emits(Vector(1,2,3,4))
fs2 将分配单个段。看上面map
的代码,os.size
表示段的大小,也就是说,map
总是会映射整个段的大小。这意味着即使您问过 take(2)
,我们仍然有效地绘制了整个片段。
我们可以通过稍微更改代码来证明这一点:
def main(args: Array[String]): Unit = {
val x1 = fs2.Stream
.emits(Vector(1, 2, 3, 4))
.segmentLimit(1)
.covary[IO]
.map { seg =>
if (seg.sum.force.run > 3) throw new RuntimeException("I don't like 9s")
else seg
}
.attempt
.take(2)
println(x1.compile.toVector.unsafeRunSync())
这里的重要部分是 segmentLimit
,它使流将内部流动的数据分块为大小为 1 的段。当我们 运行 这段代码时,我们得到:
Vector(Right(Chunk(1)), Right(Chunk(2)))
这是否是错误?不确定。我会在 Gitter 频道上咨询维护人员。
这里的问题是要使用 fs2
你必须编写纯代码。抛出异常不是纯粹的,因此如果您希望管道中的某个步骤可能失败,则需要将其明确化。这里有两种方式:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
x1.compile.toVector.unsafeRunSync().foreach(println)
// Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen
或
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
其中第一个是可取的,因为 flatMap
和 emit
将导致 size-1 块,通常效率较低。如果您想在第一个错误时停止处理,请在流的末尾添加一个 .rethrow
。
我在 FS2 和异常处理方面遇到问题。我想要的是,给定一个 Stream[IO,A]
,当我使用可以抛出异常的 f: A => B
映射到它时,我得到一个 Stream[IO,Either[Throwable,B]]
。
我尝试了以下方法,它按预期工作:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
它打印:
Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)
但是,当我尝试用它做任何事情时,我的问题就开始了 Stream
。
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.map(identity)
x1.compile.toVector.unsafeRunSync().foreach(println)
炸毁异常并终止应用程序:
java.lang.RuntimeException: I don't like 9s
at swaps.fm.A$A32$A$A32.$anonfun$x1(tmp2.sc:7)
at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
...
更奇怪的是,使用 take
让 Stream
return 只有我知道没问题的元素,仍然以同样的方式爆炸:
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.take(2)
x1.compile.toVector.unsafeRunSync().foreach(println)
任何人都可以解释为什么会这样吗?这是错误还是(非)预期的行为?
N.B. 此行为存在于 FS2 0.10.0-M7 和 0.10 中。 0
看来问题出在这里:
self
.stage(depth.increment, defer, o => emit(f(o)), os => {
var i = 0; while (i < os.size) { emit(f(os(i))); i += 1; }
这是Segment.map
里面的代码。当您使用以下方式分配向量时:
Stream.emits(Vector(1,2,3,4))
fs2 将分配单个段。看上面map
的代码,os.size
表示段的大小,也就是说,map
总是会映射整个段的大小。这意味着即使您问过 take(2)
,我们仍然有效地绘制了整个片段。
我们可以通过稍微更改代码来证明这一点:
def main(args: Array[String]): Unit = {
val x1 = fs2.Stream
.emits(Vector(1, 2, 3, 4))
.segmentLimit(1)
.covary[IO]
.map { seg =>
if (seg.sum.force.run > 3) throw new RuntimeException("I don't like 9s")
else seg
}
.attempt
.take(2)
println(x1.compile.toVector.unsafeRunSync())
这里的重要部分是 segmentLimit
,它使流将内部流动的数据分块为大小为 1 的段。当我们 运行 这段代码时,我们得到:
Vector(Right(Chunk(1)), Right(Chunk(2)))
这是否是错误?不确定。我会在 Gitter 频道上咨询维护人员。
这里的问题是要使用 fs2
你必须编写纯代码。抛出异常不是纯粹的,因此如果您希望管道中的某个步骤可能失败,则需要将其明确化。这里有两种方式:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
x1.compile.toVector.unsafeRunSync().foreach(println)
// Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen
或
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
其中第一个是可取的,因为 flatMap
和 emit
将导致 size-1 块,通常效率较低。如果您想在第一个错误时停止处理,请在流的末尾添加一个 .rethrow
。