fs2.Stream 挂了两次
fs2.Stream hangs on taking twice
问题:
我想重复从一些第三方库提供的 fs2.Stream
中获取一些批次,因此将客户端从 fs2.Stream
本身中抽象出来,并简单地给它们 F[List[Int]]
个批次作为一旦他们准备好了。
尝试次数:
我尝试使用 fs2.Stream::take
和 运行 一些示例。
我.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()
r.unsafeRunSync()
它打印第一批 List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
然后挂起。我预计从 0
到 1000
的所有批次都将被打印。
这里让事情简单一点是
II.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
_ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()
r.unsafeRunSync()
行为与 I 完全相同。打印 List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
然后挂起。
问题:
给定一个 fs2.Stream[IO, Int]
如何提供一个效果 IO[List[Int]]
在评估时迭代流提供的连续批次?
嗯,你不能有代表多个批次的 IO[List[X]]
,IO
将是一个批次。
你能做的最好的事情是这样的:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]
也就是说,你的用户会给你一个操作来为每个批处理执行,你会给他们一个 IO
将阻塞当前光纤,直到使用该函数消耗整个流。
实现这种功能的最简单方法是:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
getStreamFromThirdParty
.chunkN(n = ChunkSize)
.evalMap(chunk => process(chunk.toList))
.compile
.drain
问题:
我想重复从一些第三方库提供的 fs2.Stream
中获取一些批次,因此将客户端从 fs2.Stream
本身中抽象出来,并简单地给它们 F[List[Int]]
个批次作为一旦他们准备好了。
尝试次数:
我尝试使用 fs2.Stream::take
和 运行 一些示例。
我.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()
r.unsafeRunSync()
它打印第一批 List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
然后挂起。我预计从 0
到 1000
的所有批次都将被打印。
这里让事情简单一点是
II.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
queue <- fs2.concurrent.Queue.unbounded[IO, Int]
stream = queue.dequeue
_ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
_ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
_ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()
r.unsafeRunSync()
行为与 I 完全相同。打印 List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
然后挂起。
问题:
给定一个 fs2.Stream[IO, Int]
如何提供一个效果 IO[List[Int]]
在评估时迭代流提供的连续批次?
嗯,你不能有代表多个批次的 IO[List[X]]
,IO
将是一个批次。
你能做的最好的事情是这样的:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]
也就是说,你的用户会给你一个操作来为每个批处理执行,你会给他们一个 IO
将阻塞当前光纤,直到使用该函数消耗整个流。
实现这种功能的最简单方法是:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
getStreamFromThirdParty
.chunkN(n = ChunkSize)
.evalMap(chunk => process(chunk.toList))
.compile
.drain