Reading a serialized object from MariaDB with Anorm

I have already implemented reading a serialized object with plain JDBC, and now I want to do the same with Anorm 2.3.8.

The plain JDBC Scala code is as follows:

def loadModel(rName: String, rPdbCode: String) = {
  //Connection Initialization
    Class.forName("org.mariadb.jdbc.Driver")
    val jdbcUrl = s"jdbc:mysql://172.17.0.2:3306/db_profile?user=root&password=root"
    val connection = DriverManager.getConnection(jdbcUrl)

    //Reading Pre-trained model from Database
    var model: InductiveClassifier[MLlibSVM, LabeledPoint] = null
    if (!(connection.isClosed())) {

      val sqlRead = connection.prepareStatement("SELECT r_model FROM MODELS WHERE r_name = ? and r_pdbCode = ?")
      sqlRead.setString(1, rName)
      sqlRead.setString(2, rPdbCode)
      val rs = sqlRead.executeQuery()
      rs.next()

      val modelStream = rs.getObject("r_model").asInstanceOf[Array[Byte]]
      val modelBaip = new ByteArrayInputStream(modelStream)
      val modelOis = new ObjectInputStream(modelBaip)
      model = modelOis.readObject().asInstanceOf[InductiveClassifier[MLlibSVM, LabeledPoint]]

      rs.close
      sqlRead.close
      connection.close()
    } else {
      println("MariaDb Connection is Close")
      System.exit(1)
    }
    model
}

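Now I want to load my serialized model with Anorm so that everything is consistent with the rest of my application, where I only use the default connection.

Below is my attempt, but I cannot convert the Stream[Row] to an Array[Byte]; the cast throws an exception: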

ClassCastException: scala.collection.immutable.Stream$Cons cannot be cast to [B]

The Anorm 2.3.8 code is as follows:

def loadModel(rName: String, rPdbCode: String) = {
    //Connection Initialization
    var model: InductiveClassifier[MLlibSVM, LabeledPoint] = null
    DB.withConnection { implicit c =>
      val results = SQL(
        """
          | SELECT r_model
          | FROM MODELS
          | WHERE r_name={r_name} 
          | AND r_pdbCode={r_pdbCode};
        """.stripMargin).on(
          "r_name" -> rName,
          "r_pdbCode" -> rPdbCode).apply()
      val byteArray = results.asInstanceOf[Array[Byte]]
      val modelBaip = new ByteArrayInputStream(byteArray)
      val modelOis = new ObjectInputStream(modelBaip)
      model = modelOis.readObject().asInstanceOf[InductiveClassifier[MLlibSVM, LabeledPoint]]

    }

    model
}

This code is for Anorm 2.5, but hopefully it does not differ much from earlier versions. Assume we define Model as

case class Model(i: Int, s: String) extends Serializable

Here are loadModel and a test wrapper, testLoadModel, that calls it against a simplified table:

import anorm._
import java.io._

def loadModel(rName: String): Model = {
  db.withConnection { implicit c =>
    val result = SQL"""
        SELECT r_model
        FROM MODELS
        WHERE r_name=${rName}
      """.as(SqlParser.byteArray("r_model").single)

    Logger.info(s"result ${result.getClass} => $result")
    deserialize[Model](result)
  }
}

def serialize(obj: Serializable): Array[Byte] = {
  val outBuf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(outBuf)
  out.writeObject(obj)
  out.flush()
  outBuf.toByteArray
}

def deserialize[T](byteArray: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(byteArray))
  ois.readObject().asInstanceOf[T]
}

def testLoadModel(): Unit = {
  db.withConnection { implicit c =>
    val createRes = SQL(
      """
        |DROP TABLE   IF EXISTS MODELS;
        |
        |CREATE TABLE MODELS(
        | r_name  VARCHAR(50) PRIMARY KEY NOT NULL,
        | r_model  VARBINARY NOT NULL
        |         );
      """.
        stripMargin).execute()
    Logger.info(s"Create result = $createRes")
    val rName = "rName"
    val m0 = Model(42, "Abc")
    val ser0 = serialize(m0)

    val insertRes = SQL(
      """
        | insert into MODELS values ({r_name},{r_model})
      """
        .stripMargin).on("r_name" -> rName, "r_model" -> ser0).executeInsert()
    Logger.info(s"Insert result = $insertRes")

    val m1 = loadModel(rName)
    Logger.info(s"m0 = $m0")
    Logger.info(s"m1 = $m1")
  }
}

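The main trick seems to be using .as(SqlParser.byteArray("r_model").single), which parses the column as an Array[Byte] instead of casting the raw result.

Note that if a matching record is not guaranteed to exist, you may want to use .singleOpt instead. It may also make sense to add a LIMIT 1 clause to your query for better performance.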
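
As a rough sketch of those two suggestions applied to the original two-column query, something like the following might work. It assumes the Anorm 2.5 style API used above (the SQL string interpolator and SqlParser.byteArray); whether it compiles unchanged on 2.3.8 is not verified here. The names ModelStore and loadModelOpt are hypothetical and exist only for illustration.

```scala
import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.sql.Connection

import anorm._

// Hypothetical wrapper object, used here only so the sketch compiles on its own.
object ModelStore {

  // Deserializes a Java-serialized object and closes the stream afterwards.
  // The cast to T is unchecked, exactly as in the deserialize helper above.
  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T]
    finally ois.close()
  }

  // Returns None when no row matches, instead of throwing as .single would.
  // LIMIT 1 keeps the result to at most one row, which is what .singleOpt expects.
  def loadModelOpt[T](rName: String, rPdbCode: String)(implicit c: Connection): Option[T] =
    SQL"""
      SELECT r_model
      FROM MODELS
      WHERE r_name = $rName AND r_pdbCode = $rPdbCode
      LIMIT 1
    """.as(SqlParser.byteArray("r_model").singleOpt)
      .map(bytes => deserialize[T](bytes))
}
```

Inside DB.withConnection { implicit c => ... } this could be called as ModelStore.loadModelOpt[InductiveClassifier[MLlibSVM, LabeledPoint]](rName, rPdbCode), and the caller then decides what to do with None rather than relying on a null model.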