Scala Slick:插入失败并出现 RejectedExecutionException

Scala Slick: Insert fails with RejectedExecutionException

我有以下几个类,它们针对 MySQL 数据库运行:

/** A single user record as exchanged with the `Users` table mapping.
  *
  * @param userId    identifier of the user; optional on the Scala side
  * @param firstName given name
  * @param lastName  family name
  * @param phoneNum  phone number, stored as free-form text
  * @param email     e-mail address, if one is known
  */
case class User(
    userId: Option[String],
    firstName: String,
    lastName: String,
    phoneNum: String,
    email: Option[String]
)

/** Slick mapping between the "USERS" table and the [[User]] case class.
  *
  * All five columns are declared as plain `String` columns; the default
  * projection lifts `userId` and `email` with `.?` so that they line up with
  * the `Option[String]` fields of [[User]].
  */
class Users(tag: Tag) extends Table[User](tag, "USERS") {

  // Primary key column.
  def userId = column[String]("USER_ID", O.PrimaryKey)

  def firstName = column[String]("FIRST_NAME")

  def lastName = column[String]("LAST_NAME")

  def phoneNum = column[String]("PHONE_NUM")

  def email = column[String]("EMAIL")

  /** Default projection: packs the five columns into a [[User]] and back. */
  def * =
    (userId.?, firstName, lastName, phoneNum, email.?).<>(User.tupled, User.unapply)
}

/** `UserService` implementation that reads and writes [[User]] rows through Slick.
  *
  * @param db       the Slick database handle every action is submitted to
  * @param executor execution context used for the `map` calls on the returned
  *                 futures and on the insert action
  */
class MySQLUserRepository(private val db: Database)(implicit val executor: ExecutionContextExecutor) extends UserService {
  // Table query shared by all methods of this repository.
  val users = TableQuery[Users]

  /** Returns every user whose first name equals `firstName`. */
  def findByFirstName(firstName: String): Future[immutable.Seq[User]] = {
    val query = users.filter { _.firstName === firstName }

    runAndThenCleanUp(query)
  }

  /** Runs `query` and converts the resulting rows to a `List`.
    *
    * NOTE(review): `db.run` only submits the action and returns a `Future`;
    * the `finally` clause therefore calls `db.close` right away, not after the
    * query has completed. Once `db` is closed, later `db.run` calls on the same
    * handle are rejected (this is the RejectedExecutionException quoted in the
    * log further down). `db.close` belongs at application shutdown instead.
    */
  private def runAndThenCleanUp(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
    try db.run(query.result).map { _.toList } finally db.close
  }

  /** Inserts `user` and yields `Some(user)` when the insert reports a row count.
    *
    * A failed insert is printed to stdout and turned into `None`, so the
    * returned future itself does not fail because of a database error raised
    * by the insert action.
    */
  def createUser(user: User) = {
    // Batch insert of a single row; the result is the optional affected-row count.
    val createAction: DBIO[Option[Int]] = users ++= Seq(user)

    // asTry moves a database failure into the value so it can be handled below.
    db.run(createAction.asTry.map {
      _ match {
        case Success(res) => res.map { _ => user }
        case Failure(e) => println(e); None
      }
    })
  }
}

以及对应的测试:

/** Fixture-based ScalaTest spec for `MySQLUserRepository`.
  *
  * Each test is handed a `User` that `withFixture` has just inserted through
  * the repository. `db` and `verifySingleUser` are referenced here but are not
  * defined in the code shown.
  */
class MySQLUserRepositorySpec extends fixture.FlatSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
  // `global` is presumably scala.concurrent.ExecutionContext.Implicits.global — confirm against the imports.
  private val userRepository = new MySQLUserRepository(db)(global)

  // futureValue waits up to 5 seconds, polling every 500 ms.
  implicit val defaultPatience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis))

  val query = TableQuery[Users]

  // The fixture object loaned to every test is the freshly created user.
  type FixtureParam = User

  /** Creates the test user, loans it to `test`, then runs the clean-up section. */
  def withFixture(test: OneArgTest) = {
    // NOTE(review): findByFirstName goes through runAndThenCleanUp, which calls
    // db.close; the createUser call below then talks to an already closed `db`.
    val users = userRepository.findByFirstName("John")

    // Precondition: no "John" rows are present before the fixture is inserted.
    users.futureValue shouldBe empty

    dumpAllUsers

    val testUser = User(Some("1"), "John", "Doe", "111-111-1111", Some("john.doe@gmail.com"))

    val newUser = userRepository.createUser(testUser)

    // Print the stack trace if the future fails; futureValue below reports the failure to ScalaTest.
    newUser.onFailure {case NonFatal(ex) => ex.printStackTrace }
    val user = newUser.futureValue
    // createUser yields None when the insert failed, so this asserts that the insert succeeded.
    user shouldBe defined

    println("Before test")
    dumpAllUsers

    try {
      println("Running test")
      // user.get cannot throw here: `user shouldBe defined` passed above.
      withFixture(test.toNoArgTest(user.get)) // "loan" the fixture to the test
    } finally { // clean up the fixture
      println("After test")
      dumpAllUsers

//      try db.run(query.delete) finally db.close
    }
  }

  /** Runs once after all tests; the actual table clean-up is currently commented out. */
  override def afterAll() {
    println("Cleaning up")
//    try db.run(query.delete) finally db.close
  }

  /** Intended to print all rows of the users table.
    *
    * NOTE(review): this only builds a database action; it is never passed to
    * `db.run`, so nothing besides the header line is printed.
    */
  private def dumpAllUsers = {
    println("Printing all users")
    (query.result).map { _.toList }
  }

  it should "find user with first name" in { testUser =>
    val users = userRepository.findByFirstName(testUser.firstName)
    verifySingleUser(users.futureValue)
  }
}

我一直收到异常:

2015-09-12 00:45:14.592 [ScalaTest-main-running-MySQLUserRepositorySpec] [DEBUG] s.b.D.action - #1: [fused] asTry
java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon@43da41e rejected from java.util.concurrent.ThreadPoolExecutor@148c7c4b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

我看了另外两个相关的问题,但都没有帮助。我究竟做错了什么?我怀疑 Slick 错误地将 PK 列映射为可为空(由于 Option 类型)。

Table DDL:

-- Schema of the table behind the Users mapping.
-- user_id is the primary key; phone_num and email each carry a unique index.
-- email is the only nullable column; every other column is NOT NULL.
CREATE TABLE `akka`.`users` (
  `user_id` VARCHAR(16) NOT NULL COMMENT '',
  `first_name` VARCHAR(50) NOT NULL COMMENT '',
  `last_name` VARCHAR(50) NOT NULL COMMENT '',
  `phone_num` VARCHAR(25) NOT NULL COMMENT '',
  `email` VARCHAR(50) NULL COMMENT '',
  PRIMARY KEY (`user_id`)  COMMENT '',
  UNIQUE INDEX `phone_num_UNIQUE` (`phone_num` ASC)  COMMENT '',
  UNIQUE INDEX `email_UNIQUE` (`email` ASC)  COMMENT '');

问题出在您的 runAndThenCleanUp 方法上。db 内部持有一个数据库连接池,而 db.close 会关闭这个池。db.run 只是向执行池提交请求并立即返回一个 Future,但在请求真正被执行之前,finally 中的 db.close 就已经把池关闭了。

不必在每次方法调用中操心清理连接——只需确保在应用程序关闭时调用 db.close,让连接池正常关闭即可。Slick(以及底层的 HikariCP)会为您管理连接。

简而言之,将下面这段代码:

// Problematic version: db.run returns a Future immediately, so the `finally`
// clause closes `db` before the submitted query has had a chance to execute.
private def runAndThenCleanUp(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
  try db.run(query.result).map { _.toList } finally db.close
}

改为:

// Submits the query and returns the Future of its rows as a List.
// `db` is deliberately left open here; close it once, at application shutdown.
private def runQuery(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] =
  db.run(query.result).map(rows => rows.toList)