我的 fs2 scala 程序没有按预期执行
My fs2 scala program is not executing as expected
我正在尝试按顺序将流的元素从一个队列传递到另一个队列,以一种队列可以一次接收单个元素的方式。推送到队列的结果应该被拉取并传递到下一个队列进行计算。该代码未显示任何执行错误,但它会在第一个队列(拉和推)处停止。谁能解释一下我错过了什么??
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
implicit timer: Timer[IO]
) {
val streamData = Stream(1, 2, 3).covary[IO]
val scheduledStream = Stream.fixedDelay(10.seconds) >> streamData
def storeInQueueFirst: Stream[IO, Unit] = {
scheduledStream
.evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
.metered(Random.between(1, 20).seconds)
.through(q1.enqueue)
}
def getFromQueueFirst: Stream[IO, Int] = {
q1.dequeue
.evalTap(n => IO.delay(println(s"Pulling from queue First $n")))
}
def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
s.map { n =>
n.toString
}
.metered(Random.between(1, 20).seconds)
.evalTap(n => IO.delay(println(s"Pushing to queue second $n")))
.through(q2.enqueue)
}
def getFromQueueSecond: Stream[IO, Unit] = {
q2.dequeue
.evalMap(_ => IO.delay(println("Pulling element from queue second")))
}
}
object FiveTest extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
_ <- b.storeInQueueFirst.compile.drain.start
a <- b.getFromQueueFirst.compile.lastOrError
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
_ <- b.getFromQueueSecond.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
输出为:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
预期输出为:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
Pushing to queue second 1
Pushing to queue second 2
Pushing to queue second 3
Pulling from queue second 1
Pulling from queue second 2
Pulling from queue second 3
您似乎不了解流的工作原理。
当您 运行 stream.drain
时,这意味着将处理此流的 thread/fiber/whatver 将等到流终止。
所以当你有
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
你实际上是在阻塞,下一个操作不会开始,直到这一步中的流读取所有元素,认为自己耗尽并关闭。 (并且一旦流关闭,就无法再次创建它 运行,您必须创建一个新的)。
如果你想:
- 将事情放入队列 1
- 订阅队列 1 并将每个元素推送到队列 2
- 订阅队列 2
你应该:
- 并行执行这些步骤 - 这可以使用纤程来实现(
.start
用于在新纤程中开始计算)
- 以这样的方式写入队列,即当所有元素都被接收和流式传输时流将关闭(接收所有元素时的条件必须明确定义,例如 None-终止 Queue/Stream -只要您希望队列保持打开状态,您将发送
Some(value)
,并发送 None
最终关闭它)
只有第二种方法可以让你得到这样的排序输出,但它也会破坏使用流的目的,因为它只是 将元素推入队列 - 关闭流 - 拉取元素 - 关闭stream - push elements - close stream - pull elements - close stream,所以它只作为练习才有意义。如果你走第一种方式,它会是这样的
object FiveTest extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
// q1 is started in a separate fiber to not block the next line
_ <- b.storeInQueueFirst.compile.drain.start
// output from q1 is redirected to q2 and stream is run in a separate fiber
// to not not block running the next operation
_ <- b.storeInQueueSecond(b.getFromQueueFirst).compile.drain.start
// stream from q2 is blocking until stream completes...
_ <- b.getFromQueueSecond.compile.drain
// ... so that we won't return ExitCode if program is still supposed to run
} yield ExitCode.Success
}
}
Pushing 1 to Queue First
Pushing 2 to Queue First
Pulling from queue First 1
Pushing to queue second 1
Pulling element from queue second
Pulling from queue First 2
Pushing to queue second 2
Pulling element from queue second
...
我正在尝试按顺序将流的元素从一个队列传递到另一个队列,以一种队列可以一次接收单个元素的方式。推送到队列的结果应该被拉取并传递到下一个队列进行计算。该代码未显示任何执行错误,但它会在第一个队列(拉和推)处停止。谁能解释一下我错过了什么??
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
implicit timer: Timer[IO]
) {
val streamData = Stream(1, 2, 3).covary[IO]
val scheduledStream = Stream.fixedDelay(10.seconds) >> streamData
def storeInQueueFirst: Stream[IO, Unit] = {
scheduledStream
.evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
.metered(Random.between(1, 20).seconds)
.through(q1.enqueue)
}
def getFromQueueFirst: Stream[IO, Int] = {
q1.dequeue
.evalTap(n => IO.delay(println(s"Pulling from queue First $n")))
}
def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
s.map { n =>
n.toString
}
.metered(Random.between(1, 20).seconds)
.evalTap(n => IO.delay(println(s"Pushing to queue second $n")))
.through(q2.enqueue)
}
def getFromQueueSecond: Stream[IO, Unit] = {
q2.dequeue
.evalMap(_ => IO.delay(println("Pulling element from queue second")))
}
}
object FiveTest extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
_ <- b.storeInQueueFirst.compile.drain.start
a <- b.getFromQueueFirst.compile.lastOrError
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
_ <- b.getFromQueueSecond.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
输出为:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
预期输出为:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
Pushing to queue second 1
Pushing to queue second 2
Pushing to queue second 3
Pulling from queue second 1
Pulling from queue second 2
Pulling from queue second 3
您似乎不了解流的工作原理。
当您 运行 stream.drain
时,这意味着将处理此流的 thread/fiber/whatver 将等到流终止。
所以当你有
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
你实际上是在阻塞,下一个操作不会开始,直到这一步中的流读取所有元素,认为自己耗尽并关闭。 (并且一旦流关闭,就无法再次创建它 运行,您必须创建一个新的)。
如果你想:
- 将事情放入队列 1
- 订阅队列 1 并将每个元素推送到队列 2
- 订阅队列 2
你应该:
- 并行执行这些步骤 - 这可以使用纤程来实现(
.start
用于在新纤程中开始计算) - 以这样的方式写入队列,即当所有元素都被接收和流式传输时流将关闭(接收所有元素时的条件必须明确定义,例如 None-终止 Queue/Stream -只要您希望队列保持打开状态,您将发送
Some(value)
,并发送None
最终关闭它)
只有第二种方法可以让你得到这样的排序输出,但它也会破坏使用流的目的,因为它只是 将元素推入队列 - 关闭流 - 拉取元素 - 关闭stream - push elements - close stream - pull elements - close stream,所以它只作为练习才有意义。如果你走第一种方式,它会是这样的
object FiveTest extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
// q1 is started in a separate fiber to not block the next line
_ <- b.storeInQueueFirst.compile.drain.start
// output from q1 is redirected to q2 and stream is run in a separate fiber
// to not not block running the next operation
_ <- b.storeInQueueSecond(b.getFromQueueFirst).compile.drain.start
// stream from q2 is blocking until stream completes...
_ <- b.getFromQueueSecond.compile.drain
// ... so that we won't return ExitCode if program is still supposed to run
} yield ExitCode.Success
}
}
Pushing 1 to Queue First
Pushing 2 to Queue First
Pulling from queue First 1
Pushing to queue second 1
Pulling element from queue second
Pulling from queue First 2
Pushing to queue second 2
Pulling element from queue second
...