如何将元素排入队列然后将它们出队?

How to enqueue elements to a queue and then dequeue them?

以下片段的导入和隐含:

import cats.effect.{ConcurrentEffect, ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val c: ConcurrentEffect[IO] = IO.ioConcurrentEffect
implicit val t: Timer[IO] = IO.timer(ec)

目标是将整数 1 到 3 添加到 fs2 Queue,然后将它们出队。第一次尝试:

val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue) // Enqueue 1 to 3
  _ <- Stream.eval(q.enqueue1(None)) // Terminate queue
  outStream <- q.dequeue // Get dequeue stream
} yield outStream

s1.compile.toList.unsafeRunSync()

这个returnsList(1)。不知道为什么。

第二次尝试:

val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), // Enqueue 1 to 3
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) // Wait 1 second and terminate stream
  ).parJoin(2) // Run both streams in parallel
  outStream <- q.dequeue // Get dequeue stream
} yield outStream 

s2.compile.toList.unsafeRunSync()

这个returnsList(1,2)。也不知道为什么。

为什么这些示例返回了它们返回的内容?正确的做法是什么?

看看你在这里实际定义了什么:

val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int])
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue)
  _ <- Stream.eval(q.enqueue1(None))
  outStream <- q.dequeue // Get dequeue stream
} yield outStream

这与

相同
Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>

  Stream(1,2,3).map(Some(_)).through(q.enqueue).flatMap { _ =>

    Stream.eval(q.enqueue1(None)).flatMap { _ =>
      q.dequeue
    }
  }
}
  • 您在 Stream
  • 中创建了一个队列
  • 然后对于该流的每个元素(对于每个 Queue),您将 3 个 Some 元素加入队列 - 懒惰意味着直到需要该元素时才会评估副作用
  • 然后对于每个 enque,您还将 None 放入队列
  • 在到达出队部分之前,您基本上创建了一个 Some(1), None, Some(2), None, Some(3), None 流!因为它是 none 终止队列,它在第一个 None 之后停止,所以你最终得到 Stream(1) 评估为 List(1)

同时你有第二个例子

val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) 
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), 
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) 
  ).parJoin(2) 
  outStream <- q.dequeue 
} yield outStream 

等于

Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>

  Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), 
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) 
  ).parJoin(2).flatMap { _ =>
    q.dequeue 
  }
}
  • 您在 Stream
  • 中创建了一个队列
  • 然后为该流的每个元素(每个 Queue)创建一个较小的流
    • 您正在排队 Some 1、2、3 延迟 在一个流中
    • 你排队 None 另一个
    • 你不确定地结合两个流,所以当 Some 的序列将被 None 打断时,你没有强有力的保证,在你的测试中,环境使它如此在评估第一个流的 2 个元素后发生

你的基本错误是交替使用 Stream[IO, *]IO:当你在 Stream 上创建 flatMap 时,下一行不会在前一行的所有元素之后计算line 已经被评估,但是在上一行的每个元素都被评估之后。毕竟 Stream 是(懒惰的,有副作用的)集合,所以在理解方面它的行为更像 (Lazy)List 当涉及到操作顺序时。

如果您没有评估一切 Stream 它会按您预期的那样工作:

val createQueue: IO[NoneTerminatedQueue[IO, Int]] = Queue.noneTerminated[IO, Int]

def enqueueValues(queue: NoneTerminatedQueue[IO, Int]) =
  Stream(1,2,3).map(Some(_)).through(queue.enqueue) ++ Stream.eval(queue.enqueue1(None))

def dequeueValues(queue: NoneTerminatedQueue[IO, Int]) =
  queue.dequeue

// these are IO, not Stream[IO, *] !!!
val io = for {
  queue <- createQueue
  _ <- enqueueValues(queue).compile.drain.start
  result <- dequeueValues(queue).compile.toList
} yield result

io.runUnsafeSync