Akka-http 合并传入连接

Akka-http merge incoming connections

我正在尝试使用 akka-streams 和 akka-http 来解决以下问题:

为了将传入连接合并到一个输出,我想出了以下代码:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
    import FlowGraph.Implicits._

    val merge = b.add(MergePreferred[IncomingConnection](1))

    val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
    val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)

    inA ~> merge.preferred
    inB ~> merge.in(0)
           merge.out ~> Sink.foreach(println)

    ClosedShape
}).run()

所以,我有一个源,其中包含来自 A 和 B 的 IncomingConnection 实例。

现在我想以某种方式处理它们,生成响应并将响应发送到相应的连接。

也许有更好的方法来存档所有这些东西,但我在文档或其他人的问题中找不到解决此类问题的任何示例。

我也觉得这个问题很普遍。

在此先感谢您的帮助。

IncomingConnection 对象用于处理请求和 return 响应的方式是使用 handleXXX 方法。示例基于 documentation and the api:

同步函数

//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???

val connectionSink = Sink.foreach[IncomingConnection] { conn =>
  conn.handleWithSyncHandler(requestProcessor)
}

connectionSink 然后可以在您的图形构造中使用:

inA ~> merge.preferred
inB ~> merge.in(0)
       merge.out ~> connectionSink

异步函数

同样,如果您的处理器是异步的:

val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)

流量

最后,您还可以使用 Flow(我的首选方法):

val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )

关于流,我最喜欢的一个特性是您可以在不同的流中反复使用它们。因此,flowReqProcessor 可以是 val 而不是 def.