如何在 akka-streams map 与 mapAsync 中使用异步驱动程序
How to use asynchronous drivers in akka-streams map vs mapAsync
我刚刚开始使用 reactivecouchbase 异步数据库驱动程序,但正在 运行 处理一些基本的设计问题。
在传统的方法中,我会通过限制与数据库的连接数来限制我对数据库施加的压力。但是,使用异步驱动程序,我可以用新查询淹没数据库吗?
在以下示例中,这变得很重要。
假设我有两种不同的调用数据库的方法。
我的函数调用数据库:
asyncCallDB: Future[DBResponse]
blockingCallDB: DBResponse
现在我想将数据库调用映射到一个流上,我可以在其中使用两个不同的函数:
Flow.map()
Flow.mapAsync(numberOfConcurrentCalls)()
现在我的问题是你会如何select调用数据库:
Flow.map(blockingCallDB) //One call at a time with back preassure
Flow.map(asyncCallDB) //Unlimited calls floods db no back pressure?
Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls floods db no back pressure?
我觉得我的理解不足,想理解这种决定。
ReactiveCouchbase 使用 AsyncHttpClient for communication with Couchbase server(s). As you can see in the source code 它调用 setMaximumConnectionsTotal
,它限制并发连接数。实际值取决于您在 couchbase.http.maxTotalConnections
.
中配置的内容
您每创建一个 CouchbaseBucket
,就有一个 AsyncHttpClient
。所以每个 CouchbaseBucket
.
最多有 maxTotalConnections
个连接
来自 Couchbase documentation on N1QL REST API:
The REST API runs synchronously, so once execution of the statement in
the request is started, results are streamed back to the client,
terminating when execution of the statement finishes.
因此实际上每个存储桶的并发查询数量限制为 maxTotalConnections
。
因此,DB 上的背压总是以某种方式受到限制。要么是因为您将 maxTotalConnections
设置为非负数,要么是因为您的客户端由于 RAM 或文件描述符数量有限而无法创建更多连接。
但是,仍然有可能创建太多 Future
,这样您的客户端就会 运行 内存不足。每当您认为可能是这种情况时,您应该使用 mapAsync
,如前所述 , or one of the other techniques mentioned in "Buffers and working with rate" (Akka documentation).
有一个 good description of mapAsync
in the Akka documentation:
Pass incoming elements to a function that return a Future result. When
the future arrives the result is passed downstream. Up to n elements can be processed concurrently...
请记住 Flow.mapAsync
本身不会 运行 任何东西,它只是 returns 一个 Flow
,你必须在 [=23= 之间连接] 和 Sink
,然后是 run
。 Akka Quick Start Guide 以一种非常易于理解的方式描述了这一点。
我刚刚开始使用 reactivecouchbase 异步数据库驱动程序,但正在 运行 处理一些基本的设计问题。 在传统的方法中,我会通过限制与数据库的连接数来限制我对数据库施加的压力。但是,使用异步驱动程序,我可以用新查询淹没数据库吗?
在以下示例中,这变得很重要。
假设我有两种不同的调用数据库的方法。
我的函数调用数据库:
asyncCallDB: Future[DBResponse]
blockingCallDB: DBResponse
现在我想将数据库调用映射到一个流上,我可以在其中使用两个不同的函数:
Flow.map()
Flow.mapAsync(numberOfConcurrentCalls)()
现在我的问题是你会如何select调用数据库:
Flow.map(blockingCallDB) //One call at a time with back preassure
Flow.map(asyncCallDB) //Unlimited calls floods db no back pressure?
Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls floods db no back pressure?
我觉得我的理解不足,想理解这种决定。
ReactiveCouchbase 使用 AsyncHttpClient for communication with Couchbase server(s). As you can see in the source code 它调用 setMaximumConnectionsTotal
,它限制并发连接数。实际值取决于您在 couchbase.http.maxTotalConnections
.
您每创建一个 CouchbaseBucket
,就有一个 AsyncHttpClient
。所以每个 CouchbaseBucket
.
maxTotalConnections
个连接
来自 Couchbase documentation on N1QL REST API:
The REST API runs synchronously, so once execution of the statement in the request is started, results are streamed back to the client, terminating when execution of the statement finishes.
因此实际上每个存储桶的并发查询数量限制为 maxTotalConnections
。
因此,DB 上的背压总是以某种方式受到限制。要么是因为您将 maxTotalConnections
设置为非负数,要么是因为您的客户端由于 RAM 或文件描述符数量有限而无法创建更多连接。
但是,仍然有可能创建太多 Future
,这样您的客户端就会 运行 内存不足。每当您认为可能是这种情况时,您应该使用 mapAsync
,如前所述
有一个 good description of mapAsync
in the Akka documentation:
Pass incoming elements to a function that return a Future result. When the future arrives the result is passed downstream. Up to n elements can be processed concurrently...
请记住 Flow.mapAsync
本身不会 运行 任何东西,它只是 returns 一个 Flow
,你必须在 [=23= 之间连接] 和 Sink
,然后是 run
。 Akka Quick Start Guide 以一种非常易于理解的方式描述了这一点。