如何通过 ZIO 环境在 ZIO 任务之间共享 ZIO 队列

How to share a ZIO Queue between ZIO Tasks via the ZIO Environment

我对 Scala 和 ZIO 有点陌生,运行 遇到了一些奇怪的难题。

我想设置一个包含 ZIO 队列和更高版本的 ZIO 环境 此共享队列中有不同的 ZIO 任务 offertake

我试过这样定义我的环境

    trait MainEnv extends Console with Clock
    {
      val mainQueue = Queue.unbounded[String]
    }

并像这样从单独的任务访问队列

    for {
      env      <- ZIO.environment[MainEnv]
      queue    <- env.mainQueue
      ...

但在我的测试中,我观察到我的每个单独任务都有一个单独的 Queue 实例。
查看 unbounded

的签名
  def unbounded[A]: UIO[Queue[A]]

我观察到它不会立即 return 一个队列,而是 return 一个 return 的效果 一个队列,所以虽然观察到的行为是有道理的,但它根本不是我所希望的,并且 我没有看到获得我想要的行为的明确方法。

对于如何实现我设置不同的目标的任何建议,我将不胜感激 任务通过存储在环境中的共享队列进行通信。


这里是我的代码和输出供参考。

示例执行

bash-3.2$ sbt run
[info] Loading project definition from /private/tmp/env-q/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to example (in build file:/private/tmp/env-q/)
[info] Compiling 1 Scala source to /private/tmp/env-q/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /private/tmp/env-q/target/scala-2.12/example_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running example.Main 
env example.Main$$anon@36fbcafd queue zio.Queue$$anon@65b9a444
env example.Main$$anon@36fbcafd queue zio.Queue$$anon@7c050764

(挂在这里 - 注意 env 对象相同但队列对象不同,因此第二个任务被卡住了)

/tmp/env-q/test.scala

这是我的完整测试,它基于 https://www.slideshare.net/jdegoes/zio-queue

幻灯片 37 中的示例
    package example
    import zio.{App, Queue, ZIO}
    import zio.blocking.Blocking
    import zio.clock.Clock
    import zio.console._

    trait MainEnv extends Console with Clock    // environment with queue
    {
      val mainQueue = Queue.unbounded[String]
    }

    object Main extends App                     // main test
    {
      val task1 = for {                         // task to add something to the queue
        env      <- ZIO.environment[MainEnv]
        queue    <- env.mainQueue
        _        <- putStrLn(s"env $env queue $queue")
        _        <- queue.offer("Give me Coffee!")
      } yield ()

      val task2 = for {                         // task to remove+print stuff from queue
        env      <- ZIO.environment[MainEnv]
        queue    <- env.mainQueue
        _        <- putStrLn(s"env $env queue $queue")
        _        <- queue.take.flatMap(putStrLn)
      } yield ()

      val program = ZIO.runtime[MainEnv]        // top level to run both tasks
        .flatMap {
          implicit rts =>
            for {
              _ <- task1.fork
              _ <- task2
            } yield ()
        }

      val runEnv = new MainEnv with Console.Live with Clock.Live

      def run(args: List[String]) =
        program.provide(runEnv).fold(_ => 1, _ => 0)
    }

/tmp/env-q/build.sbt

这是我用的build.sbt

val ZioVersion = "1.0.0-RC13"

lazy val root = (project in file("."))
  .settings(
    organization := "example",
    name := "example",
    version := "0.0.1-SNAPSHOT",
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq("-Ypartial-unification"),
    libraryDependencies ++= Seq(
      "dev.zio"                 %% "zio"                 % ZioVersion,
    ),
    addCompilerPlugin("org.spire-math" %% "kind-projector"     % "0.9.6"),
    addCompilerPlugin("com.olegpy"     %% "better-monadic-for" % "0.2.4")
  )

scalacOptions ++= Seq(
  "-deprecation",               // Emit warning and location for usages of deprecated APIs.
  "-encoding", "UTF-8",         // Specify character encoding used by source files.
  "-language:higherKinds",      // Allow higher-kinded types
  "-language:postfixOps",       // Allows operator syntax in postfix position (deprecated since Scala 2.10)
  "-feature",                   // Emit warning and location for usages of features that should be imported explicitly.
  "-Ypartial-unification",      // Enable partial unification in type constructor inference
  "-Xfatal-warnings",           // Fail the compilation if there are any warnings
)

在 ZIO Core 的 Official Gitter Channel 中,Adam Fraser 建议

You would want to have you environment just have a Queue[String] and then you would want to use a method like provideM with Queue.unbounded to create one queue and provide it to your whole application. That's where provideM as opposed to provide comes in. It let's you satisfy an environment that requires an A by providing a ZIO[A].

深入了解 ZIO 源代码后,在 DefaultTestReporterSpec.scala 中发现了一个有用的示例。

通过将环境定义为

  trait MainEnv extends Console with Clock    // environment with queue
  {
    val mainQueue: Queue[String]
  }

更改我的任务以使用 = 而不是 <- 访问 env.mainQueue(因为 mainQueue 现在是 Queue[String] 而不是 UIO[Queue[String]],删除 runEnv 并在我的测试中更改 run 方法以使用 provideSomeM

  def run(args: List[String]) =
    program.provideSomeM(
      for {
        q <- Queue.unbounded[String]
      } yield new MainEnv with Console.Live with Clock.Live {
        override val mainQueue = q
      }
    ).fold(_ => 1, _ => 0)

我能够得到预期的结果:

sbt:example> run
[info] Running example.Main 
env example.Main$$anon@45bfc0da queue zio.Queue$$anon@13b73d56
env example.Main$$anon@45bfc0da queue zio.Queue$$anon@13b73d56
Give me Coffee!
[success] Total time: 1 s, completed Oct 1, 2019 7:41:47 AM