我如何创建告诉 Jackson ObjectMapper 创建泛型类型?
How do I create tell Jackson ObjectMapper to create a generic type?
这对我来说是一个玩 Kafka 和 Scala 的有趣练习。我的目标是创建一个简单的消息类型来发送 kafka 主题。这是我对采用类型参数 [A] 的 generic/re-usable 序列化程序的尝试。
import java.util.{Map => jMap}
import scala.reflect.runtime.universe._
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature._
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
class MySerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
override def close() = {/*Do Nothing*/}
override def configure(configs: jMap[String, _], isKey: Boolean) = {/*Do Nothing*/}
override def serialize(topic: String, subject: A): Array[Byte] =
mapper.writeValueAsBytes(subject)
override def deserialize(topic: String, bytes: Array[Byte]): A = {
val a: A = mapper.readValue(bytes, A.getClass()) /******PROBLEM****/
return a
}
}
我在反序列化中遇到的错误是 ObjectMapper.readValue 的第二个参数。我要给它什么才能使它 return 我成为通用类型 A?
我的 sbt:
name := "scalafunplay"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.10.2.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.7"
)
这是我的主要应用程序:
package scalafunplay
object Mistkafer {
def main(args: Array[String]): Unit = {
case class Asset (ruid: String)
val test = new Asset("Dan The Man")
val serializer = new MySerializer[Asset]()
val sampleSerialized = serializer.serialize("test", test)
val sampleUnserialized = serializer.deserialize("test", test)
println("###### RESULT: " + sampleUnserialized)
}
}
我最终决定不使用 Jackson。像这样使用 Java.IO 字节数组和对象 input/output 流更容易:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.{Map => jMap}
import scala.reflect.runtime.universe.TypeTag
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
class ObjectSerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {
override def close() = {/*Do Nothing*/}
override def configure(configs: jMap[String, _], isKey: Boolean) = {/*Do Nothing*/}
override def serialize(topic: String, subject: A): Array[Byte] = {
val byteArrInStr = new ByteArrayOutputStream()
val objInpStr = new ObjectOutputStream(byteArrInStr)
objInpStr.writeObject(subject)
byteArrInStr.toByteArray()
}
override def deserialize(topic: String, bytes: Array[Byte]): A = {
val byteArrInStr = new ByteArrayInputStream(bytes)
val objInStr = new ObjectInputStream(byteArrInStr)
objInStr.readObject().asInstanceOf[A]
}
}
这对我来说是一个玩 Kafka 和 Scala 的有趣练习。我的目标是创建一个简单的消息类型来发送 kafka 主题。这是我对采用类型参数 [A] 的 generic/re-usable 序列化程序的尝试。
import java.util.{Map => jMap}
import scala.reflect.runtime.universe._
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature._
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
class MySerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
override def close() = {/*Do Nothing*/}
override def configure(configs: jMap[String, _], isKey: Boolean) = {/*Do Nothing*/}
override def serialize(topic: String, subject: A): Array[Byte] =
mapper.writeValueAsBytes(subject)
override def deserialize(topic: String, bytes: Array[Byte]): A = {
val a: A = mapper.readValue(bytes, A.getClass()) /******PROBLEM****/
return a
}
}
我在反序列化中遇到的错误是 ObjectMapper.readValue 的第二个参数。我要给它什么才能使它 return 我成为通用类型 A?
我的 sbt:
name := "scalafunplay"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.10.2.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.7"
)
这是我的主要应用程序:
package scalafunplay
object Mistkafer {
def main(args: Array[String]): Unit = {
case class Asset (ruid: String)
val test = new Asset("Dan The Man")
val serializer = new MySerializer[Asset]()
val sampleSerialized = serializer.serialize("test", test)
val sampleUnserialized = serializer.deserialize("test", test)
println("###### RESULT: " + sampleUnserialized)
}
}
我最终决定不使用 Jackson。像这样使用 Java.IO 字节数组和对象 input/output 流更容易:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.{Map => jMap}
import scala.reflect.runtime.universe.TypeTag
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
class ObjectSerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {
override def close() = {/*Do Nothing*/}
override def configure(configs: jMap[String, _], isKey: Boolean) = {/*Do Nothing*/}
override def serialize(topic: String, subject: A): Array[Byte] = {
val byteArrInStr = new ByteArrayOutputStream()
val objInpStr = new ObjectOutputStream(byteArrInStr)
objInpStr.writeObject(subject)
byteArrInStr.toByteArray()
}
override def deserialize(topic: String, bytes: Array[Byte]): A = {
val byteArrInStr = new ByteArrayInputStream(bytes)
val objInStr = new ObjectInputStream(byteArrInStr)
objInStr.readObject().asInstanceOf[A]
}
}