如何将元素排入队列然后将它们出队?
How to enqueue elements to a queue and then dequeue them?
以下片段的导入和隐含:
import cats.effect.{ConcurrentEffect, ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val c: ConcurrentEffect[IO] = IO.ioConcurrentEffect
implicit val t: Timer[IO] = IO.timer(ec)
目标是将整数 1 到 3 添加到 fs2 Queue
,然后将它们出队。第一次尝试:
val s1 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
_ <- Stream(1,2,3).map(Some(_)).through(q.enqueue) // Enqueue 1 to 3
_ <- Stream.eval(q.enqueue1(None)) // Terminate queue
outStream <- q.dequeue // Get dequeue stream
} yield outStream
s1.compile.toList.unsafeRunSync()
这个returnsList(1)
。不知道为什么。
第二次尝试:
val s2 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
_ <- Stream(
Stream(1,2,3).map(Some(_)).through(q.enqueue), // Enqueue 1 to 3
Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) // Wait 1 second and terminate stream
).parJoin(2) // Run both streams in parallel
outStream <- q.dequeue // Get dequeue stream
} yield outStream
s2.compile.toList.unsafeRunSync()
这个returnsList(1,2)
。也不知道为什么。
为什么这些示例返回了它们返回的内容?正确的做法是什么?
看看你在这里实际定义了什么:
val s1 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int])
_ <- Stream(1,2,3).map(Some(_)).through(q.enqueue)
_ <- Stream.eval(q.enqueue1(None))
outStream <- q.dequeue // Get dequeue stream
} yield outStream
这与
相同
Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
Stream(1,2,3).map(Some(_)).through(q.enqueue).flatMap { _ =>
Stream.eval(q.enqueue1(None)).flatMap { _ =>
q.dequeue
}
}
}
- 您在
Stream
中创建了一个队列
- 然后对于该流的每个元素(对于每个
Queue
),您将 3 个 Some
元素加入队列 - 懒惰意味着直到需要该元素时才会评估副作用
- 然后对于每个 enque,您还将
None
放入队列
- 在到达出队部分之前,您基本上创建了一个
Some(1), None, Some(2), None, Some(3), None
流!因为它是 none 终止队列,它在第一个 None
之后停止,所以你最终得到 Stream(1)
评估为 List(1)
同时你有第二个例子
val s2 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int])
_ <- Stream(
Stream(1,2,3).map(Some(_)).through(q.enqueue),
Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None))
).parJoin(2)
outStream <- q.dequeue
} yield outStream
等于
Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
Stream(
Stream(1,2,3).map(Some(_)).through(q.enqueue),
Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None))
).parJoin(2).flatMap { _ =>
q.dequeue
}
}
- 您在
Stream
中创建了一个队列
- 然后为该流的每个元素(每个
Queue
)创建一个较小的流
- 您正在排队
Some
1、2、3 延迟 在一个流中
- 你排队
None
另一个
- 你不确定地结合两个流,所以当
Some
的序列将被 None
打断时,你没有强有力的保证,在你的测试中,环境使它如此在评估第一个流的 2 个元素后发生
你的基本错误是交替使用 Stream[IO, *]
和 IO
:当你在 Stream
上创建 flatMap
时,下一行不会在前一行的所有元素之后计算line 已经被评估,但是在上一行的每个元素都被评估之后。毕竟 Stream
是(懒惰的,有副作用的)集合,所以在理解方面它的行为更像 (Lazy
)List
当涉及到操作顺序时。
如果您没有评估一切 Stream
它会按您预期的那样工作:
val createQueue: IO[NoneTerminatedQueue[IO, Int]] = Queue.noneTerminated[IO, Int]
def enqueueValues(queue: NoneTerminatedQueue[IO, Int]) =
Stream(1,2,3).map(Some(_)).through(queue.enqueue) ++ Stream.eval(queue.enqueue1(None))
def dequeueValues(queue: NoneTerminatedQueue[IO, Int]) =
queue.dequeue
// these are IO, not Stream[IO, *] !!!
val io = for {
queue <- createQueue
_ <- enqueueValues(queue).compile.drain.start
result <- dequeueValues(queue).compile.toList
} yield result
io.runUnsafeSync
以下片段的导入和隐含:
import cats.effect.{ConcurrentEffect, ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val c: ConcurrentEffect[IO] = IO.ioConcurrentEffect
implicit val t: Timer[IO] = IO.timer(ec)
目标是将整数 1 到 3 添加到 fs2 Queue
,然后将它们出队。第一次尝试:
val s1 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
_ <- Stream(1,2,3).map(Some(_)).through(q.enqueue) // Enqueue 1 to 3
_ <- Stream.eval(q.enqueue1(None)) // Terminate queue
outStream <- q.dequeue // Get dequeue stream
} yield outStream
s1.compile.toList.unsafeRunSync()
这个returnsList(1)
。不知道为什么。
第二次尝试:
val s2 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
_ <- Stream(
Stream(1,2,3).map(Some(_)).through(q.enqueue), // Enqueue 1 to 3
Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) // Wait 1 second and terminate stream
).parJoin(2) // Run both streams in parallel
outStream <- q.dequeue // Get dequeue stream
} yield outStream
s2.compile.toList.unsafeRunSync()
这个returnsList(1,2)
。也不知道为什么。
为什么这些示例返回了它们返回的内容?正确的做法是什么?
看看你在这里实际定义了什么:
val s1 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int])
_ <- Stream(1,2,3).map(Some(_)).through(q.enqueue)
_ <- Stream.eval(q.enqueue1(None))
outStream <- q.dequeue // Get dequeue stream
} yield outStream
这与
相同Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
Stream(1,2,3).map(Some(_)).through(q.enqueue).flatMap { _ =>
Stream.eval(q.enqueue1(None)).flatMap { _ =>
q.dequeue
}
}
}
- 您在
Stream
中创建了一个队列
- 然后对于该流的每个元素(对于每个
Queue
),您将 3 个Some
元素加入队列 - 懒惰意味着直到需要该元素时才会评估副作用 - 然后对于每个 enque,您还将
None
放入队列 - 在到达出队部分之前,您基本上创建了一个
Some(1), None, Some(2), None, Some(3), None
流!因为它是 none 终止队列,它在第一个None
之后停止,所以你最终得到Stream(1)
评估为List(1)
同时你有第二个例子
val s2 = for {
q <- Stream.eval(Queue.noneTerminated[IO, Int])
_ <- Stream(
Stream(1,2,3).map(Some(_)).through(q.enqueue),
Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None))
).parJoin(2)
outStream <- q.dequeue
} yield outStream
等于
Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
Stream(
Stream(1,2,3).map(Some(_)).through(q.enqueue),
Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None))
).parJoin(2).flatMap { _ =>
q.dequeue
}
}
- 您在
Stream
中创建了一个队列
- 然后为该流的每个元素(每个
Queue
)创建一个较小的流- 您正在排队
Some
1、2、3 延迟 在一个流中 - 你排队
None
另一个 - 你不确定地结合两个流,所以当
Some
的序列将被None
打断时,你没有强有力的保证,在你的测试中,环境使它如此在评估第一个流的 2 个元素后发生
- 您正在排队
你的基本错误是交替使用 Stream[IO, *]
和 IO
:当你在 Stream
上创建 flatMap
时,下一行不会在前一行的所有元素之后计算line 已经被评估,但是在上一行的每个元素都被评估之后。毕竟 Stream
是(懒惰的,有副作用的)集合,所以在理解方面它的行为更像 (Lazy
)List
当涉及到操作顺序时。
如果您没有评估一切 Stream
它会按您预期的那样工作:
val createQueue: IO[NoneTerminatedQueue[IO, Int]] = Queue.noneTerminated[IO, Int]
def enqueueValues(queue: NoneTerminatedQueue[IO, Int]) =
Stream(1,2,3).map(Some(_)).through(queue.enqueue) ++ Stream.eval(queue.enqueue1(None))
def dequeueValues(queue: NoneTerminatedQueue[IO, Int]) =
queue.dequeue
// these are IO, not Stream[IO, *] !!!
val io = for {
queue <- createQueue
_ <- enqueueValues(queue).compile.drain.start
result <- dequeueValues(queue).compile.toList
} yield result
io.runUnsafeSync