喜欢流 - Source.fromPublisher

Akka Stream - Source.fromPublisher

我正在尝试根据 akka 流快速入门指南执行以下代码:

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

val songs = Source.fromPublisher(SongsService.stream)

val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val counterGraph: RunnableGraph[Future[Int]] =
  songs
    .via(count)
    .toMat(sumSink)(Keep.right)

val sum: Future[Int] = counterGraph.run()

sum.foreach(c => println(s"Total songs processed: $c"))

这里的问题是未来永远不会 return 结果。与文档示例最大的区别是我的来源。

我有一个游戏枚举器,我正在将其转换为 Akka Publisher,结果是 SongsService.stream

当使用定义的列表作为来源时,例如:

val songs = Source(list)

有效,但使用 Source.fromPublisher 无效。

但这里确实不是发布者的问题,我可以做一个简单的操作就可以了:

val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)

它遍历数据库,创建游戏枚举器,将其转换为发布者,然后我可以迭代。

有什么想法吗?

您的发布商可能永远不会完成。