并发流不向控制台打印任何内容

Concurrent Streams Does't Print Anything to Console

我正在尝试构建一个关于在 fs2 中使用 Stream.concurrently 方法的示例。我正在开发 producer/consumer 模式,使用 Queue 作为共享状态:

import cats.effect.std.{Queue, Random}

object Fs2Tutorial extends IOApp {
  val random: IO[Random[IO]] = Random.scalaUtilRandom[IO]
  val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)

  val producer: IO[Nothing] = for {
    r <- random
    q <- queue
    p <-
      r.betweenInt(1, 11)
      .flatMap(q.offer)
      .flatTap(_ => IO.sleep(1.second))
      .foreverM
  } yield p

  val consumer: IO[Nothing] = for {
    q <- queue
    c <- q.take.flatMap { n =>
      IO.println(s"Consumed $n")
    }.foreverM
  } yield c

  val concurrently: Stream[IO, Nothing] = Stream.eval(producer).concurrently(Stream.eval(consumer))

  override def run(args: List[String]): IO[ExitCode] = {
    concurrently.compile.drain.as(ExitCode.Success)
  }
}

我希望程序打印一些 "Consumed n",一些 n。但是,该程序不会向控制台打印任何内容。

上面的代码有什么问题?

What's wrong with the above code?

您没有在消费者和生产者中使用相同的 Queue,而是他们每个人都在创建自己的新独立 Queue (同样发生在 Random顺便说一句)

这是新手常犯的错误,因为他们还没有掌握 IO

等数据类型背后的主要原则

当您执行 val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10) 时,您是在说 queue 是一个程序,在评估时将产生类型 Queue[IO, Unit] 的值,这就是所有这一切的重点。
该程序成为一个值,作为任何值,您可以以任何方式操纵它以产生新值,例如使用 flatMap 所以当 consumerproducer 都通过 flatMapping queue 他们都创建了新的独立程序/值。

您可以像这样修复该代码:

import cats.effect.{IO, IOApp}
import cats.effect.std.{Queue, Random}
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.duration._

object Fs2Tutorial extends IOApp.Simple {  
  override final val run: IO[Unit] = {
    val resources =
      (
        Random.scalaUtilRandom[IO],
        Queue.bounded[IO, Int](10)
      ).tupled
    
    val concurrently =
      Stream.eval(resources).flatMap {
        case (random, queue) =>
          val producer = 
            Stream
              .fixedDelay[IO](1.second)
              .evalMap(_ => random.betweenInt(1, 11))
              .evalMap(queue.offer)

        val consumer =
          Stream.fromQueueUnterminated(queue).evalMap(n => IO.println(s"Consumed $n"))
        
        producer.concurrently(consumer)
      }
    
    concurrently.interruptAfter(10.seconds).compile.drain >> IO.println("Finished!")
  }
}

(可以看到运行 here).


PS:我建议您查看 Fabio Labella 的 “程序作为价值” 系列:https://systemfw.org/archive.html