使用多个 ConnectionPool(s) 的正确方法

Correct way of using multiple ConnectionPool(s)

在我的应用程序中,我必须与多个 MySQL 数据库逐一交互(只读)。对于每个数据库,我需要一定数量的连接。与数据库的交互不会发生在 单一延伸 中:我查询数据库,花一些时间处理结果,再次查询数据库,再次处理结果等等。

这些交互中的每一个都需要多个连接[我同时触发多个查询],因此我需要一个ConnectionPool当我开始与数据库交互时产生并一直存在,直到我完成对该数据库的所有查询(包括我不查询时的临时时间间隔,只处理结果)。


我能够成功创建具有所需连接数的 ConnectionPool 并获得如下所示的 implicit session

def createConnectionPool(poolSize: Int): DBSession = {
 implicit val session: AutoSession.type = AutoSession

 ConnectionPool.singleton(
   url = "myUrl",
   user = "myUser",
   password = "***",
   settings = ConnectionPoolSettings(initialSize = poolSize)
 )

 session
}

然后我在需要与 DB 交互的整个方法中传递这个 implicit session。这样,我就可以使用这个 session 触发 poolSize 没有查询 同时 。很公平。

def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
  ...
  methodThatInteractsWithDb
  ...
}

def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
  ...
  getResultsParallely(poolSize = 32, fetchSize = 2000000)
  ...
}

def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration._

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))

  val resultsSequenceFuture: Seq[Future[ResultClass]] = {
    (0 until poolSize).map { i =>
      val limit: Long = fetchSize
      val offset: Long = i * fetchSize

      Future(methodThatMakesSingleQuery(limit, offset))
    }
  }
  val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)

  Await.result(resultsFuture, 2.minutes)
}

此技术有 2 个问题:

  1. 我的应用程序很大并且有很多嵌套方法调用,因此通过所有方法(见下文)传递 implicit session 是不可行的。
  2. 除了上述与不同数据库的逐个交互之外,我还需要在整个应用程序的整个生命周期中与另一个(固定的)数据库建立单一连接。此连接将用于每隔几分钟进行一次 小型写入操作 n( 记录 我与其他数据库交互的进度)。因此,我需要多个 ConnectionPool,每个 DB

根据我对 ScalikeJdbcdocs 的理解,我想出了以下方法,不需要我在任何地方都传递 implicit session

def createConnectionPool(poolName: String, poolSize: Int): Unit = {
  ConnectionPool.add(
    name = poolName,
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
}

def methodThatInteractsWithDb(poolName: String): Unit = {
  ...
  (DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
    // interact with DB
    ...
  }
  ...
}

虽然这可行,但我无法再并行化 数据库交互。这种行为很明显,因为我使用的是 borrow() 方法,该方法从 获取 单个连接 。反过来,这让我想知道为什么 AutoSession 的东西更早起作用:为什么我能够使用单个 implicit session 同时触发多个查询?如果那件事奏效了,那为什么这行不通呢?但是我没有找到如何从支持多个连接的 ConnectionPool 中获取 DBSession 的示例。


总而言之,我有 2 个问题和 2 个解决方案:每个问题一个。但我需要一个解决这两个问题的单一(通用)解决方案。

ScalikeJdbc 有限的文档没有提供很多帮助,ScalikeJdbc 上的博客/文章几乎不存在。 请提出正确的方法/一些解决方法。


框架版本

感谢 , I was able to figure out 释放从 ScalikeJdbcConnectionPool 借来的连接。可以这样操作:

import scalikejdbc.{ConnectionPool, using}
import java.sql.Connection

using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
    // use connection (only once) here
}
// connection automatically returned to pool

有了这个,我现在可以并行化与池的交互了。


为了解决我管理多个 ConnectionPool 和跨多个 class 使用连接的问题,我最终写了一个 ConnectionPoolManager,可以找到完整的代码 here。通过卸载

的任务
  • 创建池
  • 从池中借用连接
  • 删除池

我可以在我的项目中的任何地方使用的 singleton 对象,我能够清除很多混乱并消除了跨方法链传递 implicit session 的需要。


EDIT-1

虽然我已经 linked ConnectionPoolManager 的完整代码,但这里有一个关于如何着手的快速提示

ConnectionPoolManager 的以下方法可让您借用 ConnectionPools

的连接
def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
  // create a pool for db (only) if it doesn't exist
  addPool(dbName, poolNameOpt)

  val poolName: String = poolNameOpt.getOrElse(dbName)
  DB(ConnectionPool.get(poolName).borrow())
}

此后,在整个代码中,您可以使用上述方法从池中借用连接并进行查询

def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
  ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
    // perform ScalikeJdbc SQL query here
  }
}