Kafka json 反序列化器中的 Scala classOf 泛型类型

Scala classOf generic type in Kafka json deserializer

我正在使用 Jackson 在 scala 中编写一个 kafka json 反序列化器,但是在提供 jackson 的 readValue() 方法 class 泛型时遇到了问题。例如:

...
import org.apache.kafka.common.serialization.Deserializer

class JsonDeserializer[T] extends Deserializer[Option[T]] {

  val mapper = (new ObjectMapper() with ScalaObjectMapper)
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    .registerModule(new JavaTimeModule())
    .findAndRegisterModules()
    .asInstanceOf[ObjectMapper with ScalaObjectMapper]

  def deserialize(topic: String, bytes: Array[Byte]): Option[T] = {
    Option(bytes) match {
      case Some(b) => Some(mapper.readValue(bytes, classOf[T]))
      case None => None
    }

  }

  def configure(configs: java.util.Map[String, _], isKey: Boolean) {}
  def close(): Unit = {}
}

注意 deserialize 方法中的 mapper.readValue(bytes, classOf[T])。编译失败 "class type required but T found".

如何做到这一点?

Java 中的通用类型在运行时被删除,因此如果不显式传递它就无法恢复 Class

好吧,就 Java 而言,这是明确的。您可以在构造时使用 (implicit ct: ClassTag[T]) 或 shorthand [T: ClassTag](隐含地)获得 ClassTag,这允许您稍后检索 Class

import scala.reflect._

class JsonDeserializer[T: ClassTag] extends Deserializer[Option[T]] {
  ...
    mapper.readValue(bytes, classTag[T].runtimeClass.asInstanceOf[Class[T]])