如何在不具体化的情况下将 return 源队列用于调用方?
How can I use and return Source queue to caller without materializing it?
我正在尝试使用新的 Akka 流,想知道如何使用 return 源队列到调用者而不在我的代码中实现它?
假设我们有一个库,它通过 Source
进行异步调用和 returns 结果。函数看起来像这样
def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {
val source = Source.queue[String](100, backpressure)
source.mapMaterializedValue { case queue =>
val url = s"http://.....&term=$text"
httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
v.idlist.foreach { id =>
queue.offer(id)
}
queue.complete()
}
}
source
}
调用者可能会这样使用它
// There is implicit ActorMaterializer somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)
当我 运行 时,mapMaterializedValue
中的这段代码永远不会执行。
我不明白为什么我不能访问 SourceQueue
的实例,如果应该由调用者决定如何具体化源的话。
我该如何实施?
在您的代码示例中,您正在 returning source 而不是 source.mapMaterializedValue
的 return 值(方法调用不会改变 Source 对象)。
我正在尝试使用新的 Akka 流,想知道如何使用 return 源队列到调用者而不在我的代码中实现它?
假设我们有一个库,它通过 Source
进行异步调用和 returns 结果。函数看起来像这样
def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {
val source = Source.queue[String](100, backpressure)
source.mapMaterializedValue { case queue =>
val url = s"http://.....&term=$text"
httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
v.idlist.foreach { id =>
queue.offer(id)
}
queue.complete()
}
}
source
}
调用者可能会这样使用它
// There is implicit ActorMaterializer somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)
当我 运行 时,mapMaterializedValue
中的这段代码永远不会执行。
我不明白为什么我不能访问 SourceQueue
的实例,如果应该由调用者决定如何具体化源的话。
我该如何实施?
在您的代码示例中,您正在 returning source 而不是 source.mapMaterializedValue
的 return 值(方法调用不会改变 Source 对象)。