使用多个 ConnectionPool(s) 的正确方法
Correct way of using multiple ConnectionPool(s)
在我的应用程序中,我必须与多个 MySQL
数据库逐一交互(只读)。对于每个数据库,我需要一定数量的连接。与数据库的交互不会发生在 单一延伸 中:我查询数据库,花一些时间处理结果,再次查询数据库,再次处理结果等等。
这些交互中的每一个都需要多个连接[我同时触发多个查询],因此我需要一个ConnectionPool
当我开始与数据库交互时产生并一直存在,直到我完成对该数据库的所有查询(包括我不查询时的临时时间间隔,只处理结果)。
我能够成功创建具有所需连接数的 ConnectionPool
并获得如下所示的 implicit session
def createConnectionPool(poolSize: Int): DBSession = {
implicit val session: AutoSession.type = AutoSession
ConnectionPool.singleton(
url = "myUrl",
user = "myUser",
password = "***",
settings = ConnectionPoolSettings(initialSize = poolSize)
)
session
}
然后我在需要与 DB 交互的整个方法中传递这个 implicit session
。这样,我就可以使用这个 session
触发 poolSize
没有查询 同时 。很公平。
def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
...
methodThatInteractsWithDb
...
}
def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
...
getResultsParallely(poolSize = 32, fetchSize = 2000000)
...
}
def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
val resultsSequenceFuture: Seq[Future[ResultClass]] = {
(0 until poolSize).map { i =>
val limit: Long = fetchSize
val offset: Long = i * fetchSize
Future(methodThatMakesSingleQuery(limit, offset))
}
}
val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)
Await.result(resultsFuture, 2.minutes)
}
此技术有 2 个问题:
- 我的应用程序很大并且有很多嵌套方法调用,因此通过所有方法(见下文)传递
implicit session
是不可行的。
- 除了上述与不同数据库的逐个交互之外,我还需要在整个应用程序的整个生命周期中与另一个(固定的)数据库建立单一连接。此连接将用于每隔几分钟进行一次 小型写入操作 n( 记录 我与其他数据库交互的进度)。因此,我需要多个
ConnectionPool
,每个 DB
根据我对 ScalikeJdbc
的 docs 的理解,我想出了以下方法,不需要我在任何地方都传递 implicit session
。
def createConnectionPool(poolName: String, poolSize: Int): Unit = {
ConnectionPool.add(
name = poolName,
url = "myUrl",
user = "myUser",
password = "***",
settings = ConnectionPoolSettings(initialSize = poolSize)
)
}
def methodThatInteractsWithDb(poolName: String): Unit = {
...
(DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
// interact with DB
...
}
...
}
虽然这可行,但我无法再并行化 数据库交互。这种行为很明显,因为我使用的是 borrow()
方法,该方法从 池 获取 单个连接 。反过来,这让我想知道为什么 AutoSession
的东西更早起作用:为什么我能够使用单个 implicit session
同时触发多个查询?如果那件事奏效了,那为什么这行不通呢?但是我没有找到如何从支持多个连接的 ConnectionPool
中获取 DBSession
的示例。
总而言之,我有 2 个问题和 2 个解决方案:每个问题一个。但我需要一个解决这两个问题的单一(通用)解决方案。
ScalikeJdbc
有限的文档没有提供很多帮助,ScalikeJdbc
上的博客/文章几乎不存在。
请提出正确的方法/一些解决方法。
框架版本
Scala 2.11.11
"org.scalikejdbc" %% "scalikejdbc" % "3.2.0"
感谢 , I was able to figure out 释放从 ScalikeJdbc
的 ConnectionPool
借来的连接。可以这样操作:
import scalikejdbc.{ConnectionPool, using}
import java.sql.Connection
using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
// use connection (only once) here
}
// connection automatically returned to pool
有了这个,我现在可以并行化与池的交互了。
为了解决我管理多个 ConnectionPool
和跨多个 class
使用连接的问题,我最终写了一个 ConnectionPoolManager
,可以找到完整的代码 here。通过卸载
的任务
- 创建池
- 从池中借用连接
- 删除池
我可以在我的项目中的任何地方使用的 singleton
对象,我能够清除很多混乱并消除了跨方法链传递 implicit session
的需要。
EDIT-1
虽然我已经 linked ConnectionPoolManager
的完整代码,但这里有一个关于如何着手的快速提示
ConnectionPoolManager
的以下方法可让您借用 ConnectionPool
s
的连接
def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
// create a pool for db (only) if it doesn't exist
addPool(dbName, poolNameOpt)
val poolName: String = poolNameOpt.getOrElse(dbName)
DB(ConnectionPool.get(poolName).borrow())
}
此后,在整个代码中,您可以使用上述方法从池中借用连接并进行查询
def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
// perform ScalikeJdbc SQL query here
}
}
在我的应用程序中,我必须与多个 MySQL
数据库逐一交互(只读)。对于每个数据库,我需要一定数量的连接。与数据库的交互不会发生在 单一延伸 中:我查询数据库,花一些时间处理结果,再次查询数据库,再次处理结果等等。
这些交互中的每一个都需要多个连接[我同时触发多个查询],因此我需要一个ConnectionPool
当我开始与数据库交互时产生并一直存在,直到我完成对该数据库的所有查询(包括我不查询时的临时时间间隔,只处理结果)。
我能够成功创建具有所需连接数的 ConnectionPool
并获得如下所示的 implicit session
def createConnectionPool(poolSize: Int): DBSession = {
implicit val session: AutoSession.type = AutoSession
ConnectionPool.singleton(
url = "myUrl",
user = "myUser",
password = "***",
settings = ConnectionPoolSettings(initialSize = poolSize)
)
session
}
然后我在需要与 DB 交互的整个方法中传递这个 implicit session
。这样,我就可以使用这个 session
触发 poolSize
没有查询 同时 。很公平。
def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
...
methodThatInteractsWithDb
...
}
def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
...
getResultsParallely(poolSize = 32, fetchSize = 2000000)
...
}
def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
val resultsSequenceFuture: Seq[Future[ResultClass]] = {
(0 until poolSize).map { i =>
val limit: Long = fetchSize
val offset: Long = i * fetchSize
Future(methodThatMakesSingleQuery(limit, offset))
}
}
val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)
Await.result(resultsFuture, 2.minutes)
}
此技术有 2 个问题:
- 我的应用程序很大并且有很多嵌套方法调用,因此通过所有方法(见下文)传递
implicit session
是不可行的。 - 除了上述与不同数据库的逐个交互之外,我还需要在整个应用程序的整个生命周期中与另一个(固定的)数据库建立单一连接。此连接将用于每隔几分钟进行一次 小型写入操作 n( 记录 我与其他数据库交互的进度)。因此,我需要多个
ConnectionPool
,每个 DB
根据我对 ScalikeJdbc
的 docs 的理解,我想出了以下方法,不需要我在任何地方都传递 implicit session
。
def createConnectionPool(poolName: String, poolSize: Int): Unit = {
ConnectionPool.add(
name = poolName,
url = "myUrl",
user = "myUser",
password = "***",
settings = ConnectionPoolSettings(initialSize = poolSize)
)
}
def methodThatInteractsWithDb(poolName: String): Unit = {
...
(DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
// interact with DB
...
}
...
}
虽然这可行,但我无法再并行化 数据库交互。这种行为很明显,因为我使用的是 borrow()
方法,该方法从 池 获取 单个连接 。反过来,这让我想知道为什么 AutoSession
的东西更早起作用:为什么我能够使用单个 implicit session
同时触发多个查询?如果那件事奏效了,那为什么这行不通呢?但是我没有找到如何从支持多个连接的 ConnectionPool
中获取 DBSession
的示例。
总而言之,我有 2 个问题和 2 个解决方案:每个问题一个。但我需要一个解决这两个问题的单一(通用)解决方案。
ScalikeJdbc
有限的文档没有提供很多帮助,ScalikeJdbc
上的博客/文章几乎不存在。
请提出正确的方法/一些解决方法。
框架版本
Scala 2.11.11
"org.scalikejdbc" %% "scalikejdbc" % "3.2.0"
感谢 ScalikeJdbc
的 ConnectionPool
借来的连接。可以这样操作:
import scalikejdbc.{ConnectionPool, using}
import java.sql.Connection
using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
// use connection (only once) here
}
// connection automatically returned to pool
有了这个,我现在可以并行化与池的交互了。
为了解决我管理多个 ConnectionPool
和跨多个 class
使用连接的问题,我最终写了一个 ConnectionPoolManager
,可以找到完整的代码 here。通过卸载
- 创建池
- 从池中借用连接
- 删除池
我可以在我的项目中的任何地方使用的 singleton
对象,我能够清除很多混乱并消除了跨方法链传递 implicit session
的需要。
EDIT-1
虽然我已经 linked ConnectionPoolManager
的完整代码,但这里有一个关于如何着手的快速提示
ConnectionPoolManager
的以下方法可让您借用 ConnectionPool
s
def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
// create a pool for db (only) if it doesn't exist
addPool(dbName, poolNameOpt)
val poolName: String = poolNameOpt.getOrElse(dbName)
DB(ConnectionPool.get(poolName).borrow())
}
此后,在整个代码中,您可以使用上述方法从池中借用连接并进行查询
def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
// perform ScalikeJdbc SQL query here
}
}