FS2 through2 在第一个流完成时关闭所有资源?

FS2 through2 closing all resources when the first stream is finished?

假设我们有 2 个 fs2 流:

val stream1 = fs2.Stream.bracket(IO { println("Acquire 1"); 2})(_ => IO { println("Release 1") })
  .flatMap(p => fs2.Stream.range(1,p))

val stream2 = fs2.Stream.bracket(IO { println("Acquire 2"); 4})(_ => IO { println("Release 2") })
  .flatMap(p => fs2.Stream.range(1,p))

我想互相联系:

def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
 def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
  stream1.pull.uncons1.flatMap { stream1Element =>
    stream2.pull.uncons1.flatMap { stream2Element =>
      (stream1Element, stream2Element) match {
        case (Some((stream1Head, stream1Tail)), Some((stream2Head, stream2Tail))) =>
          println("Some, Some")
          Pull.output1(stream1Head + stream2Head) >> go(stream1Tail, stream2Tail)
        case (Some((stream1Head, stream1Tail)), None) =>
          println("1 Stream still available")
          Pull.output1(stream1Head) >> go(fs2.Stream.empty, stream1Tail)
        case (None, Some((stream2Head, stream2Tail))) =>
          println("2 Stream still available")
          Pull.output1(stream2Head) >> go(fs2.Stream.empty, stream2Tail)
        case _ => Pull.output1(-1)
      }
    }
  }
(one, two) => go(one, two).stream

}

正在检查我看到的日志:

Acquire 1
Acquire 2
Some, Some
Release 2
Release 1
2 Stream still available
2 Stream still available

这让我有点惊讶,因为似乎一旦第一个 Stream 完成,第二个 Stream 的资源也将关闭。假设现在资源是与数据库的连接,那么第二个流中的元素将无法再获取。

这是正确的行为吗?有没有办法避免关闭第二个流的资源?令人惊讶的是,如果第一个 Stream 的元素多于第二个,一切都会按预期工作(当第二个流完成时,stream 1 的资源未关闭)

通过检查 zipAllWith 函数的实现,我发现在这种情况下确实应该避免使用 uncons1。最终的解决方案是使用 stepLeg 函数而不是 uncons1。所以上面的函数应该是这样的:

def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
  stream1.pull.stepLeg.flatMap { stream1Element =>
    stream2.pull.stepLeg.flatMap { stream2Element =>
      (stream1Element, stream2Element) match {
        case (Some(sl1), Some(sl2)) =>
          println("Some, Some")
          val one = sl1.head(0)
          val two = sl2.head(0)
          Pull.output1(one + two) >> go(sl1.stream, sl2.stream)
        case (Some(sl1), None) =>
          val one = sl1.head(0)
          println("1 Stream still available")
          Pull.output1(one) >> go(sl1.stream, fs2.Stream.empty)
        case (None, Some(sl2)) =>
          val two = sl2.head(0)
          println("2 Stream still available")
          Pull.output1(two) >> go(fs2.Stream.empty, sl2.stream)
        case _ => Pull.output1(-1)
      }
    }
  }
(one, two) => {
  go(one.flatMap(fs2.Stream.emit), two.flatMap(fs2.Stream.emit)).stream
}
}

和日志:

Acquire 1
Acquire 2
Some, Some
Release 1
2 Stream still available
2 Stream still available
Release 2

可以在此处找到此问题的其他示例: uncons vs stepLeg