使用 akka 流对数据库进行 Tcp 请求
Tcp request to the database with akka streams
我正在尝试使用 akka-streams 的 Tcp
客户端向数据库发送查询,但我不明白我错过了什么。
所以我有两种类型 Query
和 Response
可以与 akka 的 ByteString
完美转换。所以我正在用 val conn = Tcp().outgoingConnection("localhost", 28015)
创建一个客户端连接,这给了我一个 Flow[ByteString, ByteString, Future[OutgoingConnection]]
,到目前为止一切顺利。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像 Source(Future.successful(query))
一样构造它,并将它连接到流 source.via(flow)
,这给了我另一个 Source[Response, Unit]
。在这里我不明白如何获得 Future[Response]
,尝试了几个组合器,但它给了我 Materialized
值,我不完全理解它与 values/types 的关系流.
首先:这是什么数据库,为什么要直接通过TCP连接?您确定这会按照您缩进的方式工作吗?您能处理传入响应的框架吗?
至于你关于从 Source[Response, Unit]
中得到 Future[Response]
的问题,它就像 运行 带有 Sink.head
的来源一样简单,即像这样:val res: Future[Result] = source.runWith(Sink.head)
(你当然需要范围内的 implicit val mat = ActorMaterializer()
)。
我强烈建议您先花一些时间了解 Akka Streams documentation,然后再深入使用 Streams。
您可以在 Flow 上使用 join
方法。来自文档:
Join this Flow to another Flow, by cross connecting the inputs and
outputs, creating a RunnableGraph.
+------+ +-------+
| | ~Out~> | |
| this | | other |
| | <~In~ | |
+------+ +-------+
这允许您将连接的输出连接到 Flow 的输入,并将 Flow 的输出连接到连接输入。
具体来说,您可以获取从 outgoingConnection 生成的 Flow
并将其与您创建的 Flow 结合起来以响应查询:
def queryDB(query : ByteString) : Future[ByteString] = ???
val concurrentQueries = 10
val queryResponder =
Flow[ByteString].mapAsync(concurrentQueries)(queryDB)
val server : String = ???
val port : Int = ???
//from the diagram above:
//this = connection
//other = queryResponder
Tcp().outgoingConnection(server, port).join(queryResponder).run()
我正在尝试使用 akka-streams 的 Tcp
客户端向数据库发送查询,但我不明白我错过了什么。
所以我有两种类型 Query
和 Response
可以与 akka 的 ByteString
完美转换。所以我正在用 val conn = Tcp().outgoingConnection("localhost", 28015)
创建一个客户端连接,这给了我一个 Flow[ByteString, ByteString, Future[OutgoingConnection]]
,到目前为止一切顺利。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像 Source(Future.successful(query))
一样构造它,并将它连接到流 source.via(flow)
,这给了我另一个 Source[Response, Unit]
。在这里我不明白如何获得 Future[Response]
,尝试了几个组合器,但它给了我 Materialized
值,我不完全理解它与 values/types 的关系流.
首先:这是什么数据库,为什么要直接通过TCP连接?您确定这会按照您缩进的方式工作吗?您能处理传入响应的框架吗?
至于你关于从 Source[Response, Unit]
中得到 Future[Response]
的问题,它就像 运行 带有 Sink.head
的来源一样简单,即像这样:val res: Future[Result] = source.runWith(Sink.head)
(你当然需要范围内的 implicit val mat = ActorMaterializer()
)。
我强烈建议您先花一些时间了解 Akka Streams documentation,然后再深入使用 Streams。
您可以在 Flow 上使用 join
方法。来自文档:
Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.
+------+ +-------+
| | ~Out~> | |
| this | | other |
| | <~In~ | |
+------+ +-------+
这允许您将连接的输出连接到 Flow 的输入,并将 Flow 的输出连接到连接输入。
具体来说,您可以获取从 outgoingConnection 生成的 Flow
并将其与您创建的 Flow 结合起来以响应查询:
def queryDB(query : ByteString) : Future[ByteString] = ???
val concurrentQueries = 10
val queryResponder =
Flow[ByteString].mapAsync(concurrentQueries)(queryDB)
val server : String = ???
val port : Int = ???
//from the diagram above:
//this = connection
//other = queryResponder
Tcp().outgoingConnection(server, port).join(queryResponder).run()