如何通过ZStream读取ZHub的消息?
How to read messages from ZHub via ZStream?
我是 ZHub 和 ZStream 的新手,想熟悉他们的 API。
不幸的是,我无法使这个简单的示例起作用:
for
hub <- Hub.bounded[String](4)
stream = ZStream.fromHub(hub)
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- stream.runCollect
_ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
()
我怀疑这个程序没有终止,因为我正在尝试收集无限流。我还尝试使用 stream.tap(...)
打印消息或关闭集线器。没有任何帮助。
我在这里错过了什么?感谢任何帮助,谢谢。
@adamgfraser 在 GitHub 上提供了一个工作示例:
import zio._
import zio.stream._
object Example extends App {
def run(args: List[String]): URIO[ZEnv, ExitCode] =
for {
promise <- Promise.make[Nothing, Unit]
hub <- Hub.bounded[String](2)
stream = ZStream.managed(hub.subscribe).flatMap { queue =>
ZStream.fromEffect(promise.succeed(())) *>
ZStream.fromQueue(queue)
}
fiber <- stream.take(2).runCollect.fork
_ <- promise.await
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- fiber.join
_ <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
} yield ExitCode.success
}
我的错误是在等待订阅完成之前将值发布到中心。
我是 ZHub 和 ZStream 的新手,想熟悉他们的 API。
不幸的是,我无法使这个简单的示例起作用:
for
hub <- Hub.bounded[String](4)
stream = ZStream.fromHub(hub)
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- stream.runCollect
_ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
()
我怀疑这个程序没有终止,因为我正在尝试收集无限流。我还尝试使用 stream.tap(...)
打印消息或关闭集线器。没有任何帮助。
我在这里错过了什么?感谢任何帮助,谢谢。
@adamgfraser 在 GitHub 上提供了一个工作示例:
import zio._
import zio.stream._
object Example extends App {
def run(args: List[String]): URIO[ZEnv, ExitCode] =
for {
promise <- Promise.make[Nothing, Unit]
hub <- Hub.bounded[String](2)
stream = ZStream.managed(hub.subscribe).flatMap { queue =>
ZStream.fromEffect(promise.succeed(())) *>
ZStream.fromQueue(queue)
}
fiber <- stream.take(2).runCollect.fork
_ <- promise.await
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- fiber.join
_ <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
} yield ExitCode.success
}
我的错误是在等待订阅完成之前将值发布到中心。