FS2 through2 在第一个流完成时关闭所有资源?
FS2 through2 closing all resources when the first stream is finished?
假设我们有 2 个 fs2 流:
val stream1 = fs2.Stream.bracket(IO { println("Acquire 1"); 2})(_ => IO { println("Release 1") })
.flatMap(p => fs2.Stream.range(1,p))
val stream2 = fs2.Stream.bracket(IO { println("Acquire 2"); 4})(_ => IO { println("Release 2") })
.flatMap(p => fs2.Stream.range(1,p))
我想互相联系:
def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.uncons1.flatMap { stream1Element =>
stream2.pull.uncons1.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some((stream1Head, stream1Tail)), Some((stream2Head, stream2Tail))) =>
println("Some, Some")
Pull.output1(stream1Head + stream2Head) >> go(stream1Tail, stream2Tail)
case (Some((stream1Head, stream1Tail)), None) =>
println("1 Stream still available")
Pull.output1(stream1Head) >> go(fs2.Stream.empty, stream1Tail)
case (None, Some((stream2Head, stream2Tail))) =>
println("2 Stream still available")
Pull.output1(stream2Head) >> go(fs2.Stream.empty, stream2Tail)
case _ => Pull.output1(-1)
}
}
}
(one, two) => go(one, two).stream
}
正在检查我看到的日志:
Acquire 1
Acquire 2
Some, Some
Release 2
Release 1
2 Stream still available
2 Stream still available
这让我有点惊讶,因为似乎一旦第一个 Stream 完成,第二个 Stream 的资源也将关闭。假设现在资源是与数据库的连接,那么第二个流中的元素将无法再获取。
这是正确的行为吗?有没有办法避免关闭第二个流的资源?令人惊讶的是,如果第一个 Stream 的元素多于第二个,一切都会按预期工作(当第二个流完成时,stream 1 的资源未关闭)
通过检查 zipAllWith 函数的实现,我发现在这种情况下确实应该避免使用 uncons1。最终的解决方案是使用 stepLeg 函数而不是 uncons1。所以上面的函数应该是这样的:
def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.stepLeg.flatMap { stream1Element =>
stream2.pull.stepLeg.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some(sl1), Some(sl2)) =>
println("Some, Some")
val one = sl1.head(0)
val two = sl2.head(0)
Pull.output1(one + two) >> go(sl1.stream, sl2.stream)
case (Some(sl1), None) =>
val one = sl1.head(0)
println("1 Stream still available")
Pull.output1(one) >> go(sl1.stream, fs2.Stream.empty)
case (None, Some(sl2)) =>
val two = sl2.head(0)
println("2 Stream still available")
Pull.output1(two) >> go(fs2.Stream.empty, sl2.stream)
case _ => Pull.output1(-1)
}
}
}
(one, two) => {
go(one.flatMap(fs2.Stream.emit), two.flatMap(fs2.Stream.emit)).stream
}
}
和日志:
Acquire 1
Acquire 2
Some, Some
Release 1
2 Stream still available
2 Stream still available
Release 2
可以在此处找到此问题的其他示例:
uncons vs stepLeg
假设我们有 2 个 fs2 流:
val stream1 = fs2.Stream.bracket(IO { println("Acquire 1"); 2})(_ => IO { println("Release 1") })
.flatMap(p => fs2.Stream.range(1,p))
val stream2 = fs2.Stream.bracket(IO { println("Acquire 2"); 4})(_ => IO { println("Release 2") })
.flatMap(p => fs2.Stream.range(1,p))
我想互相联系:
def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.uncons1.flatMap { stream1Element =>
stream2.pull.uncons1.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some((stream1Head, stream1Tail)), Some((stream2Head, stream2Tail))) =>
println("Some, Some")
Pull.output1(stream1Head + stream2Head) >> go(stream1Tail, stream2Tail)
case (Some((stream1Head, stream1Tail)), None) =>
println("1 Stream still available")
Pull.output1(stream1Head) >> go(fs2.Stream.empty, stream1Tail)
case (None, Some((stream2Head, stream2Tail))) =>
println("2 Stream still available")
Pull.output1(stream2Head) >> go(fs2.Stream.empty, stream2Tail)
case _ => Pull.output1(-1)
}
}
}
(one, two) => go(one, two).stream
}
正在检查我看到的日志:
Acquire 1
Acquire 2
Some, Some
Release 2
Release 1
2 Stream still available
2 Stream still available
这让我有点惊讶,因为似乎一旦第一个 Stream 完成,第二个 Stream 的资源也将关闭。假设现在资源是与数据库的连接,那么第二个流中的元素将无法再获取。
这是正确的行为吗?有没有办法避免关闭第二个流的资源?令人惊讶的是,如果第一个 Stream 的元素多于第二个,一切都会按预期工作(当第二个流完成时,stream 1 的资源未关闭)
通过检查 zipAllWith 函数的实现,我发现在这种情况下确实应该避免使用 uncons1。最终的解决方案是使用 stepLeg 函数而不是 uncons1。所以上面的函数应该是这样的:
def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.stepLeg.flatMap { stream1Element =>
stream2.pull.stepLeg.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some(sl1), Some(sl2)) =>
println("Some, Some")
val one = sl1.head(0)
val two = sl2.head(0)
Pull.output1(one + two) >> go(sl1.stream, sl2.stream)
case (Some(sl1), None) =>
val one = sl1.head(0)
println("1 Stream still available")
Pull.output1(one) >> go(sl1.stream, fs2.Stream.empty)
case (None, Some(sl2)) =>
val two = sl2.head(0)
println("2 Stream still available")
Pull.output1(two) >> go(fs2.Stream.empty, sl2.stream)
case _ => Pull.output1(-1)
}
}
}
(one, two) => {
go(one.flatMap(fs2.Stream.emit), two.flatMap(fs2.Stream.emit)).stream
}
}
和日志:
Acquire 1
Acquire 2
Some, Some
Release 1
2 Stream still available
2 Stream still available
Release 2
可以在此处找到此问题的其他示例: uncons vs stepLeg