在 scala-stream 流中组合 akka-http 流的最佳方法是什么

What is the best way to combine akka-http flow in a scala-stream flow

我有一个用例,在 Akka-stream 的 n 流之后,我必须获取其中一个的结果并向 HTTP REST 发出请求 API.

最后一个akka-stream流类型,在HTTP请求之前是一个字符串:

val stream1:Flow[T,String,NotUsed] = Flow[T].map(_.toString)

现在,应该指定 HTTP 请求,我想到了这样的事情:

val stream2: Flow[String,Future[HttpRespone],NotUsed] = Flow[String].map(param => Http.singleRequest(HttpRequest(uri=s"host.com/$param")))

然后合并:

val stream3 = stream1 via stream2

这是最好的方法吗?你们实际上会推荐哪些方式,为什么?这个用例范围内的几个最佳实践示例会很棒!

提前致谢:)

您的实施将为每个新参数创建到 "host.com" 的新连接。这是不必要的,并且会阻止 akka 进行某些优化。在引擎盖下,akka 实际上保留了一个连接池以重用打开的连接,但我认为最好在代码中指定你的意图而不是依赖底层实现。

您可以按照 documentation:

中的说明建立单个连接
val connectionFlow: Flow[HttpRequest, HttpResponse, _] = 
  Http().outgoingConnection("host.com")

要利用此连接流,您需要将字符串路径转换为 ​​HttpRequest 对象:

import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path

def pathToRequest(path : String) = HttpRequest(uri=Uri.Empty.withPath(Path(path)))

val reqFlow = Flow[String] map pathToRequest

最后,将所有流程粘合在一起:

val stream3 = stream1 via reqFlow via connectionFlow

这是使用不同请求对象连续查询同一服务器的最常见模式。