使用 akka 流对数据库进行 Tcp 请求

Tcp request to the database with akka streams

我正在尝试使用 akka-streams 的 Tcp 客户端向数据库发送查询,但我不明白我错过了什么。

所以我有两种类型 QueryResponse 可以与 akka 的 ByteString 完美转换。所以我正在用 val conn = Tcp().outgoingConnection("localhost", 28015) 创建一个客户端连接,这给了我一个 Flow[ByteString, ByteString, Future[OutgoingConnection]],到目前为止一切顺利。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像 Source(Future.successful(query)) 一样构造它,并将它连接到流 source.via(flow),这给了我另一个 Source[Response, Unit]。在这里我不明白如何获得 Future[Response],尝试了几个组合器,但它给了我 Materialized 值,我不完全理解它与 values/types 的关系流.

首先:这是什么数据库,为什么要直接通过TCP连接?您确定这会按照您缩进的方式工作吗?您能处理传入响应的框架吗?

至于你关于从 Source[Response, Unit] 中得到 Future[Response] 的问题,它就像 运行 带有 Sink.head 的来源一样简单,即像这样:val res: Future[Result] = source.runWith(Sink.head)(你当然需要范围内的 implicit val mat = ActorMaterializer())。

我强烈建议您先花一些时间了解 Akka Streams documentation,然后再深入使用 Streams。

您可以在 Flow 上使用 join 方法。来自文档:

Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.

+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+

这允许您将连接的输出连接到 Flow 的输入,并将 Flow 的输出连接到连接输入。

具体来说,您可以获取从 outgoingConnection 生成的 Flow 并将其与您创建的 Flow 结合起来以响应查询:

def queryDB(query : ByteString) : Future[ByteString] = ???

val concurrentQueries = 10

val queryResponder = 
  Flow[ByteString].mapAsync(concurrentQueries)(queryDB)

val server : String = ???
val port : Int = ???

//from the diagram above:
//this = connection
//other = queryResponder
Tcp().outgoingConnection(server, port).join(queryResponder).run()