如何在不具体化的情况下将 return 源队列用于调用方?

How can I use and return Source queue to caller without materializing it?

我正在尝试使用新的 Akka 流,想知道如何使用 return 源队列到调用者而不在我的代码中实现它?

假设我们有一个库,它通过 Source 进行异步调用和 returns 结果。函数看起来像这样

def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {

  val source = Source.queue[String](100, backpressure)

  source.mapMaterializedValue { case queue =>

    val url = s"http://.....&term=$text"
    httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
      v.idlist.foreach { id =>
        queue.offer(id)
      }

      queue.complete()
    }
  }

  source
}

调用者可能会这样使用它

// There is implicit ActorMaterializer somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)

当我 运行 时,mapMaterializedValue 中的这段代码永远不会执行。

我不明白为什么我不能访问 SourceQueue 的实例,如果应该由调用者决定如何具体化源的话。

我该如何实施?

在您的代码示例中,您正在 returning source 而不是 source.mapMaterializedValue 的 return 值(方法调用不会改变 Source 对象)。