Akka-http 合并传入连接
Akka-http merge incoming connections
我正在尝试使用 akka-streams 和 akka-http 来解决以下问题:
- 我们有 2 个 HTTP 客户端(A 和 B)向 1 个 HTTP 服务器(C)发送请求。双方使用akka-http进行通信
- 与 B 相比,来自 A 的请求具有更高的优先级,但两个请求应该得到同等处理。所以C应该先处理A的请求,B的请求次之
- 当然,我们希望在每一端都启用背压
为了将传入连接合并到一个输出,我想出了以下代码:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val merge = b.add(MergePreferred[IncomingConnection](1))
val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> Sink.foreach(println)
ClosedShape
}).run()
所以,我有一个源,其中包含来自 A 和 B 的 IncomingConnection 实例。
现在我想以某种方式处理它们,生成响应并将响应发送到相应的连接。
也许有更好的方法来存档所有这些东西,但我在文档或其他人的问题中找不到解决此类问题的任何示例。
我也觉得这个问题很普遍。
在此先感谢您的帮助。
IncomingConnection
对象用于处理请求和 return 响应的方式是使用 handleXXX
方法。示例基于 documentation and the api:
同步函数
//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???
val connectionSink = Sink.foreach[IncomingConnection] { conn =>
conn.handleWithSyncHandler(requestProcessor)
}
此 connectionSink
然后可以在您的图形构造中使用:
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> connectionSink
异步函数
同样,如果您的处理器是异步的:
val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)
流量
最后,您还可以使用 Flow(我的首选方法):
val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )
关于流,我最喜欢的一个特性是您可以在不同的流中反复使用它们。因此,flowReqProcessor 可以是 val
而不是 def
.
我正在尝试使用 akka-streams 和 akka-http 来解决以下问题:
- 我们有 2 个 HTTP 客户端(A 和 B)向 1 个 HTTP 服务器(C)发送请求。双方使用akka-http进行通信
- 与 B 相比,来自 A 的请求具有更高的优先级,但两个请求应该得到同等处理。所以C应该先处理A的请求,B的请求次之
- 当然,我们希望在每一端都启用背压
为了将传入连接合并到一个输出,我想出了以下代码:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val merge = b.add(MergePreferred[IncomingConnection](1))
val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> Sink.foreach(println)
ClosedShape
}).run()
所以,我有一个源,其中包含来自 A 和 B 的 IncomingConnection 实例。
现在我想以某种方式处理它们,生成响应并将响应发送到相应的连接。
也许有更好的方法来存档所有这些东西,但我在文档或其他人的问题中找不到解决此类问题的任何示例。
我也觉得这个问题很普遍。
在此先感谢您的帮助。
IncomingConnection
对象用于处理请求和 return 响应的方式是使用 handleXXX
方法。示例基于 documentation and the api:
同步函数
//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???
val connectionSink = Sink.foreach[IncomingConnection] { conn =>
conn.handleWithSyncHandler(requestProcessor)
}
此 connectionSink
然后可以在您的图形构造中使用:
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> connectionSink
异步函数
同样,如果您的处理器是异步的:
val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)
流量
最后,您还可以使用 Flow(我的首选方法):
val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???
val connectionSink =
Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )
关于流,我最喜欢的一个特性是您可以在不同的流中反复使用它们。因此,flowReqProcessor 可以是 val
而不是 def
.