如何通过ZStream读取ZHub的消息?

How to read messages from ZHub via ZStream?

我是 ZHub 和 ZStream 的新手,想熟悉他们的 API。

不幸的是,我无法使这个简单的示例起作用:

for
    hub <- Hub.bounded[String](4)
    stream = ZStream.fromHub(hub)
    _ <- hub.publish("Hello")
    _ <- hub.publish("World")
    collected <- stream.runCollect
    _ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
    ()

我怀疑这个程序没有终止,因为我正在尝试收集无限流。我还尝试使用 stream.tap(...) 打印消息或关闭集线器。没有任何帮助。

我在这里错过了什么?感谢任何帮助,谢谢。

@adamgfraser 在 GitHub 上提供了一个工作示例:

import zio._
import zio.stream._

object Example extends App {

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    for {
      promise <- Promise.make[Nothing, Unit]
      hub     <- Hub.bounded[String](2)
      stream = ZStream.managed(hub.subscribe).flatMap { queue =>
                 ZStream.fromEffect(promise.succeed(())) *>
                   ZStream.fromQueue(queue)
               }
      fiber     <- stream.take(2).runCollect.fork
      _         <- promise.await
      _         <- hub.publish("Hello")
      _         <- hub.publish("World")
      collected <- fiber.join
      _         <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
    } yield ExitCode.success
}

我的错误是在等待订阅完成之前将值发布到中心。