喜欢流 - Source.fromPublisher
Akka Stream - Source.fromPublisher
我正在尝试根据 akka 流快速入门指南执行以下代码:
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val songs = Source.fromPublisher(SongsService.stream)
val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
songs
.via(count)
.toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total songs processed: $c"))
这里的问题是未来永远不会 return 结果。与文档示例最大的区别是我的来源。
我有一个游戏枚举器,我正在将其转换为 Akka Publisher,结果是 SongsService.stream
当使用定义的列表作为来源时,例如:
val songs = Source(list)
有效,但使用 Source.fromPublisher 无效。
但这里确实不是发布者的问题,我可以做一个简单的操作就可以了:
val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)
它遍历数据库,创建游戏枚举器,将其转换为发布者,然后我可以迭代。
有什么想法吗?
您的发布商可能永远不会完成。
我正在尝试根据 akka 流快速入门指南执行以下代码:
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val songs = Source.fromPublisher(SongsService.stream)
val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
songs
.via(count)
.toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total songs processed: $c"))
这里的问题是未来永远不会 return 结果。与文档示例最大的区别是我的来源。
我有一个游戏枚举器,我正在将其转换为 Akka Publisher,结果是 SongsService.stream
当使用定义的列表作为来源时,例如:
val songs = Source(list)
有效,但使用 Source.fromPublisher 无效。
但这里确实不是发布者的问题,我可以做一个简单的操作就可以了:
val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)
它遍历数据库,创建游戏枚举器,将其转换为发布者,然后我可以迭代。
有什么想法吗?
您的发布商可能永远不会完成。