Scala Slick:插入失败并出现 RejectedExecutionException
Scala Slick: Insert fails with RejectedExecutionException
我有以下 类 运行 针对 MySQL 数据库:
case class User(val userId: Option[String], val firstName: String, val lastName: String, val phoneNum: String, val email: Option[String]) {
}
class Users(tag: Tag) extends Table[User](tag, "USERS") {
def userId = column[String]("USER_ID", O.PrimaryKey)
def firstName = column[String]("FIRST_NAME")
def lastName = column[String]("LAST_NAME")
def phoneNum = column[String]("PHONE_NUM")
def email = column[String]("EMAIL")
def * = (userId.?, firstName, lastName, phoneNum, email.?) <> (User.tupled, User.unapply)
}
class MySQLUserRepository(private val db: Database)(implicit val executor: ExecutionContextExecutor) extends UserService {
val users = TableQuery[Users]
def findByFirstName(firstName: String): Future[immutable.Seq[User]] = {
val query = users.filter { _.firstName === firstName }
runAndThenCleanUp(query)
}
private def runAndThenCleanUp(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
try db.run(query.result).map { _.toList } finally db.close
}
def createUser(user: User) = {
val createAction: DBIO[Option[Int]] = users ++= Seq(user)
db.run(createAction.asTry.map {
_ match {
case Success(res) => res.map { _ => user }
case Failure(e) => println(e); None
}
})
}
}
并测试:
class MySQLUserRepositorySpec extends fixture.FlatSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
private val userRepository = new MySQLUserRepository(db)(global)
implicit val defaultPatience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis))
val query = TableQuery[Users]
type FixtureParam = User
def withFixture(test: OneArgTest) = {
val users = userRepository.findByFirstName("John")
users.futureValue shouldBe empty
dumpAllUsers
val testUser = User(Some("1"), "John", "Doe", "111-111-1111", Some("john.doe@gmail.com"))
val newUser = userRepository.createUser(testUser)
newUser.onFailure {case NonFatal(ex) => ex.printStackTrace }
val user = newUser.futureValue
user shouldBe defined
println("Before test")
dumpAllUsers
try {
println("Running test")
withFixture(test.toNoArgTest(user.get)) // "loan" the fixture to the test
} finally { // clean up the fixture
println("After test")
dumpAllUsers
// try db.run(query.delete) finally db.close
}
}
override def afterAll() {
println("Cleaning up")
// try db.run(query.delete) finally db.close
}
private def dumpAllUsers = {
println("Printing all users")
(query.result).map { _.toList }
}
it should "find user with first name" in { testUser =>
val users = userRepository.findByFirstName(testUser.firstName)
verifySingleUser(users.futureValue)
}
}
我一直收到异常:
2015-09-12 00:45:14.592 [ScalaTest-main-running-MySQLUserRepositorySpec] [DEBUG] s.b.D.action - #1: [fused] asTry
java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon@43da41e rejected from java.util.concurrent.ThreadPoolExecutor@148c7c4b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
我看了 and ,没用。我究竟做错了什么?我怀疑 Slick 错误地将 PK 列映射为可为空(由于 Option
类型)。
Table DDL:
CREATE TABLE `akka`.`users` (
`user_id` VARCHAR(16) NOT NULL COMMENT '',
`first_name` VARCHAR(50) NOT NULL COMMENT '',
`last_name` VARCHAR(50) NOT NULL COMMENT '',
`phone_num` VARCHAR(25) NOT NULL COMMENT '',
`email` VARCHAR(50) NULL COMMENT '',
PRIMARY KEY (`user_id`) COMMENT '',
UNIQUE INDEX `phone_num_UNIQUE` (`phone_num` ASC) COMMENT '',
UNIQUE INDEX `email_UNIQUE` (`email` ASC) COMMENT '');
问题出在您的 runAndThenCleanUp
方法上。 db
有一个数据库连接池,db.close
关闭。 db.run
向执行池提交请求,但在执行请求之前,您关闭了池。
不必担心在您的各个方法调用中清理连接 - 只需确保在您的应用程序关闭时调用 db.close
让池正常关闭即可。 Slick(和引擎盖下的 HikariCP)将为您管理连接。
简而言之,改变这个:
private def runAndThenCleanUp(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
try db.run(query.result).map { _.toList } finally db.close
}
对此:
private def runQuery(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
db.run(query.result).map { _.toList }
}
我有以下 类 运行 针对 MySQL 数据库:
case class User(val userId: Option[String], val firstName: String, val lastName: String, val phoneNum: String, val email: Option[String]) {
}
class Users(tag: Tag) extends Table[User](tag, "USERS") {
def userId = column[String]("USER_ID", O.PrimaryKey)
def firstName = column[String]("FIRST_NAME")
def lastName = column[String]("LAST_NAME")
def phoneNum = column[String]("PHONE_NUM")
def email = column[String]("EMAIL")
def * = (userId.?, firstName, lastName, phoneNum, email.?) <> (User.tupled, User.unapply)
}
class MySQLUserRepository(private val db: Database)(implicit val executor: ExecutionContextExecutor) extends UserService {
val users = TableQuery[Users]
def findByFirstName(firstName: String): Future[immutable.Seq[User]] = {
val query = users.filter { _.firstName === firstName }
runAndThenCleanUp(query)
}
private def runAndThenCleanUp(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
try db.run(query.result).map { _.toList } finally db.close
}
def createUser(user: User) = {
val createAction: DBIO[Option[Int]] = users ++= Seq(user)
db.run(createAction.asTry.map {
_ match {
case Success(res) => res.map { _ => user }
case Failure(e) => println(e); None
}
})
}
}
并测试:
class MySQLUserRepositorySpec extends fixture.FlatSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
private val userRepository = new MySQLUserRepository(db)(global)
implicit val defaultPatience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis))
val query = TableQuery[Users]
type FixtureParam = User
def withFixture(test: OneArgTest) = {
val users = userRepository.findByFirstName("John")
users.futureValue shouldBe empty
dumpAllUsers
val testUser = User(Some("1"), "John", "Doe", "111-111-1111", Some("john.doe@gmail.com"))
val newUser = userRepository.createUser(testUser)
newUser.onFailure {case NonFatal(ex) => ex.printStackTrace }
val user = newUser.futureValue
user shouldBe defined
println("Before test")
dumpAllUsers
try {
println("Running test")
withFixture(test.toNoArgTest(user.get)) // "loan" the fixture to the test
} finally { // clean up the fixture
println("After test")
dumpAllUsers
// try db.run(query.delete) finally db.close
}
}
override def afterAll() {
println("Cleaning up")
// try db.run(query.delete) finally db.close
}
private def dumpAllUsers = {
println("Printing all users")
(query.result).map { _.toList }
}
it should "find user with first name" in { testUser =>
val users = userRepository.findByFirstName(testUser.firstName)
verifySingleUser(users.futureValue)
}
}
我一直收到异常:
2015-09-12 00:45:14.592 [ScalaTest-main-running-MySQLUserRepositorySpec] [DEBUG] s.b.D.action - #1: [fused] asTry
java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon@43da41e rejected from java.util.concurrent.ThreadPoolExecutor@148c7c4b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
我看了Option
类型)。
Table DDL:
CREATE TABLE `akka`.`users` (
`user_id` VARCHAR(16) NOT NULL COMMENT '',
`first_name` VARCHAR(50) NOT NULL COMMENT '',
`last_name` VARCHAR(50) NOT NULL COMMENT '',
`phone_num` VARCHAR(25) NOT NULL COMMENT '',
`email` VARCHAR(50) NULL COMMENT '',
PRIMARY KEY (`user_id`) COMMENT '',
UNIQUE INDEX `phone_num_UNIQUE` (`phone_num` ASC) COMMENT '',
UNIQUE INDEX `email_UNIQUE` (`email` ASC) COMMENT '');
问题出在您的 runAndThenCleanUp
方法上。 db
有一个数据库连接池,db.close
关闭。 db.run
向执行池提交请求,但在执行请求之前,您关闭了池。
不必担心在您的各个方法调用中清理连接 - 只需确保在您的应用程序关闭时调用 db.close
让池正常关闭即可。 Slick(和引擎盖下的 HikariCP)将为您管理连接。
简而言之,改变这个:
private def runAndThenCleanUp(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
try db.run(query.result).map { _.toList } finally db.close
}
对此:
private def runQuery(query: Query[Users, User, Seq]): Future[immutable.Seq[User]] = {
db.run(query.result).map { _.toList }
}